#!/bin/bash
# =============================================================================
# Debezium Connector Registration Script
# =============================================================================
# Usage: ./register-connectors.sh [--force] [--1.0] [--2.0]
#
# Environment variables:
#   DEBEZIUM_CONNECT_URL  - Debezium Connect REST URL (default: http://localhost:8084)
#   POSTGRES_USER         - Database user (default: rwa_user)
#   POSTGRES_PASSWORD     - Database password (default: rwa_secure_password)
#   DEBEZIUM_DB_HOST      - Database hostname (default: postgres)
#
# Options:
#   --force   Delete and re-register existing connectors (triggers new snapshot!)
#   --1.0     Only register 1.0 system connectors
#   --2.0     Only register 2.0 system outbox connectors
#
# This script registers all PostgreSQL connectors for CDC:
#   - 1.0 system: identity, planting, referral, wallet, authorization
#   - 2.0 system: auth, contribution, mining, trading, mining-wallet (outbox tables)
#
# Exit status:
#   0  no registration attempt failed (connectors that already exist or whose
#      JSON file is missing are skipped and do not count as failures)
#   1  Debezium Connect never became ready, or at least one registration failed
# =============================================================================

set -euo pipefail

readonly CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
readonly SCRIPT_DIR
readonly MAX_RETRIES=30
readonly RETRY_INTERVAL=5

# Credentials for env var substitution in JSON configs
readonly DB_USER="${POSTGRES_USER:-rwa_user}"
readonly DB_PASSWORD="${POSTGRES_PASSWORD:-rwa_secure_password}"
readonly DB_HOST="${DEBEZIUM_DB_HOST:-postgres}"

# Number of connectors whose registration failed; decides the exit status.
FAILED_COUNT=0

# Parse arguments
FORCE=false
REGISTER_1_0=true
REGISTER_2_0=true

for arg in "$@"; do
  case "$arg" in
    --force) FORCE=true ;;
    --1.0) REGISTER_2_0=false ;;
    --2.0) REGISTER_1_0=false ;;
    # Unknown options were silently ignored before; keep going but say so.
    *) echo "WARNING: ignoring unknown option: $arg" >&2 ;;
  esac
done

#######################################
# Escape a value so it can be used verbatim as the replacement part of a
# sed "s|...|...|" command (backslash, ampersand and the '|' delimiter are
# special there).
# Arguments: $1 - raw value
# Outputs:   escaped value on stdout
#######################################
escape_sed_replacement() {
  printf '%s' "$1" | sed -e 's/[\\&|]/\\&/g'
}

#######################################
# Substitute environment variable placeholders in a JSON config.
# Replaces ${POSTGRES_USER:-...}, ${POSTGRES_PASSWORD:-...} and
# ${DEBEZIUM_DB_HOST:-...} with DB_USER, DB_PASSWORD and DB_HOST.
# The values are inserted verbatim: no JSON escaping is applied.
# Globals:   DB_USER, DB_PASSWORD, DB_HOST (read)
# Arguments: $1 - config text containing the placeholders
# Outputs:   substituted text on stdout
#######################################
substitute_env_vars() {
  local content="$1"
  local user password host
  user=$(escape_sed_replacement "$DB_USER")
  password=$(escape_sed_replacement "$DB_PASSWORD")
  host=$(escape_sed_replacement "$DB_HOST")

  printf '%s\n' "$content" \
    | sed \
      -e "s|\${POSTGRES_USER:-[^}]*}|${user}|g" \
      -e "s|\${POSTGRES_PASSWORD:-[^}]*}|${password}|g" \
      -e "s|\${DEBEZIUM_DB_HOST:-[^}]*}|${host}|g"
}

