refactor(cdc): 统一使用 Debezium CDC 进行数据同步

1. contribution-service:
   - 添加 identity topic 订阅,全量同步 1.0 用户数据
   - 修改 fromBeginning 为 true,首次启动全量同步

2. mining-admin-service:
   - 将 Outbox 事件改为 Debezium CDC 监听 outbox_events 表
   - 修改 fromBeginning 为 true,首次启动全量同步

3. 新增 5 个 2.0 服务的 Debezium connector 配置:
   - auth-outbox-connector.json
   - contribution-outbox-connector.json
   - mining-outbox-connector.json
   - trading-outbox-connector.json
   - mining-wallet-outbox-connector.json

4. 更新 register-connectors.sh 脚本

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-11 23:19:34 -08:00
parent cab36fccf1
commit 63d73af135
11 changed files with 320 additions and 88 deletions

View File

@ -16,7 +16,7 @@ KAFKA_GROUP_ID=contribution-service-group
# JWT (for auth validation)
JWT_SECRET=your-jwt-secret
# CDC Topics
CDC_TOPIC_USERS=dbserver1.rwa_identity.users
CDC_TOPIC_ADOPTIONS=dbserver1.rwa_planting.adoptions
CDC_TOPIC_REFERRALS=dbserver1.rwa_referral.referral_relations
# CDC Topics (从1.0系统全量同步)
CDC_TOPIC_USERS=cdc.identity.public.user_accounts
CDC_TOPIC_ADOPTIONS=cdc.planting.public.planting_orders
CDC_TOPIC_REFERRALS=cdc.referral.public.referral_relationships

View File

