diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 39291b6e..d2c09171 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -333,9 +333,33 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { * 消费单个阶段直到追上最新消息 */ private async consumePhaseToEnd(phase: TopicPhase): Promise { - // 创建临时 consumer 用于单个 topic + const admin = this.kafka.admin(); + await admin.connect(); + + // 获取 topic 的高水位线和最早 offset + const topicOffsets = await admin.fetchTopicOffsets(phase.topic); + const highWatermarks: Map = new Map(); + const earliestOffsets: Map = new Map(); + + for (const partitionOffset of topicOffsets) { + highWatermarks.set(partitionOffset.partition, partitionOffset.high); + earliestOffsets.set(partitionOffset.partition, partitionOffset.low); + } + + this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`); + + // 检查是否 topic 为空 + const allEmpty = Array.from(highWatermarks.values()).every(hw => hw === '0'); + if (allEmpty) { + this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`); + await admin.disconnect(); + return; + } + + // 使用唯一的 group id(每次启动都重新消费) + const uniqueGroupId = `contribution-service-cdc-phase-${phase.tableName}-${Date.now()}`; const phaseConsumer = this.kafka.consumer({ - groupId: `contribution-service-cdc-phase-${phase.tableName}`, + groupId: uniqueGroupId, }); try { @@ -347,30 +371,11 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { fromBeginning: true, }); - // 获取 topic 的高水位线(最新 offset) - const admin = this.kafka.admin(); - await admin.connect(); - - let highWatermarks: Map = new Map(); let processedOffsets: Map = new Map(); let isComplete = false; - // 获取 topic 的 partition 信息和高水位线 - const topicOffsets = await admin.fetchTopicOffsets(phase.topic); - for (const partitionOffset of topicOffsets) { - highWatermarks.set(partitionOffset.partition, partitionOffset.high); - processedOffsets.set(partitionOffset.partition, BigInt(-1)); - } - await admin.disconnect(); - - this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`); - - // 检查是否 topic 为空 - const allEmpty = Array.from(highWatermarks.values()).every(hw => hw === '0'); - if (allEmpty) { - this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`); - await phaseConsumer.disconnect(); - return; + for (const partition of highWatermarks.keys()) { + processedOffsets.set(partition, BigInt(-1)); } // 开始消费 @@ -403,7 +408,7 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { while (!isComplete) { await new Promise(resolve => setTimeout(resolve, 100)); - // 超时保护:每5秒检查一次进度 + // 每秒检查一次进度 const currentProgress = Array.from(processedOffsets.entries()) .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) .join(', '); @@ -414,9 +419,18 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { await phaseConsumer.stop(); await phaseConsumer.disconnect(); + // 删除临时 consumer group + try { + await admin.deleteGroups([uniqueGroupId]); + } catch { + // 忽略删除失败 + } + await admin.disconnect(); + } catch (error) { this.logger.error(`[CDC] Error in phase ${phase.tableName}`, error); await phaseConsumer.disconnect(); + await admin.disconnect(); throw error; } }