From 631fe2bf31f7a8549d165b8b1969af7015097ff7 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 22:14:51 -0800 Subject: [PATCH] fix(contribution-service): reset consumer group offsets to earliest on startup Use admin.resetOffsets({ earliest: true }) before connecting consumer to ensure CDC sync always starts from the beginning of Kafka topics, regardless of previously committed offsets. This fixes the infinite loop issue where existing consumer groups had committed offsets at high watermark, causing eachMessage to never be called. Co-Authored-By: Claude Opus 4.5 --- .../kafka/cdc-consumer.service.ts | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index a8156d7c..68cd7beb 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -359,8 +359,27 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { return; } - // 使用固定的 group id(首次从头消费,重启后从上次位置继续) + // 使用固定的 group id const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`; + + // 重置 consumer group 的 offset 到最早位置 + // 使用 admin.resetOffsets 而不是 setOffsets,更简洁且专门用于重置到 earliest/latest + // 这确保每次服务启动都会从头开始消费,不受之前 committed offset 影响 + // 参考: https://kafka.js.org/docs/admin#a-name-reset-offsets-a-resetoffsets + this.logger.log(`[CDC] Phase ${phase.tableName}: Resetting consumer group ${phaseGroupId} offsets to earliest`); + try { + await admin.resetOffsets({ + groupId: phaseGroupId, + topic: phase.topic, + earliest: true, + }); + this.logger.log(`[CDC] Phase ${phase.tableName}: Consumer group offsets reset successfully`); + } catch (resetError: any) { + // 如果 consumer group 不存在,resetOffsets 会失败,这是正常的(首次运行) + // fromBeginning: true 会在这种情况下生效 + this.logger.log(`[CDC] Phase ${phase.tableName}: Could not reset offsets (may be first run): ${resetError.message}`); + } + const phaseConsumer = this.kafka.consumer({ groupId: phaseGroupId, }); @@ -368,7 +387,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { try { await phaseConsumer.connect(); - // 订阅单个 topic + // 订阅单个 topic,fromBeginning 对新 group 有效 await phaseConsumer.subscribe({ topic: phase.topic, fromBeginning: true,