fix(contribution-service): reset consumer group offsets to earliest on startup

Use admin.resetOffsets({ earliest: true }) before connecting consumer
to ensure CDC sync always starts from the beginning of Kafka topics,
regardless of previously committed offsets.

This fixes the infinite loop issue where existing consumer groups
had committed offsets at high watermark, causing eachMessage to
never be called.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 22:14:51 -08:00
parent d968efcad4
commit 631fe2bf31
1 changed files with 21 additions and 2 deletions

View File

@ -359,8 +359,27 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
return; return;
} }
// 使用固定的 group id(首次从头消费,重启后从上次位置继续) // 使用固定的 group id
const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`; const phaseGroupId = `contribution-service-cdc-phase-${phase.tableName}`;
// 重置 consumer group 的 offset 到最早位置
// 使用 admin.resetOffsets 而不是 setOffsets更简洁且专门用于重置到 earliest/latest
// 这确保每次服务启动都会从头开始消费,不受之前 committed offset 影响
// 参考: https://kafka.js.org/docs/admin#a-name-reset-offsets-a-resetoffsets
this.logger.log(`[CDC] Phase ${phase.tableName}: Resetting consumer group ${phaseGroupId} offsets to earliest`);
try {
await admin.resetOffsets({
groupId: phaseGroupId,
topic: phase.topic,
earliest: true,
});
this.logger.log(`[CDC] Phase ${phase.tableName}: Consumer group offsets reset successfully`);
} catch (resetError: any) {
// 如果 consumer group 不存在resetOffsets 会失败,这是正常的(首次运行)
// fromBeginning: true 会在这种情况下生效
this.logger.log(`[CDC] Phase ${phase.tableName}: Could not reset offsets (may be first run): ${resetError.message}`);
}
const phaseConsumer = this.kafka.consumer({ const phaseConsumer = this.kafka.consumer({
groupId: phaseGroupId, groupId: phaseGroupId,
}); });
@ -368,7 +387,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
try { try {
await phaseConsumer.connect(); await phaseConsumer.connect();
// 订阅单个 topic // 订阅单个 topicfromBeginning 对新 group 有效
await phaseConsumer.subscribe({ await phaseConsumer.subscribe({
topic: phase.topic, topic: phase.topic,
fromBeginning: true, fromBeginning: true,