fix(deploy): 修复 CDC 全量同步问题

问题:
- CDC_CONSUMER_GROUPS 缺少阶段性消费者组,导致 full-reset 时
  未重置 contribution-service-cdc-phase-* 消费者组
- 当 Kafka topic 数据丢失时,无法触发 Debezium 重新快照

修复:
- 添加阶段性消费者组到 CDC_CONSUMER_GROUPS
- 添加 CDC_POSTGRES_CONNECTORS 列表
- 新增 cdc-resnapshot 命令,用于强制 Debezium 重新快照

使用方法:
- ./deploy-mining.sh full-reset      # 完整重置
- ./deploy-mining.sh cdc-resnapshot  # Kafka 数据丢失时触发重新快照

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-20 07:38:21 -08:00
parent 5668de0a58
commit 60f2c29ad8
1 changed files with 124 additions and 0 deletions

View File

@ -25,6 +25,7 @@
# CDC & Sync:
# ./deploy-mining.sh sync-reset # Reset CDC consumer offsets to beginning
# ./deploy-mining.sh sync-status # Show CDC consumer group status
# ./deploy-mining.sh cdc-resnapshot # Force Debezium to re-snapshot (use when Kafka data lost)
#
# Full Reset (for development/testing):
# ./deploy-mining.sh full-reset # Complete reset: stop services, drop DBs, recreate, resync
@ -103,8 +104,13 @@ declare -A SERVICE_PORTS=(
)
# CDC Consumer Groups (all groups that need to be reset during full-reset)
# NOTE: contribution-service uses sequential phase consumption with separate consumer groups
# for each table (user_accounts, referral_relationships, planting_orders)
CDC_CONSUMER_GROUPS=(
"contribution-service-cdc-group"
"contribution-service-cdc-phase-user_accounts"
"contribution-service-cdc-phase-referral_relationships"
"contribution-service-cdc-phase-planting_orders"
"auth-service-cdc-group"
"mining-admin-service-cdc-group"
)
@ -119,6 +125,14 @@ OUTBOX_CONNECTORS=(
"mining-wallet-outbox-connector"
)
# Debezium CDC Postgres Connectors (for 1.0 -> 2.0 data sync)
# These connectors capture changes from 1.0 service databases
CDC_POSTGRES_CONNECTORS=(
"identity-postgres-connector"
"referral-postgres-connector"
"planting-postgres-connector"
)
# Debezium Connect URL (default port 8084 as mapped in docker-compose)
DEBEZIUM_CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}"
@ -708,6 +722,111 @@ sync_reset() {
log_info "Run: ./deploy-mining.sh up contribution-service && ./deploy-mining.sh up auth-service"
}
# Trigger Debezium CDC connectors to re-snapshot
# This is needed when Kafka topic messages are deleted (due to retention or manual cleanup)
# and the connector needs to re-export all data from the source database
cdc_resnapshot() {
print_section "Triggering CDC Connectors Re-Snapshot"
local connect_url="$DEBEZIUM_CONNECT_URL"
# Check if Debezium Connect is available
if ! curl -s "$connect_url" &>/dev/null; then
log_error "Debezium Connect not available at $connect_url"
return 1
fi
echo -e "${YELLOW}WARNING: This will delete and recreate CDC Postgres connectors.${NC}"
echo -e "${YELLOW}All connectors will re-snapshot their source tables.${NC}"
echo ""
echo "Connectors to be re-created:"
for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
echo " - $connector"
done
echo ""
read -p "Continue? (y/n): " confirm
if [ "$confirm" != "y" ]; then
log_warn "Aborted"
return 1
fi
# Stop CDC consumer services first
log_step "Stopping CDC consumer services..."
service_stop "contribution-service"
# Wait for consumer groups to become inactive
log_info "Waiting 10 seconds for consumers to disconnect..."
sleep 10
# Delete consumer groups to ensure fresh consumption
log_step "Deleting consumer groups..."
for group in "${CDC_CONSUMER_GROUPS[@]}"; do
log_info "Deleting consumer group: $group"
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${KAFKA_CONTAINER}$"; then
docker exec "$KAFKA_CONTAINER" kafka-consumer-groups --bootstrap-server localhost:9092 \
--delete --group "$group" 2>/dev/null && log_success "Deleted $group" || log_warn "Could not delete $group"
fi
done
# Clear processed_cdc_events table
log_step "Clearing processed CDC events..."
if run_psql "rwa_contribution" "TRUNCATE TABLE processed_cdc_events;" 2>/dev/null; then
log_success "Truncated processed_cdc_events in rwa_contribution"
else
log_warn "Could not truncate processed_cdc_events (table may not exist)"
fi
# For each CDC Postgres connector, save config, delete, and recreate
log_step "Re-creating CDC Postgres connectors..."
for connector in "${CDC_POSTGRES_CONNECTORS[@]}"; do
log_info "Processing connector: $connector"
# Get current config
local config
config=$(curl -s "$connect_url/connectors/$connector/config" 2>/dev/null)
if [ -z "$config" ] || echo "$config" | grep -q "error_code"; then
log_warn "Connector $connector not found or error getting config, skipping"
continue
fi
# Delete connector
log_info "Deleting connector: $connector"
curl -s -X DELETE "$connect_url/connectors/$connector" &>/dev/null
# Wait for deletion
sleep 2
# Recreate with the same config (Debezium will re-snapshot due to snapshot.mode=initial)
log_info "Recreating connector: $connector"
local result
result=$(curl -s -X POST "$connect_url/connectors" \
-H "Content-Type: application/json" \
-d "{\"name\":\"$connector\",\"config\":$config}" 2>/dev/null)
if echo "$result" | grep -q '"name"'; then
log_success "Recreated connector: $connector"
else
log_error "Failed to recreate connector $connector: $result"
fi
# Wait between connectors
sleep 3
done
# Wait for snapshots to complete
log_step "Waiting 30 seconds for Debezium snapshots to complete..."
sleep 30
# Start services
log_step "Starting CDC consumer services..."
service_start "contribution-service"
log_success "CDC re-snapshot completed!"
log_info "Monitor sync progress with: ./deploy-mining.sh sync-status"
}
sync_status() {
print_section "CDC Sync Status"
@ -1365,6 +1484,7 @@ show_help() {
echo -e "${BOLD}CDC / Sync Management:${NC}"
echo " sync-reset Reset CDC consumer to read from beginning"
echo " sync-status Show CDC consumer group status"
echo " cdc-resnapshot Force Debezium CDC connectors to re-snapshot ${YELLOW}(use when Kafka data lost)${NC}"
echo " outbox-register Register all Debezium outbox connectors"
echo " outbox-status Show outbox connector status"
echo " outbox-delete Delete all outbox connectors"
@ -1467,6 +1587,10 @@ main() {
sync-status)
sync_status
;;
cdc-resnapshot)
print_header
cdc_resnapshot
;;
# Outbox connector commands
outbox-register)