fix(debezium): 修复 authorization connector 凭据和注册重试逻辑
- authorization-connector.json: 将硬编码的 debezium/debezium_password
改为 ${POSTGRES_USER}/${POSTGRES_PASSWORD} 占位符,与其他 connector 一致
- register-connectors.sh: 添加 3 次重试逻辑,应对 Kafka Connect REST API
初始化期间的间歇性 404
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
5b3c391340
commit
d8ef156b5e
|
|
@ -2,26 +2,39 @@
|
||||||
"name": "authorization-postgres-connector",
|
"name": "authorization-postgres-connector",
|
||||||
"config": {
|
"config": {
|
||||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||||
"database.hostname": "postgres",
|
"tasks.max": "1",
|
||||||
|
|
||||||
|
"database.hostname": "rwa-postgres",
|
||||||
"database.port": "5432",
|
"database.port": "5432",
|
||||||
"database.user": "debezium",
|
"database.user": "${POSTGRES_USER:-rwa_user}",
|
||||||
"database.password": "debezium_password",
|
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
|
||||||
"database.dbname": "rwa_authorization",
|
"database.dbname": "rwa_authorization",
|
||||||
"database.server.name": "authorization",
|
|
||||||
"topic.prefix": "cdc.authorization",
|
"topic.prefix": "cdc.authorization",
|
||||||
"plugin.name": "pgoutput",
|
|
||||||
"publication.name": "authorization_cdc_publication",
|
|
||||||
"slot.name": "authorization_cdc_slot",
|
|
||||||
"table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers,public.benefit_assessment_records",
|
"table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers,public.benefit_assessment_records",
|
||||||
"transforms": "unwrap",
|
|
||||||
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
"plugin.name": "pgoutput",
|
||||||
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
"publication.name": "debezium_authorization_publication",
|
||||||
"transforms.unwrap.delete.handling.mode": "rewrite",
|
"publication.autocreate.mode": "filtered",
|
||||||
|
|
||||||
|
"slot.name": "debezium_authorization_slot",
|
||||||
|
|
||||||
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||||
"key.converter.schemas.enable": "false",
|
"key.converter.schemas.enable": "false",
|
||||||
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
|
||||||
"value.converter.schemas.enable": "false",
|
"value.converter.schemas.enable": "false",
|
||||||
|
|
||||||
|
"transforms": "unwrap",
|
||||||
|
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
|
||||||
|
"transforms.unwrap.drop.tombstones": "true",
|
||||||
|
"transforms.unwrap.delete.handling.mode": "rewrite",
|
||||||
|
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
|
||||||
|
|
||||||
|
"heartbeat.interval.ms": "10000",
|
||||||
|
|
||||||
"snapshot.mode": "initial",
|
"snapshot.mode": "initial",
|
||||||
|
|
||||||
"decimal.handling.mode": "string",
|
"decimal.handling.mode": "string",
|
||||||
"time.precision.mode": "connect"
|
"time.precision.mode": "connect"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -113,13 +113,26 @@ register_connector() {
|
||||||
local json_content
|
local json_content
|
||||||
json_content=$(substitute_env_vars "$(cat "$json_file")")
|
json_content=$(substitute_env_vars "$(cat "$json_file")")
|
||||||
|
|
||||||
# Register connector
|
# Register connector (with retry for transient 404s during Kafka Connect initialization)
|
||||||
RESULT=$(curl -s -X POST \
|
local attempt
|
||||||
-H "Content-Type: application/json" \
|
for attempt in 1 2 3; do
|
||||||
-d "$json_content" \
|
RESULT=$(curl -s -X POST \
|
||||||
"$CONNECT_URL/connectors")
|
-H "Content-Type: application/json" \
|
||||||
|
-d "$json_content" \
|
||||||
|
"$CONNECT_URL/connectors")
|
||||||
|
|
||||||
echo " Result: $RESULT"
|
if echo "$RESULT" | grep -q '"name"'; then
|
||||||
|
echo " OK: $connector_name registered"
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ $attempt -lt 3 ]; then
|
||||||
|
echo " Attempt $attempt failed, retrying in 5s..."
|
||||||
|
sleep 5
|
||||||
|
else
|
||||||
|
echo " FAILED after 3 attempts: $RESULT"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
# Check status
|
# Check status
|
||||||
sleep 2
|
sleep 2
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue