diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 2b2e137e..3cc6237e 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -25,6 +25,7 @@ # CDC & Sync: # ./deploy-mining.sh sync-reset # Reset CDC consumer offsets to beginning # ./deploy-mining.sh sync-status # Show CDC consumer group status +# ./deploy-mining.sh cdc-resnapshot # Force Debezium to re-snapshot (use when Kafka data lost) # # Full Reset (for development/testing): # ./deploy-mining.sh full-reset # Complete reset: stop services, drop DBs, recreate, resync @@ -103,8 +104,13 @@ declare -A SERVICE_PORTS=( ) # CDC Consumer Groups (all groups that need to be reset during full-reset) +# NOTE: contribution-service uses sequential phase consumption with separate consumer groups +# for each table (user_accounts, referral_relationships, planting_orders) CDC_CONSUMER_GROUPS=( "contribution-service-cdc-group" + "contribution-service-cdc-phase-user_accounts" + "contribution-service-cdc-phase-referral_relationships" + "contribution-service-cdc-phase-planting_orders" "auth-service-cdc-group" "mining-admin-service-cdc-group" ) @@ -119,6 +125,14 @@ OUTBOX_CONNECTORS=( "mining-wallet-outbox-connector" ) +# Debezium CDC Postgres Connectors (for 1.0 -> 2.0 data sync) +# These connectors capture changes from 1.0 service databases +CDC_POSTGRES_CONNECTORS=( + "identity-postgres-connector" + "referral-postgres-connector" + "planting-postgres-connector" +) + # Debezium Connect URL (default port 8084 as mapped in docker-compose) DEBEZIUM_CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}" @@ -708,6 +722,111 @@ sync_reset() { log_info "Run: ./deploy-mining.sh up contribution-service && ./deploy-mining.sh up auth-service" } +# Trigger Debezium CDC connectors to re-snapshot +# This is needed when Kafka topic messages are deleted (due to retention or manual cleanup) +# and the connector needs to re-export all data from the source database 
#######################################
# Delete and recreate a single Debezium connector with its current config.
# The config is written to a backup file BEFORE the connector is deleted so
# it can be restored by hand if the recreate step fails.
# Arguments:
#   $1 - Kafka Connect base URL
#   $2 - connector name
#   $3 - existing directory that receives "<connector>.json" backups
# Returns:
#   0 - connector recreated
#   1 - backup, delete or recreate failed
#   2 - connector skipped (not found / config could not be read)
#######################################
_cdc_recreate_connector() {
  local connect_url=$1
  local connector=$2
  local backup_dir=$3
  local config backup_file result

  log_info "Processing connector: $connector"

  # -f makes curl exit non-zero on HTTP errors (e.g. 404 for an unknown
  # connector) instead of handing us an error document as "config".
  if ! config=$(curl -sf "$connect_url/connectors/$connector/config" 2>/dev/null) ||
    [ -z "$config" ] || grep -q "error_code" <<<"$config"; then
    log_warn "Connector $connector not found or error getting config, skipping"
    return 2
  fi

  # Persist the config first: after the DELETE below it exists nowhere else.
  backup_file="$backup_dir/$connector.json"
  if ! printf '%s\n' "$config" >"$backup_file"; then
    log_error "Could not back up config of $connector, leaving connector untouched"
    return 1
  fi

  log_info "Deleting connector: $connector"
  if ! curl -sf -X DELETE "$connect_url/connectors/$connector" &>/dev/null; then
    log_error "Failed to delete connector $connector"
    return 1
  fi

  # Give Connect a moment to finish removing the connector
  sleep 2

  # NOTE(review): the original comment stated that Debezium re-snapshots
  # "due to snapshot.mode=initial". Nothing in this function resets the
  # connector's stored source offsets or replication slot - confirm that
  # recreating under the same name really triggers a new snapshot in this
  # deployment.
  log_info "Recreating connector: $connector"
  # The payload is streamed on stdin (printf is a builtin) so the connector
  # config - which may contain database credentials - never appears in argv.
  result=$(printf '{"name":"%s","config":%s}' "$connector" "$config" |
    curl -s -X POST "$connect_url/connectors" \
      -H "Content-Type: application/json" \
      --data-binary @- 2>/dev/null) || result=""

  if grep -q '"name"' <<<"$result" && ! grep -q '"error_code"' <<<"$result"; then
    log_success "Recreated connector: $connector"
    return 0
  fi

  log_error "Failed to recreate connector $connector: $result"
  log_error "Saved config for $connector is in $backup_file"
  return 1
}

