fix(infra): 添加 Kafka 数据持久化卷,防止容器重建后 Debezium connector 丢失
根因:Kafka 没有配置数据卷,容器重建后 debezium_configs/offsets 等内部 topic 丢失,导致 connector 注册信息消失。新 snapshot 的 offset 与旧 processed_events 碰撞,CDC 数据被跳过。 - docker-compose.yml: 添加 kafka_data 命名卷挂载到 /var/lib/kafka/data - register-connectors.sh: 添加环境变量替换、幂等注册、--force/--1.0/--2.0 参数 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
0f3d03d832
commit
6f668d69bd
|
|
@ -110,6 +110,7 @@ services:
|
|||
# Java timezone for confluentinc images
|
||||
KAFKA_OPTS: "-Duser.timezone=Asia/Shanghai"
|
||||
volumes:
|
||||
- kafka_data:/var/lib/kafka/data
|
||||
- /etc/localtime:/etc/localtime:ro
|
||||
- /etc/timezone:/etc/timezone:ro
|
||||
- /usr/share/zoneinfo:/usr/share/zoneinfo:ro
|
||||
|
|
@ -776,6 +777,8 @@ volumes:
|
|||
driver: local
|
||||
admin_uploads_data:
|
||||
driver: local
|
||||
kafka_data:
|
||||
driver: local
|
||||
|
||||
# ===========================================================================
|
||||
# Networks
|
||||
|
|
|
|||
|
|
@ -2,7 +2,18 @@
|
|||
# =============================================================================
|
||||
# Debezium Connector Registration Script
|
||||
# =============================================================================
|
||||
# Usage: ./register-connectors.sh
|
||||
# Usage: ./register-connectors.sh [--force]
|
||||
#
|
||||
# Environment variables:
|
||||
# DEBEZIUM_CONNECT_URL - Debezium Connect REST URL (default: http://localhost:8083)
|
||||
# POSTGRES_USER - Database user (default: rwa_user)
|
||||
# POSTGRES_PASSWORD - Database password (default: rwa_secure_password)
|
||||
# DEBEZIUM_DB_HOST - Database hostname (default: postgres)
|
||||
#
|
||||
# Options:
|
||||
# --force Delete and re-register existing connectors (triggers new snapshot!)
|
||||
# --1.0 Only register 1.0 system connectors
|
||||
# --2.0 Only register 2.0 system outbox connectors
|
||||
#
|
||||
# This script registers all PostgreSQL connectors for CDC:
|
||||
# - 1.0 系统: identity, planting, referral, wallet, authorization
|
||||
|
|
@ -11,14 +22,34 @@
|
|||
|
||||
set -e
|
||||
|
||||
CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}"
|
||||
CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}"
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
MAX_RETRIES=30
|
||||
RETRY_INTERVAL=5
|
||||
|
||||
# Credentials for env var substitution in JSON configs
|
||||
DB_USER="${POSTGRES_USER:-rwa_user}"
|
||||
DB_PASSWORD="${POSTGRES_PASSWORD:-rwa_secure_password}"
|
||||
DB_HOST="${DEBEZIUM_DB_HOST:-postgres}"
|
||||
|
||||
# Parse arguments
|
||||
FORCE=false
|
||||
REGISTER_1_0=true
|
||||
REGISTER_2_0=true
|
||||
|
||||
for arg in "$@"; do
|
||||
case $arg in
|
||||
--force) FORCE=true ;;
|
||||
--1.0) REGISTER_2_0=false ;;
|
||||
--2.0) REGISTER_1_0=false ;;
|
||||
esac
|
||||
done
|
||||
|
||||
echo "=== Debezium Connector Registration ==="
|
||||
echo "Connect URL: $CONNECT_URL"
|
||||
echo "Script Dir: $SCRIPT_DIR"
|
||||
echo "DB Host: $DB_HOST"
|
||||
echo "DB User: $DB_USER"
|
||||
echo "Force: $FORCE"
|
||||
|
||||
# Wait for Debezium Connect to be ready
|
||||
echo ""
|
||||
|
|
@ -42,6 +73,16 @@ echo "Checking existing connectors..."
|
|||
EXISTING=$(curl -s "$CONNECT_URL/connectors")
|
||||
echo "Existing connectors: $EXISTING"
|
||||
|
||||
# Substitute environment variables in JSON config
|
||||
# Replaces ${POSTGRES_USER:-...}, ${POSTGRES_PASSWORD:-...}, ${DEBEZIUM_DB_HOST:-...}
|
||||
substitute_env_vars() {
|
||||
local content="$1"
|
||||
echo "$content" | \
|
||||
sed "s|\${POSTGRES_USER:-[^}]*}|$DB_USER|g" | \
|
||||
sed "s|\${POSTGRES_PASSWORD:-[^}]*}|$DB_PASSWORD|g" | \
|
||||
sed "s|\${DEBEZIUM_DB_HOST:-[^}]*}|$DB_HOST|g"
|
||||
}
|
||||
|
||||
# Function to register a connector from JSON file
|
||||
register_connector() {
|
||||
local json_file="$1"
|
||||
|
|
@ -52,20 +93,30 @@ register_connector() {
|
|||
return
|
||||
fi
|
||||
|
||||
# Skip if already exists (unless --force)
|
||||
if echo "$EXISTING" | grep -q "$connector_name"; then
|
||||
if [ "$FORCE" = true ]; then
|
||||
echo ""
|
||||
echo "Deleting existing $connector_name (--force)..."
|
||||
curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name"
|
||||
sleep 2
|
||||
else
|
||||
echo " $connector_name already exists, skipping (use --force to re-register)"
|
||||
return
|
||||
fi
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Registering $connector_name..."
|
||||
|
||||
# Delete existing connector if exists
|
||||
if echo "$EXISTING" | grep -q "$connector_name"; then
|
||||
echo " Deleting existing $connector_name..."
|
||||
curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name"
|
||||
sleep 2
|
||||
fi
|
||||
# Read JSON and substitute env vars
|
||||
local json_content
|
||||
json_content=$(substitute_env_vars "$(cat "$json_file")")
|
||||
|
||||
# Register connector
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d @"$json_file" \
|
||||
-d "$json_content" \
|
||||
"$CONNECT_URL/connectors")
|
||||
|
||||
echo " Result: $RESULT"
|
||||
|
|
@ -79,26 +130,30 @@ register_connector() {
|
|||
# =============================================================================
|
||||
# 1.0 系统 Connectors (监听业务表)
|
||||
# =============================================================================
|
||||
echo ""
|
||||
echo "=== Registering 1.0 System Connectors ==="
|
||||
if [ "$REGISTER_1_0" = true ]; then
|
||||
echo ""
|
||||
echo "=== Registering 1.0 System Connectors ==="
|
||||
|
||||
register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector"
|
||||
register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"
|
||||
fi
|
||||
|
||||
# =============================================================================
|
||||
# 2.0 系统 Connectors (监听 outbox_events 表)
|
||||
# =============================================================================
|
||||
echo ""
|
||||
echo "=== Registering 2.0 System Outbox Connectors ==="
|
||||
if [ "$REGISTER_2_0" = true ]; then
|
||||
echo ""
|
||||
echo "=== Registering 2.0 System Outbox Connectors ==="
|
||||
|
||||
register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector"
|
||||
register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector"
|
||||
fi
|
||||
|
||||
# =============================================================================
|
||||
# Summary
|
||||
|
|
@ -106,22 +161,8 @@ register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wal
|
|||
echo ""
|
||||
echo "=== Registration Complete ==="
|
||||
echo ""
|
||||
echo "Kafka topics created:"
|
||||
FINAL=$(curl -s "$CONNECT_URL/connectors")
|
||||
echo "Active connectors: $FINAL"
|
||||
echo ""
|
||||
echo "1.0 System (业务表 CDC):"
|
||||
echo " - cdc.identity.public.user_accounts"
|
||||
echo " - cdc.planting.public.planting_orders"
|
||||
echo " - cdc.referral.public.referral_relationships"
|
||||
echo " - cdc.wallet.public.wallet_accounts"
|
||||
echo " - cdc.authorization.public.*"
|
||||
echo ""
|
||||
echo "2.0 System (Outbox 表 CDC):"
|
||||
echo " - cdc.auth.outbox"
|
||||
echo " - cdc.contribution.outbox"
|
||||
echo " - cdc.mining.outbox"
|
||||
echo " - cdc.trading.outbox"
|
||||
echo " - cdc.mining-wallet.outbox"
|
||||
echo ""
|
||||
echo "To verify:"
|
||||
echo " curl $CONNECT_URL/connectors"
|
||||
echo "To verify status:"
|
||||
echo " curl $CONNECT_URL/connectors/<connector-name>/status"
|
||||
|
|
|
|||
Loading…
Reference in New Issue