From 63d73af13516880df946c6f6efbf5a296341668d Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 11 Jan 2026 23:19:34 -0800 Subject: [PATCH] =?UTF-8?q?refactor(cdc):=20=E7=BB=9F=E4=B8=80=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20Debezium=20CDC=20=E8=BF=9B=E8=A1=8C=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. contribution-service: - 添加 identity topic 订阅,全量同步 1.0 用户数据 - 修改 fromBeginning 为 true,首次启动全量同步 2. mining-admin-service: - 将 Outbox 事件改为 Debezium CDC 监听 outbox_events 表 - 修改 fromBeginning 为 true,首次启动全量同步 3. 新增 5 个 2.0 服务的 Debezium connector 配置: - auth-outbox-connector.json - contribution-outbox-connector.json - mining-outbox-connector.json - trading-outbox-connector.json - mining-wallet-outbox-connector.json 4. 更新 register-connectors.sh 脚本 Co-Authored-By: Claude Opus 4.5 --- .../contribution-service/.env.example | 8 +- .../event-handlers/cdc-event-dispatcher.ts | 9 +- .../kafka/cdc-consumer.service.ts | 6 +- .../kafka/cdc-consumer.service.ts | 2 +- .../infrastructure/kafka/cdc-sync.service.ts | 30 ++-- .../debezium/auth-outbox-connector.json | 44 ++++++ .../contribution-outbox-connector.json | 44 ++++++ .../debezium/mining-outbox-connector.json | 44 ++++++ .../mining-wallet-outbox-connector.json | 44 ++++++ .../scripts/debezium/register-connectors.sh | 133 ++++++++++-------- .../debezium/trading-outbox-connector.json | 44 ++++++ 11 files changed, 320 insertions(+), 88 deletions(-) create mode 100644 backend/services/scripts/debezium/auth-outbox-connector.json create mode 100644 backend/services/scripts/debezium/contribution-outbox-connector.json create mode 100644 backend/services/scripts/debezium/mining-outbox-connector.json create mode 100644 backend/services/scripts/debezium/mining-wallet-outbox-connector.json create mode 100644 backend/services/scripts/debezium/trading-outbox-connector.json diff --git a/backend/services/contribution-service/.env.example b/backend/services/contribution-service/.env.example index 97ec7714..48c04c31 100644 --- a/backend/services/contribution-service/.env.example +++ b/backend/services/contribution-service/.env.example @@ -16,7 +16,7 @@ KAFKA_GROUP_ID=contribution-service-group # JWT (for auth validation) JWT_SECRET=your-jwt-secret -# CDC Topics -CDC_TOPIC_USERS=dbserver1.rwa_identity.users -CDC_TOPIC_ADOPTIONS=dbserver1.rwa_planting.adoptions -CDC_TOPIC_REFERRALS=dbserver1.rwa_referral.referral_relations +# CDC Topics (从1.0系统全量同步) +CDC_TOPIC_USERS=cdc.identity.public.user_accounts +CDC_TOPIC_ADOPTIONS=cdc.planting.public.planting_orders +CDC_TOPIC_REFERRALS=cdc.referral.public.referral_relationships diff --git a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts index a745fe3a..4ebefc45 100644 --- a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts +++ b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts @@ -24,10 +24,11 @@ export class CDCEventDispatcher implements OnModuleInit { // 表名需要与 Debezium topic 中的表名一致 // topic 格式: cdc..public. // - // 注意:contribution-service 不需要直接同步用户数据 - // - 认种订单 (planting_orders) 包含 account_sequence - // - 推荐关系 (referral_relationships) 包含 account_sequence 和层级信息 - // - ContributionAccount 在认种时自动创建 + // 从 1.0 系统全量同步三类数据: + // - 用户数据 (identity-service: user_accounts) + // - 推荐关系 (referral-service: referral_relationships) + // - 认种订单 (planting-service: planting_orders) + this.cdcConsumer.registerHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service this.cdcConsumer.registerHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service this.cdcConsumer.registerHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 9b10f246..01192aff 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -83,8 +83,10 @@ export class CDCConsumerService implements OnModuleInit { await this.consumer.connect(); this.logger.log('CDC consumer connected'); - // 订阅 Debezium CDC topics (从1.0服务同步) + // 订阅 Debezium CDC topics (从1.0服务全量同步) const topics = [ + // 用户账户表 (identity-service: user_accounts) + this.configService.get('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'), // 认种订单表 (planting-service: planting_orders) this.configService.get('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'), // 推荐关系表 (referral-service: referral_relationships) @@ -93,7 +95,7 @@ export class CDCConsumerService implements OnModuleInit { await this.consumer.subscribe({ topics, - fromBeginning: false, + fromBeginning: true, // 首次启动时全量同步历史数据 }); this.logger.log(`Subscribed to topics: ${topics.join(', ')}`); diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index c338a956..9f0c7983 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -128,7 +128,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { await this.consumer.subscribe({ topics: this.topics, - fromBeginning: false, + fromBeginning: true, // 首次启动时全量同步历史数据 }); this.logger.log(`Subscribed to topics: ${this.topics.join(', ')}`); diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index ec65c160..94cecfc1 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -30,11 +30,11 @@ export class CdcSyncService implements OnModuleInit { private async registerHandlers(): Promise { // =========================================================================== - // 从 auth-service 同步用户数据 + // 从 auth-service 同步用户数据 (通过 Debezium CDC 监听 outbox_events 表) // =========================================================================== const usersTopic = this.configService.get( - 'CDC_TOPIC_USERS', - 'mining-admin.auth.users', + 'CDC_TOPIC_AUTH_OUTBOX', + 'cdc.auth.outbox', ); this.cdcConsumer.addTopic(usersTopic); this.cdcConsumer.registerServiceHandler( @@ -66,11 +66,11 @@ export class CdcSyncService implements OnModuleInit { ); // =========================================================================== - // 从 contribution-service 同步算力数据 + // 从 contribution-service 同步算力数据 (通过 Debezium CDC 监听 outbox_events 表) // =========================================================================== const contributionTopic = this.configService.get( - 'CDC_TOPIC_CONTRIBUTION', - 'mining-admin.contribution.accounts', + 'CDC_TOPIC_CONTRIBUTION_OUTBOX', + 'cdc.contribution.outbox', ); this.cdcConsumer.addTopic(contributionTopic); this.cdcConsumer.registerServiceHandler( @@ -83,11 +83,11 @@ export class CdcSyncService implements OnModuleInit { ); // =========================================================================== - // 从 mining-service 同步挖矿数据 + // 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表) // =========================================================================== const miningTopic = this.configService.get( - 'CDC_TOPIC_MINING', - 'mining-admin.mining.accounts', + 'CDC_TOPIC_MINING_OUTBOX', + 'cdc.mining.outbox', ); this.cdcConsumer.addTopic(miningTopic); this.cdcConsumer.registerServiceHandler( @@ -104,11 +104,11 @@ export class CdcSyncService implements OnModuleInit { ); // =========================================================================== - // 从 trading-service 同步交易数据 + // 从 trading-service 同步交易数据 (通过 Debezium CDC 监听 outbox_events 表) // =========================================================================== const tradingTopic = this.configService.get( - 'CDC_TOPIC_TRADING', - 'mining-admin.trading.accounts', + 'CDC_TOPIC_TRADING_OUTBOX', + 'cdc.trading.outbox', ); this.cdcConsumer.addTopic(tradingTopic); this.cdcConsumer.registerServiceHandler( @@ -126,11 +126,11 @@ export class CdcSyncService implements OnModuleInit { // =========================================================================== - // 从 mining-wallet-service 同步钱包数据 + // 从 mining-wallet-service 同步钱包数据 (通过 Debezium CDC 监听 outbox_events 表) // =========================================================================== const walletTopic = this.configService.get( - 'CDC_TOPIC_WALLET', - 'mining-admin.wallet.accounts', + 'CDC_TOPIC_WALLET_OUTBOX', + 'cdc.mining-wallet.outbox', ); this.cdcConsumer.addTopic(walletTopic); diff --git a/backend/services/scripts/debezium/auth-outbox-connector.json b/backend/services/scripts/debezium/auth-outbox-connector.json new file mode 100644 index 00000000..345e7f4e --- /dev/null +++ b/backend/services/scripts/debezium/auth-outbox-connector.json @@ -0,0 +1,44 @@ +{ + "name": "auth-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_auth", + + "topic.prefix": "cdc.auth", + + "table.include.list": "public.outbox_events", + + "plugin.name": "pgoutput", + "publication.name": "debezium_auth_outbox_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_auth_outbox_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap,route", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": ".*", + "transforms.route.replacement": "cdc.auth.outbox", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/contribution-outbox-connector.json b/backend/services/scripts/debezium/contribution-outbox-connector.json new file mode 100644 index 00000000..30c7ebae --- /dev/null +++ b/backend/services/scripts/debezium/contribution-outbox-connector.json @@ -0,0 +1,44 @@ +{ + "name": "contribution-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_contribution", + + "topic.prefix": "cdc.contribution", + + "table.include.list": "public.outbox_events", + + "plugin.name": "pgoutput", + "publication.name": "debezium_contribution_outbox_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_contribution_outbox_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap,route", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": ".*", + "transforms.route.replacement": "cdc.contribution.outbox", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/mining-outbox-connector.json b/backend/services/scripts/debezium/mining-outbox-connector.json new file mode 100644 index 00000000..98651e46 --- /dev/null +++ b/backend/services/scripts/debezium/mining-outbox-connector.json @@ -0,0 +1,44 @@ +{ + "name": "mining-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_mining", + + "topic.prefix": "cdc.mining", + + "table.include.list": "public.outbox_events", + + "plugin.name": "pgoutput", + "publication.name": "debezium_mining_outbox_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_mining_outbox_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap,route", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": ".*", + "transforms.route.replacement": "cdc.mining.outbox", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/mining-wallet-outbox-connector.json b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json new file mode 100644 index 00000000..6a445c8a --- /dev/null +++ b/backend/services/scripts/debezium/mining-wallet-outbox-connector.json @@ -0,0 +1,44 @@ +{ + "name": "mining-wallet-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "mining_wallet_db", + + "topic.prefix": "cdc.mining-wallet", + + "table.include.list": "public.outbox_events", + + "plugin.name": "pgoutput", + "publication.name": "debezium_mining_wallet_outbox_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_mining_wallet_outbox_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap,route", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": ".*", + "transforms.route.replacement": "cdc.mining-wallet.outbox", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +} diff --git a/backend/services/scripts/debezium/register-connectors.sh b/backend/services/scripts/debezium/register-connectors.sh index 771e065f..1b65ea47 100644 --- a/backend/services/scripts/debezium/register-connectors.sh +++ b/backend/services/scripts/debezium/register-connectors.sh @@ -4,20 +4,24 @@ # ============================================================================= # Usage: ./register-connectors.sh # -# This script registers the PostgreSQL connector for identity-service CDC -# It should be run after Debezium Connect is fully started +# This script registers all PostgreSQL connectors for CDC: +# - 1.0 系统: identity, planting, referral, wallet, authorization +# - 2.0 系统: auth, contribution, mining, trading, mining-wallet (outbox tables) # ============================================================================= set -e CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" MAX_RETRIES=30 RETRY_INTERVAL=5 echo "=== Debezium Connector Registration ===" echo "Connect URL: $CONNECT_URL" +echo "Script Dir: $SCRIPT_DIR" # Wait for Debezium Connect to be ready +echo "" echo "Waiting for Debezium Connect to be ready..." for i in $(seq 1 $MAX_RETRIES); do if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then @@ -38,81 +42,86 @@ echo "Checking existing connectors..." EXISTING=$(curl -s "$CONNECT_URL/connectors") echo "Existing connectors: $EXISTING" -# Register identity-postgres connector -echo "" -echo "Registering identity-postgres-connector..." +# Function to register a connector from JSON file +register_connector() { + local json_file="$1" + local connector_name="$2" -CONNECTOR_CONFIG='{ - "name": "identity-postgres-connector", - "config": { - "connector.class": "io.debezium.connector.postgresql.PostgresConnector", - "tasks.max": "1", + if [ ! -f "$json_file" ]; then + echo "WARNING: $json_file not found, skipping..." + return + fi - "database.hostname": "postgres", - "database.port": "5432", - "database.user": "'${POSTGRES_USER:-rwa_user}'", - "database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'", - "database.dbname": "rwa_identity", - "database.server.name": "identity", + echo "" + echo "Registering $connector_name..." - "topic.prefix": "cdc.identity", + # Delete existing connector if exists + if echo "$EXISTING" | grep -q "$connector_name"; then + echo " Deleting existing $connector_name..." + curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name" + sleep 2 + fi - "table.include.list": "public.user_accounts", + # Register connector + RESULT=$(curl -s -X POST \ + -H "Content-Type: application/json" \ + -d @"$json_file" \ + "$CONNECT_URL/connectors") - "plugin.name": "pgoutput", - "publication.name": "debezium_identity_publication", - "publication.autocreate.mode": "filtered", + echo " Result: $RESULT" - "slot.name": "debezium_identity_slot", - - "key.converter": "org.apache.kafka.connect.json.JsonConverter", - "key.converter.schemas.enable": "false", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - - "transforms": "unwrap", - "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", - "transforms.unwrap.drop.tombstones": "true", - "transforms.unwrap.delete.handling.mode": "rewrite", - "transforms.unwrap.add.fields": "op,table,source.ts_ms", - - "heartbeat.interval.ms": "10000", - - "snapshot.mode": "initial", - - "decimal.handling.mode": "string", - "time.precision.mode": "connect" - } -}' - -# Delete existing connector if exists -if echo "$EXISTING" | grep -q "identity-postgres-connector"; then - echo "Deleting existing identity-postgres-connector..." - curl -s -X DELETE "$CONNECT_URL/connectors/identity-postgres-connector" + # Check status sleep 2 -fi + STATUS=$(curl -s "$CONNECT_URL/connectors/$connector_name/status" 2>/dev/null || echo "Status check failed") + echo " Status: $STATUS" +} -# Create connector -RESULT=$(curl -s -X POST \ - -H "Content-Type: application/json" \ - -d "$CONNECTOR_CONFIG" \ - "$CONNECT_URL/connectors") - -echo "Result: $RESULT" - -# Check connector status +# ============================================================================= +# 1.0 系统 Connectors (监听业务表) +# ============================================================================= echo "" -echo "Checking connector status..." -sleep 3 -STATUS=$(curl -s "$CONNECT_URL/connectors/identity-postgres-connector/status") -echo "Connector status: $STATUS" +echo "=== Registering 1.0 System Connectors ===" +register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector" +register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector" +register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector" +register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector" +register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector" + +# ============================================================================= +# 2.0 系统 Connectors (监听 outbox_events 表) +# ============================================================================= +echo "" +echo "=== Registering 2.0 System Outbox Connectors ===" + +register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector" +register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector" +register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector" +register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector" +register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector" + +# ============================================================================= +# Summary +# ============================================================================= echo "" echo "=== Registration Complete ===" echo "" echo "Kafka topics created:" +echo "" +echo "1.0 System (业务表 CDC):" echo " - cdc.identity.public.user_accounts" +echo " - cdc.planting.public.planting_orders" +echo " - cdc.referral.public.referral_relationships" +echo " - cdc.wallet.public.wallet_accounts" +echo " - cdc.authorization.public.*" +echo "" +echo "2.0 System (Outbox 表 CDC):" +echo " - cdc.auth.outbox" +echo " - cdc.contribution.outbox" +echo " - cdc.mining.outbox" +echo " - cdc.trading.outbox" +echo " - cdc.mining-wallet.outbox" echo "" echo "To verify:" echo " curl $CONNECT_URL/connectors" -echo " curl $CONNECT_URL/connectors/identity-postgres-connector/status" +echo " curl $CONNECT_URL/connectors//status" diff --git a/backend/services/scripts/debezium/trading-outbox-connector.json b/backend/services/scripts/debezium/trading-outbox-connector.json new file mode 100644 index 00000000..25e806c4 --- /dev/null +++ b/backend/services/scripts/debezium/trading-outbox-connector.json @@ -0,0 +1,44 @@ +{ + "name": "trading-outbox-connector", + "config": { + "connector.class": "io.debezium.connector.postgresql.PostgresConnector", + "tasks.max": "1", + + "database.hostname": "postgres", + "database.port": "5432", + "database.user": "${POSTGRES_USER:-rwa_user}", + "database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}", + "database.dbname": "rwa_trading", + + "topic.prefix": "cdc.trading", + + "table.include.list": "public.outbox_events", + + "plugin.name": "pgoutput", + "publication.name": "debezium_trading_outbox_publication", + "publication.autocreate.mode": "filtered", + + "slot.name": "debezium_trading_outbox_slot", + + "key.converter": "org.apache.kafka.connect.json.JsonConverter", + "key.converter.schemas.enable": "false", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + + "transforms": "unwrap,route", + "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", + "transforms.unwrap.drop.tombstones": "true", + "transforms.unwrap.delete.handling.mode": "rewrite", + "transforms.unwrap.add.fields": "op,table,source.ts_ms", + "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", + "transforms.route.regex": ".*", + "transforms.route.replacement": "cdc.trading.outbox", + + "heartbeat.interval.ms": "10000", + + "snapshot.mode": "initial", + + "decimal.handling.mode": "string", + "time.precision.mode": "connect" + } +}