fix(deploy): 修复 deploy.sh Debezium connector 注册失败的问题
根因:就绪检查只检测 Jetty 根路径 /,但 Kafka Connect REST API (/connectors) 需要更长时间初始化,导致注册时返回 404。 - deploy.sh: 就绪检查改为 /connectors,等待时间从 60s 增加到 120s - deploy.sh: 删除 250 行硬编码的 connector 配置,改为调用 register-connectors.sh(单一数据源,JSON 文件) - register-connectors.sh: 同步修复就绪检查 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
6f668d69bd
commit
d575287713
|
|
@ -201,20 +201,21 @@ up() {
|
|||
log_info "Starting Debezium Connect..."
|
||||
docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d debezium-connect
|
||||
|
||||
# Wait for Debezium Connect to be ready
|
||||
log_info "Waiting for Debezium Connect to be ready..."
|
||||
for i in {1..30}; do
|
||||
if curl -s http://localhost:8084/ > /dev/null 2>&1; then
|
||||
log_info "Debezium Connect is ready!"
|
||||
# Wait for Debezium Connect REST API to be fully ready
|
||||
# Note: check /connectors not / — Jetty responds on / before Kafka Connect REST is initialized
|
||||
log_info "Waiting for Debezium Connect REST API to be ready..."
|
||||
for i in {1..60}; do
|
||||
if curl -s http://localhost:8084/connectors > /dev/null 2>&1; then
|
||||
log_info "Debezium Connect REST API is ready!"
|
||||
break
|
||||
fi
|
||||
if [ $i -eq 30 ]; then
|
||||
log_warn "Debezium Connect not ready after 60s, continuing anyway..."
|
||||
if [ $i -eq 60 ]; then
|
||||
log_warn "Debezium Connect REST API not ready after 120s, continuing anyway..."
|
||||
fi
|
||||
sleep 2
|
||||
done
|
||||
|
||||
# Register Debezium connectors
|
||||
# Register Debezium connectors (using register-connectors.sh with JSON configs)
|
||||
register_debezium_connectors
|
||||
|
||||
# Start application services
|
||||
|
|
@ -228,258 +229,19 @@ up() {
|
|||
}
|
||||
|
||||
register_debezium_connectors() {
|
||||
log_info "Registering Debezium connectors..."
|
||||
local register_script="$SCRIPT_DIR/scripts/debezium/register-connectors.sh"
|
||||
|
||||
# Check existing connectors
|
||||
EXISTING=$(curl -s http://localhost:8084/connectors 2>/dev/null || echo "[]")
|
||||
if [ ! -f "$register_script" ]; then
|
||||
log_warn "register-connectors.sh not found at $register_script, skipping..."
|
||||
return
|
||||
fi
|
||||
|
||||
# Read database credentials from .env
|
||||
# Source .env for credentials, then call register script (only 1.0 connectors on this server)
|
||||
source "$ENV_FILE"
|
||||
|
||||
# Register identity-postgres-connector
|
||||
if echo "$EXISTING" | grep -q "identity-postgres-connector"; then
|
||||
log_info "identity-postgres-connector already registered"
|
||||
else
|
||||
log_info "Registering identity-postgres-connector..."
|
||||
IDENTITY_CONFIG='{
|
||||
"name": "identity-postgres-connector",
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": "postgres",
|
||||
"database.port": "5432",
|
||||
"database.user": "'${POSTGRES_USER:-rwa_user}'",
|
||||
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
|
||||
"database.dbname": "rwa_identity",
|
||||
"topic.prefix": "cdc.identity",
|
||||
"table.include.list": "public.user_accounts",
|
||||
"plugin.name": "pgoutput",
|
||||
"publication.name": "debezium_identity_publication",
|
||||
"publication.autocreate.mode": "filtered",
|
||||
"slot.name": "debezium_identity_slot",
|
||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"key.converter.schemas.enable": "false",
|
||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"transforms": "unwrap",
|
||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||
"transforms.unwrap.drop.tombstones": "true",
|
||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||
"heartbeat.interval.ms": "10000",
|
||||
"snapshot.mode": "initial",
|
||||
"decimal.handling.mode": "string",
|
||||
"time.precision.mode": "connect"
|
||||
}
|
||||
}'
|
||||
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$IDENTITY_CONFIG" \
|
||||
"http://localhost:8084/connectors" 2>/dev/null || echo "failed")
|
||||
|
||||
if echo "$RESULT" | grep -q "identity-postgres-connector"; then
|
||||
log_info "identity-postgres-connector registered successfully"
|
||||
else
|
||||
log_warn "Failed to register identity connector: $RESULT"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Register referral-postgres-connector
|
||||
if echo "$EXISTING" | grep -q "referral-postgres-connector"; then
|
||||
log_info "referral-postgres-connector already registered"
|
||||
else
|
||||
log_info "Registering referral-postgres-connector..."
|
||||
REFERRAL_CONFIG='{
|
||||
"name": "referral-postgres-connector",
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": "postgres",
|
||||
"database.port": "5432",
|
||||
"database.user": "'${POSTGRES_USER:-rwa_user}'",
|
||||
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
|
||||
"database.dbname": "rwa_referral",
|
||||
"topic.prefix": "cdc.referral",
|
||||
"table.include.list": "public.referral_relationships",
|
||||
"plugin.name": "pgoutput",
|
||||
"publication.name": "debezium_referral_publication",
|
||||
"publication.autocreate.mode": "filtered",
|
||||
"slot.name": "debezium_referral_slot",
|
||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"key.converter.schemas.enable": "false",
|
||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"transforms": "unwrap",
|
||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||
"transforms.unwrap.drop.tombstones": "true",
|
||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||
"heartbeat.interval.ms": "10000",
|
||||
"snapshot.mode": "initial",
|
||||
"decimal.handling.mode": "string",
|
||||
"time.precision.mode": "connect"
|
||||
}
|
||||
}'
|
||||
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$REFERRAL_CONFIG" \
|
||||
"http://localhost:8084/connectors" 2>/dev/null || echo "failed")
|
||||
|
||||
if echo "$RESULT" | grep -q "referral-postgres-connector"; then
|
||||
log_info "referral-postgres-connector registered successfully"
|
||||
else
|
||||
log_warn "Failed to register referral connector: $RESULT"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Register wallet-postgres-connector
|
||||
if echo "$EXISTING" | grep -q "wallet-postgres-connector"; then
|
||||
log_info "wallet-postgres-connector already registered"
|
||||
else
|
||||
log_info "Registering wallet-postgres-connector..."
|
||||
WALLET_CONFIG='{
|
||||
"name": "wallet-postgres-connector",
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": "postgres",
|
||||
"database.port": "5432",
|
||||
"database.user": "'${POSTGRES_USER:-rwa_user}'",
|
||||
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
|
||||
"database.dbname": "rwa_wallet",
|
||||
"topic.prefix": "cdc.wallet",
|
||||
"table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries",
|
||||
"plugin.name": "pgoutput",
|
||||
"publication.name": "debezium_wallet_publication",
|
||||
"publication.autocreate.mode": "filtered",
|
||||
"slot.name": "debezium_wallet_slot",
|
||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"key.converter.schemas.enable": "false",
|
||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"transforms": "unwrap",
|
||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||
"transforms.unwrap.drop.tombstones": "true",
|
||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||
"heartbeat.interval.ms": "10000",
|
||||
"snapshot.mode": "initial",
|
||||
"decimal.handling.mode": "string",
|
||||
"time.precision.mode": "connect"
|
||||
}
|
||||
}'
|
||||
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$WALLET_CONFIG" \
|
||||
"http://localhost:8084/connectors" 2>/dev/null || echo "failed")
|
||||
|
||||
if echo "$RESULT" | grep -q "wallet-postgres-connector"; then
|
||||
log_info "wallet-postgres-connector registered successfully"
|
||||
else
|
||||
log_warn "Failed to register wallet connector: $RESULT"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Register planting-postgres-connector
|
||||
if echo "$EXISTING" | grep -q "planting-postgres-connector"; then
|
||||
log_info "planting-postgres-connector already registered"
|
||||
else
|
||||
log_info "Registering planting-postgres-connector..."
|
||||
PLANTING_CONFIG='{
|
||||
"name": "planting-postgres-connector",
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": "postgres",
|
||||
"database.port": "5432",
|
||||
"database.user": "'${POSTGRES_USER:-rwa_user}'",
|
||||
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
|
||||
"database.dbname": "rwa_planting",
|
||||
"topic.prefix": "cdc.planting",
|
||||
"table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations",
|
||||
"plugin.name": "pgoutput",
|
||||
"publication.name": "debezium_planting_publication",
|
||||
"publication.autocreate.mode": "filtered",
|
||||
"slot.name": "debezium_planting_slot",
|
||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"key.converter.schemas.enable": "false",
|
||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"transforms": "unwrap",
|
||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||
"transforms.unwrap.drop.tombstones": "true",
|
||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||
"heartbeat.interval.ms": "10000",
|
||||
"snapshot.mode": "initial",
|
||||
"decimal.handling.mode": "string",
|
||||
"time.precision.mode": "connect"
|
||||
}
|
||||
}'
|
||||
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$PLANTING_CONFIG" \
|
||||
"http://localhost:8084/connectors" 2>/dev/null || echo "failed")
|
||||
|
||||
if echo "$RESULT" | grep -q "planting-postgres-connector"; then
|
||||
log_info "planting-postgres-connector registered successfully"
|
||||
else
|
||||
log_warn "Failed to register planting connector: $RESULT"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Register authorization-postgres-connector
|
||||
if echo "$EXISTING" | grep -q "authorization-postgres-connector"; then
|
||||
log_info "authorization-postgres-connector already registered"
|
||||
else
|
||||
log_info "Registering authorization-postgres-connector..."
|
||||
AUTHORIZATION_CONFIG='{
|
||||
"name": "authorization-postgres-connector",
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": "postgres",
|
||||
"database.port": "5432",
|
||||
"database.user": "'${POSTGRES_USER:-rwa_user}'",
|
||||
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
|
||||
"database.dbname": "rwa_authorization",
|
||||
"topic.prefix": "cdc.authorization",
|
||||
"table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers",
|
||||
"plugin.name": "pgoutput",
|
||||
"publication.name": "debezium_authorization_publication",
|
||||
"publication.autocreate.mode": "filtered",
|
||||
"slot.name": "debezium_authorization_slot",
|
||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"key.converter.schemas.enable": "false",
|
||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||
"value.converter.schemas.enable": "false",
|
||||
"transforms": "unwrap",
|
||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||
"transforms.unwrap.drop.tombstones": "true",
|
||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||
"heartbeat.interval.ms": "10000",
|
||||
"snapshot.mode": "initial",
|
||||
"decimal.handling.mode": "string",
|
||||
"time.precision.mode": "connect"
|
||||
}
|
||||
}'
|
||||
|
||||
RESULT=$(curl -s -X POST \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$AUTHORIZATION_CONFIG" \
|
||||
"http://localhost:8084/connectors" 2>/dev/null || echo "failed")
|
||||
|
||||
if echo "$RESULT" | grep -q "authorization-postgres-connector"; then
|
||||
log_info "authorization-postgres-connector registered successfully"
|
||||
else
|
||||
log_warn "Failed to register authorization connector: $RESULT"
|
||||
fi
|
||||
fi
|
||||
POSTGRES_PASSWORD="$POSTGRES_PASSWORD" \
|
||||
POSTGRES_USER="${POSTGRES_USER:-rwa_user}" \
|
||||
DEBEZIUM_CONNECT_URL="http://localhost:8084" \
|
||||
bash "$register_script" --1.0
|
||||
}
|
||||
|
||||
down() {
|
||||
|
|
|
|||
|
|
@ -53,14 +53,14 @@ echo "Force: $FORCE"
|
|||
|
||||
# Wait for Debezium Connect to be ready
|
||||
echo ""
|
||||
echo "Waiting for Debezium Connect to be ready..."
|
||||
echo "Waiting for Debezium Connect REST API to be ready..."
|
||||
for i in $(seq 1 $MAX_RETRIES); do
|
||||
if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then
|
||||
echo "Debezium Connect is ready!"
|
||||
if curl -s "$CONNECT_URL/connectors" > /dev/null 2>&1; then
|
||||
echo "Debezium Connect REST API is ready!"
|
||||
break
|
||||
fi
|
||||
if [ $i -eq $MAX_RETRIES ]; then
|
||||
echo "ERROR: Debezium Connect is not ready after $MAX_RETRIES attempts"
|
||||
echo "ERROR: Debezium Connect REST API is not ready after $MAX_RETRIES attempts"
|
||||
exit 1
|
||||
fi
|
||||
echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..."
|
||||
|
|
|
|||
Loading…
Reference in New Issue