From 6f668d69bdf0f932c0ed684e07e46c0f7b121092 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 2 Feb 2026 10:16:07 -0800 Subject: [PATCH] =?UTF-8?q?fix(infra):=20=E6=B7=BB=E5=8A=A0=20Kafka=20?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=8C=81=E4=B9=85=E5=8C=96=E5=8D=B7=EF=BC=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E5=AE=B9=E5=99=A8=E9=87=8D=E5=BB=BA=E5=90=8E?= =?UTF-8?q?=20Debezium=20connector=20=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根因:Kafka 没有配置数据卷,容器重建后 debezium_configs/offsets 等内部 topic 丢失,导致 connector 注册信息消失。新 snapshot 的 offset 与旧 processed_events 碰撞,CDC 数据被跳过。 - docker-compose.yml: 添加 kafka_data 命名卷挂载到 /var/lib/kafka/data - register-connectors.sh: 添加环境变量替换、幂等注册、--force/--1.0/--2.0 参数 Co-Authored-By: Claude Opus 4.5 --- backend/services/docker-compose.yml | 3 + .../scripts/debezium/register-connectors.sh | 123 ++++++++++++------ 2 files changed, 85 insertions(+), 41 deletions(-) diff --git a/backend/services/docker-compose.yml b/backend/services/docker-compose.yml index f2dc5512..bc403803 100644 --- a/backend/services/docker-compose.yml +++ b/backend/services/docker-compose.yml @@ -110,6 +110,7 @@ services: # Java timezone for confluentinc images KAFKA_OPTS: "-Duser.timezone=Asia/Shanghai" volumes: + - kafka_data:/var/lib/kafka/data - /etc/localtime:/etc/localtime:ro - /etc/timezone:/etc/timezone:ro - /usr/share/zoneinfo:/usr/share/zoneinfo:ro @@ -776,6 +777,8 @@ volumes: driver: local admin_uploads_data: driver: local + kafka_data: + driver: local # =========================================================================== # Networks diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh index 1b65ea47..a64b2911 100644 --- a/backend/services/scripts/debezium/register-connectors.sh +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -2,7 +2,18 @@ # ============================================================================= # Debezium Connector Registration Script # ============================================================================= -# Usage: ./register-connectors.sh +# Usage: ./register-connectors.sh [--force] +# +# Environment variables: +# DEBEZIUM_CONNECT_URL - Debezium Connect REST URL (default: http://localhost:8083) +# POSTGRES_USER - Database user (default: rwa_user) +# POSTGRES_PASSWORD - Database password (default: rwa_secure_password) +# DEBEZIUM_DB_HOST - Database hostname (default: postgres) +# +# Options: +# --force Delete and re-register existing connectors (triggers new snapshot!) +# --1.0 Only register 1.0 system connectors +# --2.0 Only register 2.0 system outbox connectors # # This script registers all PostgreSQL connectors for CDC: # - 1.0 系统: identity, planting, referral, wallet, authorization @@ -11,14 +22,34 @@ set -e -CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}" +CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}" SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" MAX_RETRIES=30 RETRY_INTERVAL=5 +# Credentials for env var substitution in JSON configs +DB_USER="${POSTGRES_USER:-rwa_user}" +DB_PASSWORD="${POSTGRES_PASSWORD:-rwa_secure_password}" +DB_HOST="${DEBEZIUM_DB_HOST:-postgres}" + +# Parse arguments +FORCE=false +REGISTER_1_0=true +REGISTER_2_0=true + +for arg in "$@"; do + case $arg in + --force) FORCE=true ;; + --1.0) REGISTER_2_0=false ;; + --2.0) REGISTER_1_0=false ;; + esac +done + echo "=== Debezium Connector Registration ===" echo "Connect URL: $CONNECT_URL" -echo "Script Dir: $SCRIPT_DIR" +echo "DB Host: $DB_HOST" +echo "DB User: $DB_USER" +echo "Force: $FORCE" # Wait for Debezium Connect to be ready echo "" @@ -42,6 +73,16 @@ echo "Checking existing connectors..." EXISTING=$(curl -s "$CONNECT_URL/connectors") echo "Existing connectors: $EXISTING" +# Substitute environment variables in JSON config +# Replaces ${POSTGRES_USER:-...}, ${POSTGRES_PASSWORD:-...}, ${DEBEZIUM_DB_HOST:-...} +substitute_env_vars() { + local content="$1" + echo "$content" | \ + sed "s|\${POSTGRES_USER:-[^}]*}|$DB_USER|g" | \ + sed "s|\${POSTGRES_PASSWORD:-[^}]*}|$DB_PASSWORD|g" | \ + sed "s|\${DEBEZIUM_DB_HOST:-[^}]*}|$DB_HOST|g" +} + # Function to register a connector from JSON file register_connector() { local json_file="$1" @@ -52,20 +93,30 @@ register_connector() { return fi + # Skip if already exists (unless --force) + if echo "$EXISTING" | grep -q "$connector_name"; then + if [ "$FORCE" = true ]; then + echo "" + echo "Deleting existing $connector_name (--force)..." + curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name" + sleep 2 + else + echo " $connector_name already exists, skipping (use --force to re-register)" + return + fi + fi + echo "" echo "Registering $connector_name..." - # Delete existing connector if exists - if echo "$EXISTING" | grep -q "$connector_name"; then - echo " Deleting existing $connector_name..." - curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name" - sleep 2 - fi + # Read JSON and substitute env vars + local json_content + json_content=$(substitute_env_vars "$(cat "$json_file")") # Register connector RESULT=$(curl -s -X POST \ -H "Content-Type: application/json" \ - -d @"$json_file" \ + -d "$json_content" \ "$CONNECT_URL/connectors") echo " Result: $RESULT" @@ -79,26 +130,30 @@ register_connector() { # ============================================================================= # 1.0 系统 Connectors (监听业务表) # ============================================================================= -echo "" -echo "=== Registering 1.0 System Connectors ===" +if [ "$REGISTER_1_0" = true ]; then + echo "" + echo "=== Registering 1.0 System Connectors ===" -register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector" -register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector" -register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector" -register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector" -register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector" + register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector" + register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector" + register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector" + register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector" + register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector" +fi # ============================================================================= # 2.0 系统 Connectors (监听 outbox_events 表) # ============================================================================= -echo "" -echo "=== Registering 2.0 System Outbox Connectors ===" +if [ "$REGISTER_2_0" = true ]; then + echo "" + echo "=== Registering 2.0 System Outbox Connectors ===" -register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector" -register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector" -register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector" -register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector" -register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector" + register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector" + register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector" + register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector" + register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector" + register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector" +fi # ============================================================================= # Summary @@ -106,22 +161,8 @@ register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wal echo "" echo "=== Registration Complete ===" echo "" -echo "Kafka topics created:" +FINAL=$(curl -s "$CONNECT_URL/connectors") +echo "Active connectors: $FINAL" echo "" -echo "1.0 System (业务表 CDC):" -echo " - cdc.identity.public.user_accounts" -echo " - cdc.planting.public.planting_orders" -echo " - cdc.referral.public.referral_relationships" -echo " - cdc.wallet.public.wallet_accounts" -echo " - cdc.authorization.public.*" -echo "" -echo "2.0 System (Outbox 表 CDC):" -echo " - cdc.auth.outbox" -echo " - cdc.contribution.outbox" -echo " - cdc.mining.outbox" -echo " - cdc.trading.outbox" -echo " - cdc.mining-wallet.outbox" -echo "" -echo "To verify:" -echo " curl $CONNECT_URL/connectors" +echo "To verify status:" echo " curl $CONNECT_URL/connectors//status"