rwadurian/backend/services/scripts/debezium/register-connectors.sh

184 lines
6.6 KiB
Bash
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/bin/bash
# =============================================================================
# Debezium Connector Registration Script
# =============================================================================
# Usage: ./register-connectors.sh [--force]
#
# Environment variables:
# DEBEZIUM_CONNECT_URL - Debezium Connect REST URL (default: http://localhost:8083)
# POSTGRES_USER - Database user (default: rwa_user)
# POSTGRES_PASSWORD - Database password (default: rwa_secure_password)
# DEBEZIUM_DB_HOST - Database hostname (default: postgres)
#
# Options:
# --force Delete and re-register existing connectors (triggers new snapshot!)
# --1.0 Only register 1.0 system connectors
# --2.0 Only register 2.0 system outbox connectors
#
# This script registers all PostgreSQL connectors for CDC:
# - 1.0 系统: identity, planting, referral, wallet, authorization
# - 2.0 系统: auth, contribution, mining, trading, mining-wallet (outbox tables)
# =============================================================================
set -e
CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8084}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
MAX_RETRIES=30
RETRY_INTERVAL=5
# Credentials for env var substitution in JSON configs
DB_USER="${POSTGRES_USER:-rwa_user}"
DB_PASSWORD="${POSTGRES_PASSWORD:-rwa_secure_password}"
DB_HOST="${DEBEZIUM_DB_HOST:-postgres}"
# Parse arguments
FORCE=false
REGISTER_1_0=true
REGISTER_2_0=true
for arg in "$@"; do
case $arg in
--force) FORCE=true ;;
--1.0) REGISTER_2_0=false ;;
--2.0) REGISTER_1_0=false ;;
esac
done
echo "=== Debezium Connector Registration ==="
echo "Connect URL: $CONNECT_URL"
echo "DB Host: $DB_HOST"
echo "DB User: $DB_USER"
echo "Force: $FORCE"
# Wait for Debezium Connect to be ready
echo ""
echo "Waiting for Debezium Connect REST API to be ready..."
for i in $(seq 1 $MAX_RETRIES); do
if curl -sf "$CONNECT_URL/connectors" > /dev/null 2>&1; then
echo "Debezium Connect REST API is ready!"
break
fi
if [ $i -eq $MAX_RETRIES ]; then
echo "ERROR: Debezium Connect REST API is not ready after $MAX_RETRIES attempts"
exit 1
fi
echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..."
sleep $RETRY_INTERVAL
done
# Check existing connectors
echo ""
echo "Checking existing connectors..."
EXISTING=$(curl -sf "$CONNECT_URL/connectors" 2>/dev/null || echo "[]")
echo "Existing connectors: $EXISTING"
# Substitute environment variables in JSON config
# Replaces ${POSTGRES_USER:-...}, ${POSTGRES_PASSWORD:-...}, ${DEBEZIUM_DB_HOST:-...}
substitute_env_vars() {
local content="$1"
echo "$content" | \
sed "s|\${POSTGRES_USER:-[^}]*}|$DB_USER|g" | \
sed "s|\${POSTGRES_PASSWORD:-[^}]*}|$DB_PASSWORD|g" | \
sed "s|\${DEBEZIUM_DB_HOST:-[^}]*}|$DB_HOST|g"
}
# Function to register a connector from JSON file
register_connector() {
local json_file="$1"
local connector_name="$2"
if [ ! -f "$json_file" ]; then
echo "WARNING: $json_file not found, skipping..."
return
fi
# Skip if already exists (unless --force)
if echo "$EXISTING" | grep -q "$connector_name"; then
if [ "$FORCE" = true ]; then
echo ""
echo "Deleting existing $connector_name (--force)..."
curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name"
sleep 2
else
echo " $connector_name already exists, skipping (use --force to re-register)"
return
fi
fi
echo ""
echo "Registering $connector_name..."
# Read JSON and substitute env vars
local json_content
json_content=$(substitute_env_vars "$(cat "$json_file")")
# Register connector (with retry for transient 404s during Kafka Connect initialization)
local attempt
for attempt in 1 2 3; do
RESULT=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "$json_content" \
"$CONNECT_URL/connectors")
if echo "$RESULT" | grep -q '"name"'; then
echo " OK: $connector_name registered"
break
fi
if [ $attempt -lt 3 ]; then
echo " Attempt $attempt failed, retrying in 5s..."
sleep 5
else
echo " FAILED after 3 attempts: $RESULT"
fi
done
# Check status
sleep 2
STATUS=$(curl -s "$CONNECT_URL/connectors/$connector_name/status" 2>/dev/null || echo "Status check failed")
echo " Status: $STATUS"
}
# =============================================================================
# 1.0 系统 Connectors (监听业务表)
# =============================================================================
if [ "$REGISTER_1_0" = true ]; then
echo ""
echo "=== Registering 1.0 System Connectors ==="
register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector"
register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector"
# [2026-02-27] 新增3171预种计划 CDC connector同 rwa_planting 库,独立 slot/publication/topic
register_connector "$SCRIPT_DIR/pre-planting-connector.json" "pre-planting-postgres-connector"
register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector"
register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector"
register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"
fi
# =============================================================================
# 2.0 系统 Connectors (监听 outbox_events 表)
# =============================================================================
if [ "$REGISTER_2_0" = true ]; then
echo ""
echo "=== Registering 2.0 System Outbox Connectors ==="
register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector"
register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector"
register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector"
register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector"
register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector"
fi
# =============================================================================
# Summary
# =============================================================================
echo ""
echo "=== Registration Complete ==="
echo ""
FINAL=$(curl -s "$CONNECT_URL/connectors")
echo "Active connectors: $FINAL"
echo ""
echo "To verify status:"
echo " curl $CONNECT_URL/connectors/<connector-name>/status"