From d58e8b44ee2bf9457fa28211aa5fc70926dbae2c Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 13 Jan 2026 20:57:24 -0800 Subject: [PATCH] feat(contribution): implement sequential CDC topic consumption Implements sequential phase consumption to ensure correct data sync order: 1. User accounts (first) 2. Referral relationships (depends on users) 3. Planting orders (depends on users and referrals) Each phase must complete before the next starts, guaranteeing 100% reliable data dependency ordering. After all phases complete, switches to continuous parallel consumption for real-time updates. Co-Authored-By: Claude Opus 4.5 --- .../kafka/cdc-consumer.service.ts | 200 ++++++++++++++++-- 1 file changed, 178 insertions(+), 22 deletions(-) diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 7d4bb1e3..040a78a2 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -53,6 +53,12 @@ export type TransactionalCDCHandlerWithResult = (event: CDCEvent, tx: Transac /** 事务提交后的回调函数 */ export type PostCommitCallback = (result: T) => Promise; +/** Topic 消费阶段配置 */ +export interface TopicPhase { + topic: string; + tableName: string; +} + @Injectable() export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CDCConsumerService.name); @@ -61,6 +67,11 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { private handlers: Map = new Map(); private isRunning = false; + // 分阶段消费配置 + private topicPhases: TopicPhase[] = []; + private currentPhaseIndex = 0; + private sequentialMode = false; + constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, @@ -247,7 +258,14 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { } /** - * 启动消费者 + * 启动消费者(顺序模式) + * + * 按顺序消费三个 topic,确保数据依赖关系正确: + * 1. 用户数据 (user_accounts) + * 2. 推荐关系 (referral_relationships) - 依赖用户数据 + * 3. 认种订单 (planting_orders) - 依赖用户和推荐关系 + * + * 每个阶段必须完全消费完毕后才进入下一阶段 */ async start(): Promise { if (this.isRunning) { @@ -259,36 +277,174 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { await this.consumer.connect(); this.logger.log('CDC consumer connected'); - // 订阅 Debezium CDC topics (从1.0服务全量同步) - const topics = [ - // 用户账户表 (identity-service: user_accounts) - this.configService.get('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'), - // 认种订单表 (planting-service: planting_orders) - this.configService.get('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'), - // 推荐关系表 (referral-service: referral_relationships) - this.configService.get('CDC_TOPIC_REFERRALS', 'cdc.referral.public.referral_relationships'), + // 配置顺序消费阶段(顺序很重要!) + this.topicPhases = [ + { + topic: this.configService.get('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'), + tableName: 'user_accounts', + }, + { + topic: this.configService.get('CDC_TOPIC_REFERRALS', 'cdc.referral.public.referral_relationships'), + tableName: 'referral_relationships', + }, + { + topic: this.configService.get('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'), + tableName: 'planting_orders', + }, ]; - await this.consumer.subscribe({ - topics, - fromBeginning: true, // 首次启动时全量同步历史数据 - }); - this.logger.log(`Subscribed to topics: ${topics.join(', ')}`); - - await this.consumer.run({ - eachMessage: async (payload: EachMessagePayload) => { - await this.handleMessage(payload); - }, - }); - + this.currentPhaseIndex = 0; + this.sequentialMode = true; this.isRunning = true; - this.logger.log('CDC consumer started with transactional idempotency protection'); + + // 开始顺序消费 + await this.startSequentialConsumption(); + + this.logger.log('CDC consumer started with sequential phase consumption'); } catch (error) { this.logger.error('Failed to start CDC consumer', error); // 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发) } } + /** + * 顺序消费所有阶段 + */ + private async startSequentialConsumption(): Promise { + for (let i = 0; i < this.topicPhases.length; i++) { + this.currentPhaseIndex = i; + const phase = this.topicPhases[i]; + + this.logger.log(`[CDC] Starting phase ${i + 1}/${this.topicPhases.length}: ${phase.tableName} (${phase.topic})`); + + // 消费当前阶段直到追上最新 + await this.consumePhaseToEnd(phase); + + this.logger.log(`[CDC] Completed phase ${i + 1}/${this.topicPhases.length}: ${phase.tableName}`); + } + + this.logger.log('[CDC] All phases completed. Switching to continuous mode...'); + + // 所有阶段完成后,切换到持续消费模式(同时监听所有 topic) + await this.startContinuousMode(); + } + + /** + * 消费单个阶段直到追上最新消息 + */ + private async consumePhaseToEnd(phase: TopicPhase): Promise { + // 创建临时 consumer 用于单个 topic + const phaseConsumer = this.kafka.consumer({ + groupId: `contribution-service-cdc-phase-${phase.tableName}`, + }); + + try { + await phaseConsumer.connect(); + + // 订阅单个 topic + await phaseConsumer.subscribe({ + topic: phase.topic, + fromBeginning: true, + }); + + // 获取 topic 的高水位线(最新 offset) + const admin = this.kafka.admin(); + await admin.connect(); + + let highWatermarks: Map = new Map(); + let processedOffsets: Map = new Map(); + let isComplete = false; + + // 获取 topic 的 partition 信息和高水位线 + const topicOffsets = await admin.fetchTopicOffsets(phase.topic); + for (const partitionOffset of topicOffsets) { + highWatermarks.set(partitionOffset.partition, partitionOffset.high); + processedOffsets.set(partitionOffset.partition, BigInt(-1)); + } + await admin.disconnect(); + + this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`); + + // 检查是否 topic 为空 + const allEmpty = Array.from(highWatermarks.values()).every(hw => hw === '0'); + if (allEmpty) { + this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`); + await phaseConsumer.disconnect(); + return; + } + + // 开始消费 + await phaseConsumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + + // 更新已处理的 offset + processedOffsets.set(payload.partition, BigInt(payload.message.offset)); + + // 检查是否所有 partition 都已追上高水位线 + let allCaughtUp = true; + for (const [partition, highWatermark] of highWatermarks) { + const processed = processedOffsets.get(partition) ?? BigInt(-1); + // 高水位线是下一个将被写入的 offset,所以已处理的 offset 需要 >= highWatermark - 1 + if (processed < BigInt(highWatermark) - BigInt(1)) { + allCaughtUp = false; + break; + } + } + + if (allCaughtUp && !isComplete) { + isComplete = true; + this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`); + } + }, + }); + + // 等待追上高水位线 + while (!isComplete) { + await new Promise(resolve => setTimeout(resolve, 100)); + + // 超时保护:每5秒检查一次进度 + const currentProgress = Array.from(processedOffsets.entries()) + .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) + .join(', '); + this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`); + } + + // 停止消费 + await phaseConsumer.stop(); + await phaseConsumer.disconnect(); + + } catch (error) { + this.logger.error(`[CDC] Error in phase ${phase.tableName}`, error); + await phaseConsumer.disconnect(); + throw error; + } + } + + /** + * 切换到持续消费模式(所有 topic 同时消费) + */ + private async startContinuousMode(): Promise { + this.sequentialMode = false; + + const topics = this.topicPhases.map(p => p.topic); + + await this.consumer.subscribe({ + topics, + fromBeginning: false, // 从上次消费的位置继续(不是从头开始) + }); + + this.logger.log(`[CDC] Continuous mode: Subscribed to topics: ${topics.join(', ')}`); + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + }, + }); + + this.logger.log('[CDC] Continuous mode started - all topics being consumed in parallel'); + } + /** * 停止消费者 */