diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh
index fb1124c3..f91964e4 100755
--- a/backend/services/deploy-mining.sh
+++ b/backend/services/deploy-mining.sh
@@ -1096,9 +1096,11 @@ full_reset() {
     echo -e "${RED}${BOLD}║  - Recreate databases                                      ║${NC}"
     echo -e "${RED}${BOLD}║  - Run migrations                                          ║${NC}"
     echo -e "${RED}${BOLD}║  - Reset CDC consumer offsets                              ║${NC}"
+    echo -e "${RED}${BOLD}║  - Re-snapshot 1.0 source CDC connectors                   ║${NC}"
     echo -e "${RED}${BOLD}║  - Restart services (will sync from 1.0)                   ║${NC}"
     echo -e "${RED}${BOLD}║                                                            ║${NC}"
-    echo -e "${RED}${BOLD}║  This will NOT affect the 1.0 system in any way.           ║${NC}"
+    echo -e "${RED}${BOLD}║  1.0 services are NOT affected, but source CDC connectors  ║${NC}"
+    echo -e "${RED}${BOLD}║  will be briefly re-created for a fresh snapshot.          ║${NC}"
     echo -e "${RED}${BOLD}╚════════════════════════════════════════════════════════════╝${NC}"
     echo ""
     read -p "Type 'RESET' to confirm: " confirm
@@ -1109,16 +1111,16 @@ full_reset() {
     fi
     echo ""
 
-    log_step "Step 1/18: Stopping 2.0 services..."
+    log_step "Step 1/19: Stopping 2.0 services..."
     for service in "${MINING_SERVICES[@]}"; do
         service_stop "$service"
     done
 
-    log_step "Step 2/18: Waiting for Kafka consumers to become inactive..."
+    log_step "Step 2/19: Waiting for Kafka consumers to become inactive..."
     log_info "Waiting 15 seconds for consumer group session timeout..."
     sleep 15
 
-    log_step "Step 3/18: Resetting CDC consumer offsets..."
+    log_step "Step 3/19: Resetting CDC consumer offsets..."
     # Reset offsets BEFORE migrations (which may start containers)
     for group in "${CDC_CONSUMER_GROUPS[@]}"; do
         log_info "Resetting consumer group: $group"
@@ -1155,7 +1157,7 @@ full_reset() {
         fi
     done
 
-    log_step "Step 4/18: Deleting Debezium outbox connectors, offsets, and Kafka topics..."
+    log_step "Step 4/19: Deleting Debezium connectors, offsets, and Kafka topics..."
     # Delete connectors BEFORE dropping databases to release replication slots
     local connectors=("auth-outbox-connector" "contribution-outbox-connector" "mining-outbox-connector" "trading-outbox-connector" "mining-wallet-outbox-connector")
     for connector in "${connectors[@]}"; do
@@ -1252,17 +1254,67 @@ full_reset() {
         fi
     done
 
-    log_step "Step 5/18: Dropping 2.0 databases..."
+    # Also delete 1.0 source CDC connectors so they can be re-created with a fresh snapshot.
+    # Without this, connectors retain old Debezium offsets and skip re-snapshot, which means
+    # any tables added to the config after initial registration (e.g., wallet_addresses) will
+    # never have their existing data captured into Kafka topics.
+    log_info "Deleting 1.0 source CDC connectors for re-snapshot..."
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        log_info "Deleting source connector: $connector"
+        curl -s -X DELETE "$DEBEZIUM_CONNECT_URL/connectors/$connector" 2>/dev/null || true
+    done
+    sleep 3
+
+    # Clear source connector offsets (same tombstone approach as outbox connectors)
+    log_info "Clearing 1.0 source connector offsets..."
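+    # The loop below relies on the standard Kafka Connect offset layout: a source connector's
+    # position lives in the Connect offsets topic under a key of the form
+    #   ["<connector-name>",{"server":"<topic.prefix>"}]
+    # and producing a tombstone (null value) for that key makes Connect forget the old
+    # position, e.g. key ["identity-postgres-connector",{"server":"cdc.identity"}] with a null value.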
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        local topic_prefix=""
+        case "$connector" in
+            "identity-postgres-connector") topic_prefix="cdc.identity" ;;
+            "referral-postgres-connector") topic_prefix="cdc.referral" ;;
+            "planting-postgres-connector") topic_prefix="cdc.planting" ;;
+        esac
+
+        if [ -n "$topic_prefix" ]; then
+            local offset_key="[\"$connector\",{\"server\":\"$topic_prefix\"}]"
+            # $offset_topic (the Kafka Connect offsets topic) is resolved earlier in this
+            # function; the key separator is a literal tab to match printf's '\t'.
+            if printf '%s\t%s\n' "$offset_key" "__NULL__" | docker exec -i "$KAFKA_CONTAINER" kafka-console-producer \
+                --bootstrap-server localhost:9092 \
+                --topic "$offset_topic" \
+                --property parse.key=true \
+                --property $'key.separator=\t' \
+                --property "null.marker=__NULL__" 2>/dev/null; then
+                log_success "Cleared offset for: $connector"
+            else
+                log_warn "Could not clear offset for $connector"
+            fi
+        fi
+    done
+
+    # Drop replication slots so new connectors will do a clean initial snapshot.
+    # NOTE: a logical replication slot can only be dropped from the database it was
+    # created on; point run_psql at the matching 1.0 database per slot if they differ.
+    log_info "Dropping 1.0 source connector replication slots..."
+    local source_slots=("debezium_identity_slot" "debezium_referral_slot" "debezium_planting_slot")
+    for slot in "${source_slots[@]}"; do
+        if run_psql "rwa_identity" "SELECT pg_drop_replication_slot('$slot');" 2>/dev/null; then
+            log_success "Dropped replication slot: $slot"
+        else
+            log_warn "Could not drop replication slot: $slot (may not exist or still active)"
+        fi
+    done
+
+    log_info "Waiting 5 seconds for offset and slot changes to take effect..."
+    sleep 5
+
+    log_step "Step 5/19: Dropping 2.0 databases..."
     db_drop
 
-    log_step "Step 6/18: Creating 2.0 databases..."
+    log_step "Step 6/19: Creating 2.0 databases..."
     db_create
 
-    log_step "Step 7/18: Running migrations..."
+    log_step "Step 7/19: Running migrations..."
     db_migrate
 
     # Stop any containers that were started during migration
-    log_step "Step 8/18: Stopping containers and resetting CDC offsets again..."
+    log_step "Step 8/19: Stopping containers and resetting CDC offsets again..."
     log_info "Migration may have started CDC consumers, stopping them now..."
     for service in "${MINING_SERVICES[@]}"; do
         docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" stop "$service" 2>/dev/null || true
@@ -1325,12 +1377,48 @@ full_reset() {
         log_warn "Could not truncate processed_events in rwa_mining_admin (table may not exist yet)"
     fi
 
-    log_step "Step 9/18: Starting 2.0 services..."
+    log_step "Step 9/19: Re-registering 1.0 source CDC connectors (fresh snapshot)..."
+    # Re-create source connectors using JSON config files.
+    # Since we cleared their offsets and dropped replication slots in Step 4,
+    # snapshot.mode=initial will trigger a full re-snapshot of all configured tables.
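+    # Illustrative sketch of what each file under scripts/debezium/ is assumed to hold;
+    # actual hostnames, credentials, and table lists live in the real config files:
+    #   {
+    #     "name": "identity-postgres-connector",
+    #     "config": {
+    #       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
+    #       "topic.prefix": "cdc.identity",
+    #       "slot.name": "debezium_identity_slot",
+    #       "snapshot.mode": "initial",
+    #       "table.include.list": "<schema>.<table>,..."
+    #     }
+    #   }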
+    local scripts_dir="$SCRIPT_DIR/scripts/debezium"
+    for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
+        local config_file=""
+        case "$connector" in
+            "identity-postgres-connector") config_file="$scripts_dir/identity-connector.json" ;;
+            "referral-postgres-connector") config_file="$scripts_dir/referral-connector.json" ;;
+            "planting-postgres-connector") config_file="$scripts_dir/planting-connector.json" ;;
+        esac
+
+        if [ -n "$config_file" ] && [ -f "$config_file" ]; then
+            log_info "Registering source connector: $connector"
+            local result
+            result=$(envsubst < "$config_file" | curl -s -X POST "$DEBEZIUM_CONNECT_URL/connectors" \
+                -H "Content-Type: application/json" \
+                -d @- 2>/dev/null)
+
+            if echo "$result" | grep -q '"name"'; then
+                log_success "Registered source connector: $connector"
+            else
+                log_warn "Failed to register source connector $connector: $result"
+            fi
+        else
+            log_warn "Config file not found for $connector, skipping"
+        fi
+
+        sleep 2
+    done
+
+    # Wait for Debezium snapshots to produce data to Kafka topics
+    log_info "Waiting 15 seconds for source connector snapshots to complete..."
+    sleep 15
+
+    log_step "Step 10/19: Starting 2.0 services..."
     for service in "${MINING_SERVICES[@]}"; do
         service_start "$service"
     done
 
-    log_step "Step 10/18: Waiting for contribution-service CDC sync to complete..."
+    log_step "Step 11/19: Waiting for contribution-service CDC sync to complete..."
     log_info "Waiting for contribution-service to complete CDC sync (user_accounts -> referral_relationships -> planting_orders)..."
     # Wait for contribution-service to finish its sequential CDC sync
@@ -1372,12 +1460,12 @@ full_reset() {
         log_info "You may need to wait longer or check: curl $cdc_sync_url"
     fi
 
-    log_step "Step 11/18: Registering Debezium outbox connectors..."
+    log_step "Step 12/19: Registering Debezium outbox connectors..."
     # Register outbox connectors AFTER services are running and have synced data
     # This ensures outbox_events tables exist and contain data to be captured
     register_outbox_connectors || log_warn "Some connectors may not be registered"
 
-    log_step "Step 12/18: Publishing legacy users to mining-admin-service..."
+    log_step "Step 13/19: Publishing legacy users to mining-admin-service..."
     # Call the auth-service API to publish all legacy-user events to the outbox
     # so that mining-admin-service can receive the user data through Debezium
     local publish_url="http://localhost:3024/api/v2/admin/legacy-users/publish-all"
@@ -1393,11 +1481,11 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $publish_url"
     fi
 
-    log_step "Step 13/18: Waiting for connectors to start capturing..."
+    log_step "Step 14/19: Waiting for connectors to start capturing..."
     log_info "Waiting 10 seconds for Debezium connectors to initialize..."
     sleep 10
 
-    log_step "Step 14/18: Publishing contribution data to mining-admin-service..."
+    log_step "Step 15/19: Publishing contribution data to mining-admin-service..."
     # Call the contribution-service API to publish all contribution (hashpower) account events to the outbox
     local contrib_publish_url="http://localhost:3020/api/v2/admin/contribution-accounts/publish-all"
     local contrib_result
@@ -1412,7 +1500,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $contrib_publish_url"
     fi
 
-    log_step "Step 15/18: Publishing referral relationships to mining-admin-service..."
+    log_step "Step 16/19: Publishing referral relationships to mining-admin-service..."
     # Call the contribution-service API to publish all referral relationship events to the outbox
     local referral_publish_url="http://localhost:3020/api/v2/admin/referrals/publish-all"
     local referral_result
@@ -1427,7 +1515,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $referral_publish_url"
     fi
 
-    log_step "Step 16/18: Publishing adoption records to mining-admin-service..."
+    log_step "Step 17/19: Publishing adoption records to mining-admin-service..."
     # Call the contribution-service API to publish all adoption record events to the outbox
     local adoption_publish_url="http://localhost:3020/api/v2/admin/adoptions/publish-all"
     local adoption_result
@@ -1448,7 +1536,7 @@ full_reset() {
     # - Calling publish-all again would cause duplicate records in mining-admin-service
     # - See: contribution-calculation.service.ts -> publishContributionRecordEvents()
 
-    log_step "Step 17/18: Publishing network progress to mining-admin-service..."
+    log_step "Step 18/19: Publishing network progress to mining-admin-service..."
     # Call the contribution-service API to publish network-wide progress events to the outbox
     local progress_publish_url="http://localhost:3020/api/v2/admin/network-progress/publish"
     local progress_result
@@ -1461,7 +1549,7 @@ full_reset() {
         log_info "You may need to manually call: curl -X POST $progress_publish_url"
     fi
 
-    log_step "Step 18/18: Waiting for mining-admin-service to sync all data..."
+    log_step "Step 19/19: Waiting for mining-admin-service to sync all data..."
     # Wait for mining-admin-service to consume the outbox events
     log_info "Waiting 15 seconds for mining-admin-service to sync all data..."
     sleep 15
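+    # Optional manual spot-check (not part of the scripted flow): before trusting the sync,
+    # confirm the re-snapshot actually produced data, for example:
+    #   curl "$DEBEZIUM_CONNECT_URL/connectors/identity-postgres-connector/status"
+    #   docker exec "$KAFKA_CONTAINER" kafka-topics --bootstrap-server localhost:9092 --list | grep '^cdc\.'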