#######################################
# Register one connector from a JSON file via the Connect REST API.
# Globals:   CONNECT_URL, EXISTING, FORCE (read); FAILED_COUNT (written)
# Arguments: $1 - path to the connector JSON config
#            $2 - connector name as known to Debezium Connect
# Outputs:   progress messages on stdout, warnings/failures on stderr
# Returns:   0 always; failed registrations are counted in FAILED_COUNT so
#            the remaining connectors are still attempted
#######################################
register_connector() {
  local json_file="$1"
  local connector_name="$2"

  if [[ ! -f "$json_file" ]]; then
    echo "WARNING: $json_file not found, skipping..." >&2
    return 0
  fi

  # Skip if already exists (unless --force). Match the quoted name as a
  # fixed string so one connector name cannot match inside another.
  if grep -qF -- "\"$connector_name\"" <<<"$EXISTING"; then
    if [[ "$FORCE" == true ]]; then
      echo ""
      echo "Deleting existing $connector_name (--force)..."
      curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name"
      sleep 2
    else
      echo " $connector_name already exists, skipping (use --force to re-register)"
      return 0
    fi
  fi

  echo ""
  echo "Registering $connector_name..."

  # Read JSON and substitute env vars
  local json_content
  json_content=$(substitute_env_vars "$(<"$json_file")")

  # Register connector (with retry for transient 404s during Kafka Connect
  # initialization). The body goes through stdin so the database password
  # does not show up in curl's argument list. curl transport errors are
  # captured into $result instead of aborting the script under `set -e`,
  # so they are retried as well.
  local attempt
  local result=""
  local registered=false
  for attempt in 1 2 3; do
    result=$(
      printf '%s' "$json_content" \
        | curl -sS -X POST \
          -H "Content-Type: application/json" \
          --data-binary @- \
          "$CONNECT_URL/connectors" 2>&1
    ) || true

    if grep -q '"name"' <<<"$result"; then
      echo " OK: $connector_name registered"
      registered=true
      break
    fi

    if ((attempt < 3)); then
      echo " Attempt $attempt failed, retrying in 5s..."
      sleep 5
    else
      echo " FAILED after 3 attempts: $result" >&2
    fi
  done

  if [[ "$registered" != true ]]; then
    FAILED_COUNT=$((FAILED_COUNT + 1))
  fi

  # Check status
  sleep 2
  local status
  status=$(curl -s "$CONNECT_URL/connectors/$connector_name/status" 2>/dev/null \
    || echo "Status check failed")
  echo " Status: $status"
}

echo "=== Debezium Connector Registration ==="
echo "Connect URL: $CONNECT_URL"
echo "DB Host: $DB_HOST"
echo "DB User: $DB_USER"
echo "Force: $FORCE"

# Wait for Debezium Connect to be ready
echo ""
echo "Waiting for Debezium Connect REST API to be ready..."
for ((i = 1; i <= MAX_RETRIES; i++)); do
  if curl -sf "$CONNECT_URL/connectors" >/dev/null 2>&1; then
    echo "Debezium Connect REST API is ready!"
    break
  fi
  if ((i == MAX_RETRIES)); then
    echo "ERROR: Debezium Connect REST API is not ready after $MAX_RETRIES attempts" >&2
    exit 1
  fi
  echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..."
  sleep "$RETRY_INTERVAL"
done

# Check existing connectors
echo ""
echo "Checking existing connectors..."
EXISTING=$(curl -sf "$CONNECT_URL/connectors" 2>/dev/null || echo "[]")
echo "Existing connectors: $EXISTING"

# =============================================================================
# 1.0 System Connectors (listening on business tables)
# =============================================================================
if [[ "$REGISTER_1_0" == true ]]; then
  echo ""
  echo "=== Registering 1.0 System Connectors ==="

  register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector"
  register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector"
  register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector"
  register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector"
  register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"
fi

# =============================================================================
# 2.0 System Connectors (listening on outbox_events tables)
# =============================================================================
if [[ "$REGISTER_2_0" == true ]]; then
  echo ""
  echo "=== Registering 2.0 System Outbox Connectors ==="

  register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector"
  register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector"
  register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector"
  register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector"
  register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector"
fi

# =============================================================================
# Summary
# =============================================================================
echo ""
echo "=== Registration Complete ==="
echo ""
# Listing is informational only; do not let a curl error abort the summary.
FINAL=$(curl -s "$CONNECT_URL/connectors" || true)
echo "Active connectors: $FINAL"
echo ""
echo "To verify status:"
echo " curl $CONNECT_URL/connectors/<connector-name>/status"

if ((FAILED_COUNT > 0)); then
  echo "ERROR: $FAILED_COUNT connector registration(s) failed" >&2
  exit 1
fi