From 534d4ce70c550b31cb78b16a3b77d3a26a129ab6 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 29 Jan 2026 22:20:23 -0800 Subject: [PATCH] =?UTF-8?q?fix(2.0):=20=E6=89=80=E6=9C=89=20Kafka=20consum?= =?UTF-8?q?er=20=E6=94=B9=E4=B8=BA=20fromBeginning:=20true=20=E7=A1=AE?= =?UTF-8?q?=E4=BF=9D=20full-reset=20=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mining-blockchain-service: withdrawal/mpc/deposit-ack consumer 改为 fromBeginning: true - trading-service: cdc-consumer/market-maker-deposit consumer 改为 fromBeginning: true - mining-service: contribution-event handler 改为 fromBeginning: true - deploy-mining.sh: CDC_CONSUMER_GROUPS 补充所有 2.0 服务的 consumer group ID 确保 ./deploy-mining.sh full-reset 可以 100% 从 0 开始同步所有 1.0 数据。 Co-Authored-By: Claude Opus 4.5 --- backend/services/deploy-mining.sh | 6 ++++++ .../infrastructure/kafka/deposit-ack-consumer.service.ts | 2 +- .../src/infrastructure/kafka/mpc-event-consumer.service.ts | 2 +- .../kafka/withdrawal-event-consumer.service.ts | 2 +- .../event-handlers/contribution-event.handler.ts | 2 +- .../src/infrastructure/kafka/cdc-consumer.service.ts | 2 +- .../kafka/market-maker-deposit-consumer.service.ts | 2 +- 7 files changed, 12 insertions(+), 6 deletions(-) diff --git a/backend/services/deploy-mining.sh b/backend/services/deploy-mining.sh index 8fad73a6..0c359844 100755 --- a/backend/services/deploy-mining.sh +++ b/backend/services/deploy-mining.sh @@ -120,6 +120,12 @@ CDC_CONSUMER_GROUPS=( "auth-service-cdc-group" "auth-service-cdc-group-wallet" "mining-admin-service-cdc-group" + "mining-blockchain-service-withdrawal-events" + "mining-blockchain-service-mpc-events" + "mining-blockchain-service-deposit-acks" + "trading-service-cdc-group" + "trading-service-mm-deposit-group" + "mining-service-contribution-sync" ) # Debezium Outbox Connectors (for 2.0 service events -> mining-admin-service) diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts index 5943988f..a718f8f8 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/deposit-ack-consumer.service.ts @@ -76,7 +76,7 @@ export class DepositAckConsumerService implements OnModuleInit, OnModuleDestroy await this.consumer.subscribe({ topics: Object.values(ACK_TOPICS), - fromBeginning: false, + fromBeginning: true, }); this.logger.log(`[SUBSCRIBE] Subscribed to ACK topics`); diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index c6d6ea6a..56e83fcb 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -118,7 +118,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); // Subscribe to MPC topics - await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: true }); this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); // Start consuming diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts index c42e0503..4b7a49d2 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/withdrawal-event-consumer.service.ts @@ -91,7 +91,7 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes await this.consumer.subscribe({ topics: Object.values(WITHDRAWAL_TOPICS), - fromBeginning: false, + fromBeginning: true, }); this.logger.log(`[SUBSCRIBE] Subscribed to withdrawal topics`); diff --git a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts index 71ea8ebc..fd75cf28 100644 --- a/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts +++ b/backend/services/mining-service/src/application/event-handlers/contribution-event.handler.ts @@ -40,7 +40,7 @@ export class ContributionEventHandler implements OnModuleInit { // 订阅多个 topic for (const topic of topics) { - await this.consumer.subscribe({ topic, fromBeginning: false }); + await this.consumer.subscribe({ topic, fromBeginning: true }); } await this.consumer.run({ diff --git a/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts index 667e4e18..c2916ed9 100644 --- a/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/trading-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -75,7 +75,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { await this.consumer.subscribe({ topics: [walletTopic], - fromBeginning: false, // 不需要处理历史消息 + fromBeginning: true, // full-reset 时从头开始同步 }); this.logger.log(`Subscribed to topic: ${walletTopic}`); diff --git a/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts b/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts index 12971a31..5509b8bf 100644 --- a/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts +++ b/backend/services/trading-service/src/infrastructure/kafka/market-maker-deposit-consumer.service.ts @@ -81,7 +81,7 @@ export class MarketMakerDepositConsumerService implements OnModuleInit, OnModule await this.consumer.subscribe({ topics: [topic], - fromBeginning: false, + fromBeginning: true, }); this.logger.log(`Subscribed to topic: ${topic}`);