@ -24,10 +24,11 @@ export class CDCEventDispatcher implements OnModuleInit {
// 表名需要与 Debezium topic 中的表名一致
// topic 格式: cdc.<service>.public.<table_name>
//
// 注意contribution-service 不需要直接同步用户数据
// - 认种订单 (planting_orders) 包含 account_sequence
// - 推荐关系 (referral_relationships) 包含 account_sequence 和层级信息
// - ContributionAccount 在认种时自动创建
// 从 1.0 系统全量同步三类数据:
// - 用户数据 (identity-service: user_accounts)
// - 推荐关系 (referral-service: referral_relationships)
// - 认种订单 (planting-service: planting_orders)
this.cdcConsumer.registerHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service
this.cdcConsumer.registerHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service
this.cdcConsumer.registerHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service

View File

@ -83,8 +83,10 @@ export class CDCConsumerService implements OnModuleInit {
await this.consumer.connect();
this.logger.log('CDC consumer connected');
// 订阅 Debezium CDC topics (从1.0服务同步)
// 订阅 Debezium CDC topics (从1.0服务全量同步)
const topics = [
// 用户账户表 (identity-service: user_accounts)
this.configService.get<string>('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'),
// 认种订单表 (planting-service: planting_orders)
this.configService.get<string>('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'),
// 推荐关系表 (referral-service: referral_relationships)
@ -93,7 +95,7 @@ export class CDCConsumerService implements OnModuleInit {
await this.consumer.subscribe({
topics,
fromBeginning: false,
fromBeginning: true, // 首次启动时全量同步历史数据
});
this.logger.log(`Subscribed to topics: ${topics.join(', ')}`);

View File

@ -128,7 +128,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
await this.consumer.subscribe({
topics: this.topics,
fromBeginning: false,
fromBeginning: true, // 首次启动时全量同步历史数据
});
this.logger.log(`Subscribed to topics: ${this.topics.join(', ')}`);

View File

@ -30,11 +30,11 @@ export class CdcSyncService implements OnModuleInit {
private async registerHandlers(): Promise<void> {
// ===========================================================================
// 从 auth-service 同步用户数据
// 从 auth-service 同步用户数据 (通过 Debezium CDC 监听 outbox_events 表)
// ===========================================================================
const usersTopic = this.configService.get<string>(
'CDC_TOPIC_USERS',
'mining-admin.auth.users',
'CDC_TOPIC_AUTH_OUTBOX',
'cdc.auth.outbox',
);
this.cdcConsumer.addTopic(usersTopic);
this.cdcConsumer.registerServiceHandler(
@ -66,11 +66,11 @@ export class CdcSyncService implements OnModuleInit {
);
// ===========================================================================
// 从 contribution-service 同步算力数据
// 从 contribution-service 同步算力数据 (通过 Debezium CDC 监听 outbox_events 表)
// ===========================================================================
const contributionTopic = this.configService.get<string>(
'CDC_TOPIC_CONTRIBUTION',
'mining-admin.contribution.accounts',
'CDC_TOPIC_CONTRIBUTION_OUTBOX',
'cdc.contribution.outbox',
);
this.cdcConsumer.addTopic(contributionTopic);
this.cdcConsumer.registerServiceHandler(
@ -83,11 +83,11 @@ export class CdcSyncService implements OnModuleInit {
);
// ===========================================================================
// 从 mining-service 同步挖矿数据
// 从 mining-service 同步挖矿数据 (通过 Debezium CDC 监听 outbox_events 表)
// ===========================================================================
const miningTopic = this.configService.get<string>(
'CDC_TOPIC_MINING',
'mining-admin.mining.accounts',
'CDC_TOPIC_MINING_OUTBOX',
'cdc.mining.outbox',
);
this.cdcConsumer.addTopic(miningTopic);
this.cdcConsumer.registerServiceHandler(
@ -104,11 +104,11 @@ export class CdcSyncService implements OnModuleInit {
);
// ===========================================================================
// 从 trading-service 同步交易数据
// 从 trading-service 同步交易数据 (通过 Debezium CDC 监听 outbox_events 表)
// ===========================================================================
const tradingTopic = this.configService.get<string>(
'CDC_TOPIC_TRADING',
'mining-admin.trading.accounts',
'CDC_TOPIC_TRADING_OUTBOX',
'cdc.trading.outbox',
);
this.cdcConsumer.addTopic(tradingTopic);
this.cdcConsumer.registerServiceHandler(
@ -126,11 +126,11 @@ export class CdcSyncService implements OnModuleInit {
// ===========================================================================
// 从 mining-wallet-service 同步钱包数据
// 从 mining-wallet-service 同步钱包数据 (通过 Debezium CDC 监听 outbox_events 表)
// ===========================================================================
const walletTopic = this.configService.get<string>(
'CDC_TOPIC_WALLET',
'mining-admin.wallet.accounts',
'CDC_TOPIC_WALLET_OUTBOX',
'cdc.mining-wallet.outbox',
);
this.cdcConsumer.addTopic(walletTopic);

View File

@ -0,0 +1,44 @@
{
"name": "auth-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_auth",
"topic.prefix": "cdc.auth",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"publication.name": "debezium_auth_outbox_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_auth_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.auth.outbox",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

View File

@ -0,0 +1,44 @@
{
"name": "contribution-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_contribution",
"topic.prefix": "cdc.contribution",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"publication.name": "debezium_contribution_outbox_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_contribution_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.contribution.outbox",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

View File

@ -0,0 +1,44 @@
{
"name": "mining-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_mining",
"topic.prefix": "cdc.mining",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"publication.name": "debezium_mining_outbox_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_mining_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.mining.outbox",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

View File

@ -0,0 +1,44 @@
{
"name": "mining-wallet-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "mining_wallet_db",
"topic.prefix": "cdc.mining-wallet",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"publication.name": "debezium_mining_wallet_outbox_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_mining_wallet_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.mining-wallet.outbox",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}

View File

@ -4,20 +4,24 @@
# =============================================================================
# Usage: ./register-connectors.sh
#
# This script registers the PostgreSQL connector for identity-service CDC
# It should be run after Debezium Connect is fully started
# This script registers all PostgreSQL connectors for CDC:
# - 1.0 系统: identity, planting, referral, wallet, authorization
# - 2.0 系统: auth, contribution, mining, trading, mining-wallet (outbox tables)
# =============================================================================
set -e
CONNECT_URL="${DEBEZIUM_CONNECT_URL:-http://localhost:8083}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
MAX_RETRIES=30
RETRY_INTERVAL=5
echo "=== Debezium Connector Registration ==="
echo "Connect URL: $CONNECT_URL"
echo "Script Dir: $SCRIPT_DIR"
# Wait for Debezium Connect to be ready
echo ""
echo "Waiting for Debezium Connect to be ready..."
for i in $(seq 1 $MAX_RETRIES); do
if curl -s "$CONNECT_URL/" > /dev/null 2>&1; then
@ -38,81 +42,86 @@ echo "Checking existing connectors..."
EXISTING=$(curl -s "$CONNECT_URL/connectors")
echo "Existing connectors: $EXISTING"
# Register identity-postgres connector
echo ""
echo "Registering identity-postgres-connector..."
# Function to register a connector from JSON file
register_connector() {
local json_file="$1"
local connector_name="$2"
CONNECTOR_CONFIG='{
"name": "identity-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
if [ ! -f "$json_file" ]; then
echo "WARNING: $json_file not found, skipping..."
return
fi
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "'${POSTGRES_USER:-rwa_user}'",
"database.password": "'${POSTGRES_PASSWORD:-rwa_secure_password}'",
"database.dbname": "rwa_identity",
"database.server.name": "identity",
echo ""
echo "Registering $connector_name..."
"topic.prefix": "cdc.identity",
# Delete existing connector if exists
if echo "$EXISTING" | grep -q "$connector_name"; then
echo " Deleting existing $connector_name..."
curl -s -X DELETE "$CONNECT_URL/connectors/$connector_name"
sleep 2
fi
"table.include.list": "public.user_accounts",
# Register connector
RESULT=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d @"$json_file" \
"$CONNECT_URL/connectors")
"plugin.name": "pgoutput",
"publication.name": "debezium_identity_publication",
"publication.autocreate.mode": "filtered",
echo " Result: $RESULT"
"slot.name": "debezium_identity_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}'
# Delete existing connector if exists
if echo "$EXISTING" | grep -q "identity-postgres-connector"; then
echo "Deleting existing identity-postgres-connector..."
curl -s -X DELETE "$CONNECT_URL/connectors/identity-postgres-connector"
# Check status
sleep 2
fi
STATUS=$(curl -s "$CONNECT_URL/connectors/$connector_name/status" 2>/dev/null || echo "Status check failed")
echo " Status: $STATUS"
}
# Create connector
RESULT=$(curl -s -X POST \
-H "Content-Type: application/json" \
-d "$CONNECTOR_CONFIG" \
"$CONNECT_URL/connectors")
echo "Result: $RESULT"
# Check connector status
# =============================================================================
# 1.0 系统 Connectors (监听业务表)
# =============================================================================
echo ""
echo "Checking connector status..."
sleep 3
STATUS=$(curl -s "$CONNECT_URL/connectors/identity-postgres-connector/status")
echo "Connector status: $STATUS"
echo "=== Registering 1.0 System Connectors ==="
register_connector "$SCRIPT_DIR/identity-connector.json" "identity-postgres-connector"
register_connector "$SCRIPT_DIR/planting-connector.json" "planting-postgres-connector"
register_connector "$SCRIPT_DIR/referral-connector.json" "referral-postgres-connector"
register_connector "$SCRIPT_DIR/wallet-connector.json" "wallet-postgres-connector"
register_connector "$SCRIPT_DIR/authorization-connector.json" "authorization-postgres-connector"
# =============================================================================
# 2.0 系统 Connectors (监听 outbox_events 表)
# =============================================================================
echo ""
echo "=== Registering 2.0 System Outbox Connectors ==="
register_connector "$SCRIPT_DIR/auth-outbox-connector.json" "auth-outbox-connector"
register_connector "$SCRIPT_DIR/contribution-outbox-connector.json" "contribution-outbox-connector"
register_connector "$SCRIPT_DIR/mining-outbox-connector.json" "mining-outbox-connector"
register_connector "$SCRIPT_DIR/trading-outbox-connector.json" "trading-outbox-connector"
register_connector "$SCRIPT_DIR/mining-wallet-outbox-connector.json" "mining-wallet-outbox-connector"
# =============================================================================
# Summary
# =============================================================================
echo ""
echo "=== Registration Complete ==="
echo ""
echo "Kafka topics created:"
echo ""
echo "1.0 System (业务表 CDC):"
echo " - cdc.identity.public.user_accounts"
echo " - cdc.planting.public.planting_orders"
echo " - cdc.referral.public.referral_relationships"
echo " - cdc.wallet.public.wallet_accounts"
echo " - cdc.authorization.public.*"
echo ""
echo "2.0 System (Outbox 表 CDC):"
echo " - cdc.auth.outbox"
echo " - cdc.contribution.outbox"
echo " - cdc.mining.outbox"
echo " - cdc.trading.outbox"
echo " - cdc.mining-wallet.outbox"
echo ""
echo "To verify:"
echo " curl $CONNECT_URL/connectors"
echo " curl $CONNECT_URL/connectors/identity-postgres-connector/status"
echo " curl $CONNECT_URL/connectors/<connector-name>/status"

View File

@ -0,0 +1,44 @@
{
"name": "trading-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "${POSTGRES_USER:-rwa_user}",
"database.password": "${POSTGRES_PASSWORD:-rwa_secure_password}",
"database.dbname": "rwa_trading",
"topic.prefix": "cdc.trading",
"table.include.list": "public.outbox_events",
"plugin.name": "pgoutput",
"publication.name": "debezium_trading_outbox_publication",
"publication.autocreate.mode": "filtered",
"slot.name": "debezium_trading_outbox_slot",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table,source.ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": ".*",
"transforms.route.replacement": "cdc.trading.outbox",
"heartbeat.interval.ms": "10000",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"time.precision.mode": "connect"
}
}