diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index a8156d7c..68cd7beb 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -359,8 +359,27 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { return; } - // 使用固定的 group id(首次从头消费,重启后从上次位置继续) + // 使用固定的 group id const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`; + + // 重置 consumer group 的 offset 到最早位置 + // 使用 admin.resetOffsets 而不是 setOffsets,更简洁且专门用于重置到 earliest/latest + // 这确保每次服务启动都会从头开始消费,不受之前 committed offset 影响 + // 参考: https://kafka.js.org/docs/admin#a-name-reset-offsets-a-resetoffsets + this.logger.log(`[CDC] Phase ${phase.tableName}: Resetting consumer group ${phaseGroupId} offsets to earliest`); + try { + await admin.resetOffsets({ + groupId: phaseGroupId, + topic: phase.topic, + earliest: true, + }); + this.logger.log(`[CDC] Phase ${phase.tableName}: Consumer group offsets reset successfully`); + } catch (resetError: any) { + // 如果 consumer group 不存在,resetOffsets 会失败,这是正常的(首次运行) + // fromBeginning: true 会在这种情况下生效 + this.logger.log(`[CDC] Phase ${phase.tableName}: Could not reset offsets (may be first run): ${resetError.message}`); + } + const phaseConsumer = this.kafka.consumer({ groupId: phaseGroupId, }); @@ -368,7 +387,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { try { await phaseConsumer.connect(); - // 订阅单个 topic + // 订阅单个 topic,fromBeginning 对新 group 有效 await phaseConsumer.subscribe({ topic: phase.topic, fromBeginning: true,