#######################################
# Recreate the CDC Postgres connectors so that they export their source
# tables again (intended for when the CDC Kafka topics lost their messages).
#
# Steps, in order:
#   1. check that Debezium Connect answers, then ask for confirmation
#   2. stop contribution-service and wait for its consumers to disconnect
#   3. delete every consumer group listed in CDC_CONSUMER_GROUPS
#   4. truncate processed_cdc_events in rwa_contribution
#   5. back up, delete and recreate every connector in CDC_POSTGRES_CONNECTORS
#   6. wait, then start contribution-service again
#
# Globals (read):
#   DEBEZIUM_CONNECT_URL, CDC_POSTGRES_CONNECTORS, CDC_CONSUMER_GROUPS,
#   KAFKA_CONTAINER, YELLOW, NC
# Arguments:
#   none (reads the y/n confirmation from stdin)
# Returns:
#   0 when no connector failed; 1 when Connect is unreachable, the user
#   aborts, the backup directory cannot be created, or at least one
#   connector could not be deleted/recreated.
#######################################
cdc_resnapshot() {
  print_section "Triggering CDC Connectors Re-Snapshot"

  local connect_url="$DEBEZIUM_CONNECT_URL"
  local connector group backup_dir running_containers rc
  local confirm=""
  local failed=0

  # Check if Debezium Connect is available (bounded so a dead host cannot hang us)
  if ! curl -s --connect-timeout 5 "$connect_url" &>/dev/null; then
    log_error "Debezium Connect not available at $connect_url"
    return 1
  fi

  echo -e "${YELLOW}WARNING: This will delete and recreate CDC Postgres connectors.${NC}"
  echo -e "${YELLOW}All connectors will re-snapshot their source tables.${NC}"
  echo ""
  echo "Connectors to be re-created:"
  for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
    echo "  - $connector"
  done
  echo ""
  # EOF on stdin leaves confirm empty, which is treated as "no"
  read -r -p "Continue? (y/n): " confirm || true

  if [ "$confirm" != "y" ]; then
    log_warn "Aborted"
    return 1
  fi

  # Private (mode 0700) directory for connector config backups; created
  # before anything destructive happens.
  if ! backup_dir=$(mktemp -d "${TMPDIR:-/tmp}/cdc-resnapshot.XXXXXX"); then
    log_error "Could not create backup directory for connector configs"
    return 1
  fi

  # Stop CDC consumer services first
  log_step "Stopping CDC consumer services..."
  service_stop "contribution-service"

  # Wait for consumer groups to become inactive
  log_info "Waiting 10 seconds for consumers to disconnect..."
  sleep 10

  # Delete consumer groups to ensure fresh consumption.
  # The container check is loop-invariant, so it is done once up front.
  log_step "Deleting consumer groups..."
  running_containers=$(docker ps --format '{{.Names}}' 2>/dev/null) || running_containers=""
  if grep -qxF -- "$KAFKA_CONTAINER" <<<"$running_containers"; then
    for group in "${CDC_CONSUMER_GROUPS[@]}"; do
      log_info "Deleting consumer group: $group"
      if docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
        --delete --group "$group" 2>/dev/null; then
        log_success "Deleted $group"
      else
        log_warn "Could not delete $group"
      fi
    done
  else
    log_warn "Kafka container $KAFKA_CONTAINER is not running, skipping consumer group deletion"
  fi

  # Clear processed_cdc_events table
  log_step "Clearing processed CDC events..."
  if run_psql "rwa_contribution" "TRUNCATE TABLE processed_cdc_events;" 2>/dev/null; then
    log_success "Truncated processed_cdc_events in rwa_contribution"
  else
    log_warn "Could not truncate processed_cdc_events (table may not exist)"
  fi

  # For each CDC Postgres connector: save config, delete, and recreate
  log_step "Re-creating CDC Postgres connectors..."
  for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
    rc=0
    _cdc_recreate_connector "$connect_url" "$connector" "$backup_dir" || rc=$?

    # Skipped connectors (rc=2) are neither a failure nor worth pausing for
    if [ "$rc" -eq 2 ]; then
      continue
    fi
    if [ "$rc" -ne 0 ]; then
      failed=$((failed + 1))
    fi

    # Wait between connectors
    sleep 3
  done

  # Fixed grace period; this does not verify that the snapshots finished
  log_step "Waiting 30 seconds for Debezium snapshots to complete..."
  sleep 30

  # Start services again even after a failure so the consumer is not left down
  log_step "Starting CDC consumer services..."
  service_start "contribution-service"

  if [ "$failed" -gt 0 ]; then
    log_error "CDC re-snapshot finished with $failed failed connector(s); saved configs are in $backup_dir"
    return 1
  fi

  # Backups may contain credentials; drop them once they are no longer needed
  rm -rf -- "${backup_dir:?}"

  log_success "CDC re-snapshot completed!"
  log_info "Monitor sync progress with: ./deploy-mining.sh sync-status"
}