diff --git a/backend/services/deploy.sh b/backend/services/deploy.sh index c0807f08..f9c5c0b5 100755 --- a/backend/services/deploy.sh +++ b/backend/services/deploy.sh @@ -201,20 +201,21 @@ up() { log_info "Starting Debezium Connect..." docker compose -f "$COMPOSE_FILE" --env-file "$ENV_FILE" up -d debezium-connect - # Wait for Debezium Connect to be ready - log_info "Waiting for Debezium Connect to be ready..." - for i in {1..30}; do - if curl -s http://localhost:8084/ > /dev/null 2>&1; then - log_info "Debezium Connect is ready!" + # Wait for Debezium Connect REST API to be fully ready + # Note: check /connectors not / — Jetty responds on / before Kafka Connect REST is initialized + log_info "Waiting for Debezium Connect REST API to be ready..." + for i in {1..60}; do + if curl -s http://localhost:8084/connectors > /dev/null 2>&1; then + log_info "Debezium Connect REST API is ready!" break fi - if [ $i -eq 30 ]; then - log_warn "Debezium Connect not ready after 60s, continuing anyway..." + if [ $i -eq 60 ]; then + log_warn "Debezium Connect REST API not ready after 120s, continuing anyway..." fi sleep 2 done - # Register Debezium connectors + # Register Debezium connectors (using register-connectors.sh with JSON configs) register_debezium_connectors # Start application services @@ -228,258 +229,19 @@ up() { } register_debezium_connectors() { - log_info "Registering Debezium connectors..." + local register_script="$SCRIPT_DIR/scripts/debezium/register-connectors.sh" - # Check existing connectors - EXISTING=$(curl -s http://localhost:8084/connectors 2>/dev/null || echo "[]") + if [ ! -f "$register_script" ]; then + log_warn "register-connectors.sh not found at $register_script, skipping..." + return + fi - # Read database credentials from .env + # Source .env for credentials, then call register script (only 1.0 connectors on this server) source "$ENV_FILE" - - # Register identity-postgres-connector - if echo "$EXISTING" | grep -q "identity-postgres-connector"; then - log_info "identity-postgres-connector already registered" - else - log_info "Registering identity-postgres-connector..." - IDENTITY_CONFIG='{ - "name": "identity-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_identity", - "topic.prefix": "cdc.identity", - "table.include.list": "public.user_accounts", - "plugin.name": "pgoutput", - "publication.name": "debezium_identity_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_identity_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$IDENTITY_CONFIG" \ - "http://localhost:8084/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "identity-postgres-connector"; then - log_info "identity-postgres-connector registered successfully" - else - log_warn "Failed to register identity connector: $RESULT" - fi - fi - - # Register referral-postgres-connector - if echo "$EXISTING" | grep -q "referral-postgres-connector"; then - log_info "referral-postgres-connector already registered" - else - log_info "Registering referral-postgres-connector..." - REFERRAL_CONFIG='{ - "name": "referral-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_referral", - "topic.prefix": "cdc.referral", - "table.include.list": "public.referral_relationships", - "plugin.name": "pgoutput", - "publication.name": "debezium_referral_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_referral_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$REFERRAL_CONFIG" \ - "http://localhost:8084/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "referral-postgres-connector"; then - log_info "referral-postgres-connector registered successfully" - else - log_warn "Failed to register referral connector: $RESULT" - fi - fi - - # Register wallet-postgres-connector - if echo "$EXISTING" | grep -q "wallet-postgres-connector"; then - log_info "wallet-postgres-connector already registered" - else - log_info "Registering wallet-postgres-connector..." - WALLET_CONFIG='{ - "name": "wallet-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_wallet", - "topic.prefix": "cdc.wallet", - "table.include.list": "public.wallet_accounts,public.withdrawal_orders,public.fiat_withdrawal_orders,public.wallet_ledger_entries", - "plugin.name": "pgoutput", - "publication.name": "debezium_wallet_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_wallet_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$WALLET_CONFIG" \ - "http://localhost:8084/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "wallet-postgres-connector"; then - log_info "wallet-postgres-connector registered successfully" - else - log_warn "Failed to register wallet connector: $RESULT" - fi - fi - - # Register planting-postgres-connector - if echo "$EXISTING" | grep -q "planting-postgres-connector"; then - log_info "planting-postgres-connector already registered" - else - log_info "Registering planting-postgres-connector..." - PLANTING_CONFIG='{ - "name": "planting-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_planting", - "topic.prefix": "cdc.planting", - "table.include.list": "public.planting_orders,public.planting_positions,public.contract_signing_tasks,public.fund_allocations", - "plugin.name": "pgoutput", - "publication.name": "debezium_planting_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_planting_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$PLANTING_CONFIG" \ - "http://localhost:8084/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "planting-postgres-connector"; then - log_info "planting-postgres-connector registered successfully" - else - log_warn "Failed to register planting connector: $RESULT" - fi - fi - - # Register authorization-postgres-connector - if echo "$EXISTING" | grep -q "authorization-postgres-connector"; then - log_info "authorization-postgres-connector already registered" - else - log_info "Registering authorization-postgres-connector..." - AUTHORIZATION_CONFIG='{ - "name": "authorization-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_authorization", - "topic.prefix": "cdc.authorization", - "table.include.list": "public.authorization_roles,public.monthly_assessments,public.system_accounts,public.system_account_ledgers", - "plugin.name": "pgoutput", - "publication.name": "debezium_authorization_publication", - "publication.autocreate.mode": "filtered", - "slot.name": "debezium_authorization_slot", - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - "heartbeat.interval.ms": "10000", - "snapshot.mode": "initial", - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } - }' - - RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$AUTHORIZATION_CONFIG" \ - "http://localhost:8084/connectors" 2>/dev/null || echo "failed") - - if echo "$RESULT" | grep -q "authorization-postgres-connector"; then - log_info "authorization-postgres-connector registered successfully" - else - log_warn "Failed to register authorization connector: $RESULT" - fi - fi + POSTGRES_PASSWORD="$POSTGRES_PASSWORD" \ + POSTGRES_USER="${POSTGRES_USER:-rwa_user}" \ + DEBEZIUM_CONNECT_URL="http://localhost:8084" \ + bash "$register_script" --1.0 } down() { diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh index a64b2911..7b99925a 100644 --- a/backend/services/scripts/debezium/register-connectors.sh +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -53,14 +53,14 @@ echo "Force: $FORCE" # Wait for Debezium Connect to be ready echo "" -echo "Waiting for Debezium Connect to be ready..." +echo "Waiting for Debezium Connect REST API to be ready..." for i in $(seq 1 $MAX_RETRIES); do - if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then - echo "Debezium Connect is ready!" + if curl -s "$CONNECT_URL/connectors" > /dev/null 2>&1; then + echo "Debezium Connect REST API is ready!" break fi if [ $i -eq $MAX_RETRIES ]; then - echo "ERROR: Debezium Connect is not ready after $MAX_RETRIES attempts" + echo "ERROR: Debezium Connect REST API is not ready after $MAX_RETRIES attempts" exit 1 fi echo "Attempt $i/$MAX_RETRIES - waiting ${RETRY_INTERVAL}s..."