fix(contribution): use unique consumer group id for each phase

Previous consumer group had already consumed messages, so fromBeginning
had no effect. Now using timestamp-based unique group id to ensure
fresh consumption from beginning each time.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 21:11:40 -08:00
parent 22523aba14
commit aef6feb2cd
1 changed files with 38 additions and 24 deletions

View File

@ -333,9 +333,33 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
* *
*/ */
private async consumePhaseToEnd(phase: TopicPhase): Promise<void> { private async consumePhaseToEnd(phase: TopicPhase): Promise<void> {
// 创建临时 consumer 用于单个 topic const admin = this.kafka.admin();
await admin.connect();
// 获取 topic 的高水位线和最早 offset
const topicOffsets = await admin.fetchTopicOffsets(phase.topic);
const highWatermarks: Map<number, string> = new Map();
const earliestOffsets: Map<number, string> = new Map();
for (const partitionOffset of topicOffsets) {
highWatermarks.set(partitionOffset.partition, partitionOffset.high);
earliestOffsets.set(partitionOffset.partition, partitionOffset.low);
}
this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`);
// 检查是否 topic 为空
const allEmpty = Array.from(highWatermarks.values()).every(hw => hw === '0');
if (allEmpty) {
this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`);
await admin.disconnect();
return;
}
// 使用唯一的 group id每次启动都重新消费
const uniqueGroupId = `contribution-service-cdc-phase-${phase.tableName}-${Date.now()}`;
const phaseConsumer = this.kafka.consumer({ const phaseConsumer = this.kafka.consumer({
groupId: `contribution-service-cdc-phase-${phase.tableName}`, groupId: uniqueGroupId,
}); });
try { try {
@ -347,30 +371,11 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
fromBeginning: true, fromBeginning: true,
}); });
// 获取 topic 的高水位线(最新 offset
const admin = this.kafka.admin();
await admin.connect();
let highWatermarks: Map<number, string> = new Map();
let processedOffsets: Map<number, bigint> = new Map(); let processedOffsets: Map<number, bigint> = new Map();
let isComplete = false; let isComplete = false;
// 获取 topic 的 partition 信息和高水位线 for (const partition of highWatermarks.keys()) {
const topicOffsets = await admin.fetchTopicOffsets(phase.topic); processedOffsets.set(partition, BigInt(-1));
for (const partitionOffset of topicOffsets) {
highWatermarks.set(partitionOffset.partition, partitionOffset.high);
processedOffsets.set(partitionOffset.partition, BigInt(-1));
}
await admin.disconnect();
this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`);
// 检查是否 topic 为空
const allEmpty = Array.from(highWatermarks.values()).every(hw => hw === '0');
if (allEmpty) {
this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`);
await phaseConsumer.disconnect();
return;
} }
// 开始消费 // 开始消费
@ -403,7 +408,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
while (!isComplete) { while (!isComplete) {
await new Promise(resolve => setTimeout(resolve, 100)); await new Promise(resolve => setTimeout(resolve, 100));
// 超时保护:5秒检查一次进度 // 每秒检查一次进度
const currentProgress = Array.from(processedOffsets.entries()) const currentProgress = Array.from(processedOffsets.entries())
.map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`)
.join(', '); .join(', ');
@ -414,9 +419,18 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
await phaseConsumer.stop(); await phaseConsumer.stop();
await phaseConsumer.disconnect(); await phaseConsumer.disconnect();
// 删除临时 consumer group
try {
await admin.deleteGroups([uniqueGroupId]);
} catch {
// 忽略删除失败
}
await admin.disconnect();
} catch (error) { } catch (error) {
this.logger.error(`[CDC] Error in phase ${phase.tableName}`, error); this.logger.error(`[CDC] Error in phase ${phase.tableName}`, error);
await phaseConsumer.disconnect(); await phaseConsumer.disconnect();
await admin.disconnect();
throw error; throw error;
} }
} }