diff --git a/backend/services/scripts/debezium/authorization-connector.json b/backend/services/scripts/debezium/authorization-connector.json index 781498d9..5169a93d 100644 --- a/backend/services/scripts/debezium/authorization-connector.json +++ b/backend/services/scripts/debezium/authorization-connector.json @@ -2,26 +2,39 @@ "name": "authorization-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "database.hostname": "postgres", + "tasks.max": "1", + + "database.hostname": "rwa-postgres", "database.port": "5432", - "database.user": "debezium", - "database.password": "debezium_password", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", "database.dbname": "rwa_authorization", - "database.server.name": "authorization", + "topic.prefix": "cdc.authorization", - "plugin.name": "pgoutput", - "publication.name": "authorization_cdc_publication", - "slot.name": "authorization_cdc_slot", + "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers,public.benefit_assessment_records", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "transforms.unwrap.delete.handling.mode": "rewrite", + + "plugin.name": "pgoutput", + "publication.name": "debezium_authorization_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_authorization_slot", + "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", + + "transforms": "unwrap", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + + "heartbeat.interval.ms": "10000", + "snapshot.mode": "initial", + "decimal.handling.mode": "string", "time.precision.mode": "connect" } diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh index 7b99925a..6d8e9d66 100644 --- a/backend/services/scripts/debezium/register-connectors.sh +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -113,13 +113,26 @@ register_connector() { local json_content json_content=$(substitute_env_vars "$(cat "$json_file")") - # Register connector - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$json_content" \ - "$CONNECT_URL/connectors") + # Register connector (with retry for transient 404s during Kafka Connect initialization) + local attempt + for attempt in 1 2 3; do + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d "$json_content" \ + "$CONNECT_URL/connectors") - echo " Result: $RESULT" + if echo "$RESULT" | grep -q '"name"'; then + echo " OK: $connector_name registered" + break + fi + + if [ $attempt -lt 3 ]; then + echo " Attempt $attempt failed, retrying in 5s..." + sleep 5 + else + echo " FAILED after 3 attempts: $RESULT" + fi + done # Check status sleep 2