fix(deploy): recreate 1.0 source CDC connectors during full-reset to ensure complete data sync

Problem:
  After running deploy-mining.sh full-reset, the 1.0 source CDC connectors
  such as identity-postgres-connector kept their old Debezium offsets and
  replication slots, so Debezium considered the initial snapshot already
  complete and skipped re-snapshotting. When new tables were added to a
  connector's JSON config (e.g., wallet_addresses in identity-connector.json),
  the existing rows of those tables were never captured into Kafka topics,
  and the 2.0 services could not sync that data via CDC.

  Concrete symptom: the Kafka topic cdc.identity.public.wallet_addresses
  stayed empty (0 messages), the synced_wallet_addresses table in the 2.0
  auth-service stayed empty, and users' KAVA addresses were not synced.

Fix:
  In Step 4 of full_reset(), besides handling the outbox connectors, also
  handle the 1.0 source CDC connectors (identity/referral/planting):
  1. Delete the connector (releases its replication slot)
  2. Send a tombstone to clear its offset from the debezium_offsets topic
  3. Call pg_drop_replication_slot() to drop the PostgreSQL replication slot
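
  Replication slots are cluster-wide in PostgreSQL, so step 3 can be verified
  from any database. A minimal check (connection flags omitted; adjust to the
  deploy environment):

    psql -d rwa_identity -c \
      "SELECT slot_name, active FROM pg_replication_slots WHERE slot_name LIKE 'debezium_%';"

  An empty result means all three slots were dropped and the re-registered
  connectors will create fresh ones.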

  Add a new Step 9: re-register the source connectors from the
  scripts/debezium/*.json config files. Because the offsets and slots have
  been cleared, snapshot.mode=initial triggers a brand-new initial snapshot,
  ensuring the existing data of every configured table (including the newly
  added wallet_addresses) is loaded into the Kafka topics.
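
  Whether the fresh snapshot actually ran can be checked afterwards through
  the standard Kafka Connect REST status endpoint (the URL is whatever
  $DEBEZIUM_CONNECT_URL resolves to, e.g. http://localhost:8083):

    curl -s "$DEBEZIUM_CONNECT_URL/connectors/identity-postgres-connector/status"

  A RUNNING connector plus a non-empty cdc.identity.public.wallet_addresses
  topic confirms the re-snapshot succeeded.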

  The total step count changes from 18 to 19.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
hailin 2026-01-31 20:26:06 -08:00
parent 6bcc571453
commit 5b1f4c82e6
1 changed file with 107 additions and 19 deletions

@@ -1096,9 +1096,11 @@ full_reset() {
     echo -e "${RED}${BOLD}║ - Recreate databases ║${NC}"
     echo -e "${RED}${BOLD}║ - Run migrations ║${NC}"
     echo -e "${RED}${BOLD}║ - Reset CDC consumer offsets ║${NC}"
+    echo -e "${RED}${BOLD}║ - Re-snapshot 1.0 source CDC connectors ║${NC}"
     echo -e "${RED}${BOLD}║ - Restart services (will sync from 1.0) ║${NC}"
     echo -e "${RED}${BOLD}║ ║${NC}"
-    echo -e "${RED}${BOLD}║ This will NOT affect the 1.0 system in any way. ║${NC}"
+    echo -e "${RED}${BOLD}║ 1.0 services are NOT affected, but source CDC connectors ║${NC}"
+    echo -e "${RED}${BOLD}║ will be briefly re-created for a fresh snapshot. ║${NC}"
     echo -e "${RED}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}"
     echo ""
     read -p "Type 'RESET' to confirm: " confirm
@@ -1109,16 +1111,16 @@ full_reset() {
     fi
     echo ""
 
-    log_step "Step 1/18: Stopping 2.0 services..."
+    log_step "Step 1/19: Stopping 2.0 services..."
     for service in "${MINING_SERVICES[@]}"; do
         service_stop "$service"
     done
 
-    log_step "Step 2/18: Waiting for Kafka consumers to become inactive..."
+    log_step "Step 2/19: Waiting for Kafka consumers to become inactive..."
     log_info "Waiting 15 seconds for consumer group session timeout..."
     sleep 15
 
-    log_step "Step 3/18: Resetting CDC consumer offsets..."
+    log_step "Step 3/19: Resetting CDC consumer offsets..."
     # Reset offsets BEFORE migrations (which may start containers)
     for group in "${CDC_CONSUMER_GROUPS[@]}"; do
         log_info "Resetting consumer group: $group"
@@ -1155,7 +1157,7 @@ full_reset() {
         fi
     done
 
-    log_step "Step 4/18: Deleting Debezium outbox connectors, offsets, and Kafka topics..."
+    log_step "Step 4/19: Deleting Debezium connectors, offsets, and Kafka topics..."
     # Delete connectors BEFORE dropping databases to release replication slots
     local connectors=("auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector")
     for connector in "${connectors[@]}"; do
@@ -1252,17 +1254,67 @@ full_reset() {
         fi
     done
 
+    # Also delete 1.0 source CDC connectors so they can be re-created with a fresh snapshot.
+    # Without this, connectors retain old Debezium offsets and skip re-snapshot, which means
+    # any tables added to the config after initial registration (e.g., wallet_addresses) will
+    # never have their existing data captured into Kafka topics.
+    log_info "Deleting 1.0 source CDC connectors for re-snapshot..."
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        log_info "Deleting source connector: $connector"
+        curl -s -X DELETE "$DEBEZIUM_CONNECT_URL/connectors/$connector" 2>/dev/null || true
+    done
+    sleep 3
+
+    # Clear source connector offsets (same tombstone approach as outbox connectors)
+    log_info "Clearing 1.0 source connector offsets..."
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        local topic_prefix=""
+        case "$connector" in
+            "identity-postgres-connector") topic_prefix="cdc.identity" ;;
+            "referral-postgres-connector") topic_prefix="cdc.referral" ;;
+            "planting-postgres-connector") topic_prefix="cdc.planting" ;;
+        esac
+        if [ -n "$topic_prefix" ]; then
+            local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]"
+            if printf '%s\t%s\n' "$offset_key" "__NULL__" | docker exec -i "$KAFKA_CONTAINER" kafka-console-producer \
+                --bootstrap-server localhost:9092 \
+                --topic "$offset_topic" \
+                --property parse.key=true \
+                --property "key.separator=	" \
+                --property "null.marker=__NULL__" 2>/dev/null; then
+                log_success "Cleared offset for: $connector"
+            else
+                log_warn "Could not clear offset for $connector"
+            fi
+        fi
+    done
+
+    # Drop replication slots so new connectors will do a clean initial snapshot
+    log_info "Dropping 1.0 source connector replication slots..."
+    local source_slots=("debezium_identity_slot" "debezium_referral_slot" "debezium_planting_slot")
+    for slot in "${source_slots[@]}"; do
+        if run_psql "rwa_identity" "SELECT pg_drop_replication_slot('$slot');" 2>/dev/null; then
+            log_success "Dropped replication slot: $slot"
+        else
+            log_warn "Could not drop replication slot: $slot (may not exist or still active)"
+        fi
+    done
+
+    log_info "Waiting 5 seconds for offset and slot changes to take effect..."
+    sleep 5
+
-    log_step "Step 5/18: Dropping 2.0 databases..."
+    log_step "Step 5/19: Dropping 2.0 databases..."
     db_drop
 
-    log_step "Step 6/18: Creating 2.0 databases..."
+    log_step "Step 6/19: Creating 2.0 databases..."
     db_create
 
-    log_step "Step 7/18: Running migrations..."
+    log_step "Step 7/19: Running migrations..."
     db_migrate
 
     # Stop any containers that were started during migration
-    log_step "Step 8/18: Stopping containers and resetting CDC offsets again..."
+    log_step "Step 8/19: Stopping containers and resetting CDC offsets again..."
     log_info "Migration may have started CDC consumers, stopping them now..."
     for service in "${MINING_SERVICES[@]}"; do
         docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
@@ -1325,12 +1377,48 @@ full_reset() {
         log_warn "Could not truncate processed_events in rwa_mining_admin (table may not exist yet)"
     fi
 
-    log_step "Step 9/18: Starting 2.0 services..."
+    log_step "Step 9/19: Re-registering 1.0 source CDC connectors (fresh snapshot)..."
+    # Re-create source connectors using JSON config files.
+    # Since we cleared their offsets and dropped replication slots in Step 4,
+    # snapshot.mode=initial will trigger a full re-snapshot of all configured tables.
+    local scripts_dir="$SCRIPT_DIR/scripts/debezium"
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        local config_file=""
+        case "$connector" in
+            "identity-postgres-connector") config_file="$scripts_dir/identity-connector.json" ;;
+            "referral-postgres-connector") config_file="$scripts_dir/referral-connector.json" ;;
+            "planting-postgres-connector") config_file="$scripts_dir/planting-connector.json" ;;
+        esac
+        if [ -n "$config_file" ] && [ -f "$config_file" ]; then
+            log_info "Registering source connector: $connector"
+            local result
+            result=$(cat "$config_file" | envsubst | curl -s -X POST "$DEBEZIUM_CONNECT_URL/connectors" \
+                -H "Content-Type: application/json" \
+                -d @- 2>/dev/null)
+            if echo "$result" | grep -q '"name"'; then
+                log_success "Registered source connector: $connector"
+            else
+                log_warn "Failed to register source connector $connector: $result"
+            fi
+        else
+            log_warn "Config file not found for $connector, skipping"
+        fi
+        sleep 2
+    done
+
+    # Wait for Debezium snapshots to produce data to Kafka topics
+    log_info "Waiting 15 seconds for source connector snapshots to complete..."
+    sleep 15
+
+    log_step "Step 10/19: Starting 2.0 services..."
     for service in "${MINING_SERVICES[@]}"; do
         service_start "$service"
     done
 
-    log_step "Step 10/18: Waiting for contribution-service CDC sync to complete..."
+    log_step "Step 11/19: Waiting for contribution-service CDC sync to complete..."
     log_info "Waiting for contribution-service to complete CDC sync (user_accounts -> referral_relationships -> planting_orders)..."
 
     # Wait for contribution-service's sequential CDC sync to complete
@@ -1372,12 +1460,12 @@ full_reset() {
         log_info "You may need to wait longer or check: curl $cdc_sync_url"
     fi
 
-    log_step "Step 11/18: Registering Debezium outbox connectors..."
+    log_step "Step 12/19: Registering Debezium outbox connectors..."
     # Register outbox connectors AFTER services are running and have synced data
     # This ensures outbox_events tables exist and contain data to be captured
     register_outbox_connectors || log_warn "Some connectors may not be registered"
 
-    log_step "Step 12/18: Publishing legacy users to mining-admin-service..."
+    log_step "Step 13/19: Publishing legacy users to mining-admin-service..."
     # Call the auth-service API to publish all legacy-user events to the outbox,
     # so that mining-admin-service can receive the user data via Debezium
     local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all"
@@ -1393,11 +1481,11 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $publish_url"
     fi
 
-    log_step "Step 13/18: Waiting for connectors to start capturing..."
+    log_step "Step 14/19: Waiting for connectors to start capturing..."
     log_info "Waiting 10 seconds for Debezium connectors to initialize..."
     sleep 10
 
-    log_step "Step 14/18: Publishing contribution data to mining-admin-service..."
+    log_step "Step 15/19: Publishing contribution data to mining-admin-service..."
     # Call the contribution-service API to publish all contribution account events to the outbox
     local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all"
     local contrib_result
@@ -1412,7 +1500,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $contrib_publish_url"
     fi
 
-    log_step "Step 15/18: Publishing referral relationships to mining-admin-service..."
+    log_step "Step 16/19: Publishing referral relationships to mining-admin-service..."
     # Call the contribution-service API to publish all referral relationship events to the outbox
     local referral_publish_url="http://localhost:3020/api/v2/admin/referrals/publish-all"
     local referral_result
@@ -1427,7 +1515,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $referral_publish_url"
     fi
 
-    log_step "Step 16/18: Publishing adoption records to mining-admin-service..."
+    log_step "Step 17/19: Publishing adoption records to mining-admin-service..."
     # Call the contribution-service API to publish all adoption record events to the outbox
     local adoption_publish_url="http://localhost:3020/api/v2/admin/adoptions/publish-all"
     local adoption_result
@@ -1448,7 +1536,7 @@ full_reset() {
     # - Calling publish-all again would cause duplicate records in mining-admin-service
     # - See: contribution-calculation.service.ts -> publishContributionRecordEvents()
 
-    log_step "Step 17/18: Publishing network progress to mining-admin-service..."
+    log_step "Step 18/19: Publishing network progress to mining-admin-service..."
     # Call the contribution-service API to publish the network-wide progress event to the outbox
     local progress_publish_url="http://localhost:3020/api/v2/admin/network-progress/publish"
     local progress_result
@@ -1461,7 +1549,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $progress_publish_url"
     fi
 
-    log_step "Step 18/18: Waiting for mining-admin-service to sync all data..."
+    log_step "Step 19/19: Waiting for mining-admin-service to sync all data..."
     # Wait for mining-admin-service to consume the outbox events
     log_info "Waiting 15 seconds for mining-admin-service to sync all data..."
     sleep 15