/**
 * A message buffered during the collect phase of collect-sort-process
 * handling (see consumePhaseWithSorting). Pairs the raw Kafka payload
 * with the business sort key parsed from it so the batch can be ordered
 * before any message is processed.
 */
interface CollectedMessage {
  payload: EachMessagePayload;
  orderId: bigint; // order_id (source-table primary key) — ascending sort key
}
BigInt(-1); - // 高水位线是下一个将被写入的 offset,所以已处理的 offset 需要 >= highWatermark - 1 - if (processed < BigInt(highWatermark) - BigInt(1)) { - allCaughtUp = false; - break; - } - } - - if (allCaughtUp && !isComplete) { - isComplete = true; - this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`); - } - }, - }); - - // 等待追上高水位线 - while (!isComplete) { - await new Promise(resolve => setTimeout(resolve, 100)); - - // 每秒检查一次进度 - const currentProgress = Array.from(processedOffsets.entries()) - .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) - .join(', '); - this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`); + if (needsSorting) { + // planting_orders 阶段:使用"收集-排序-处理"模式 + await this.consumePhaseWithSorting(phaseConsumer, phase, highWatermarks); + } else { + // 其他阶段:使用原有的"边消费边处理"模式 + await this.consumePhaseDirectly(phaseConsumer, phase, highWatermarks); } // 停止消费 @@ -450,6 +434,177 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { } } + /** + * 直接消费模式(边消费边处理) + * 用于 user_accounts 和 referral_relationships 阶段 + */ + private async consumePhaseDirectly( + phaseConsumer: Consumer, + phase: TopicPhase, + highWatermarks: Map, + ): Promise { + let processedOffsets: Map = new Map(); + let isComplete = false; + + for (const partition of highWatermarks.keys()) { + processedOffsets.set(partition, BigInt(-1)); + } + + // 开始消费 + await phaseConsumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + await this.handleMessage(payload); + + // 更新已处理的 offset + processedOffsets.set(payload.partition, BigInt(payload.message.offset)); + + // 检查是否所有 partition 都已追上高水位线 + let allCaughtUp = true; + for (const [partition, highWatermark] of highWatermarks) { + const processed = processedOffsets.get(partition) ?? 
BigInt(-1); + // 高水位线是下一个将被写入的 offset,所以已处理的 offset 需要 >= highWatermark - 1 + if (processed < BigInt(highWatermark) - BigInt(1)) { + allCaughtUp = false; + break; + } + } + + if (allCaughtUp && !isComplete) { + isComplete = true; + this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`); + } + }, + }); + + // 等待追上高水位线 + while (!isComplete) { + await new Promise(resolve => setTimeout(resolve, 100)); + + // 每秒检查一次进度 + const currentProgress = Array.from(processedOffsets.entries()) + .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) + .join(', '); + this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`); + } + } + + /** + * 收集-排序-处理模式 + * 用于 planting_orders 阶段,确保按 order_id(源系统主键)顺序处理 + * + * 解决问题:Debezium snapshot 按 PostgreSQL 物理存储顺序读取数据, + * 而非按主键顺序,导致下游用户的认种记录可能先于上游用户处理, + * 造成算力分配错误(上游用户的 unlocked_level_depth 还未更新)。 + */ + private async consumePhaseWithSorting( + phaseConsumer: Consumer, + phase: TopicPhase, + highWatermarks: Map, + ): Promise { + const collectedMessages: CollectedMessage[] = []; + let processedOffsets: Map = new Map(); + let isComplete = false; + + for (const partition of highWatermarks.keys()) { + processedOffsets.set(partition, BigInt(-1)); + } + + this.logger.log(`[CDC] Phase ${phase.tableName}: Using collect-sort-process mode`); + + // 第一步:收集所有消息(不处理) + await phaseConsumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + // 解析消息获取 order_id 用于排序 + const orderId = this.extractOrderIdFromPayload(payload); + + collectedMessages.push({ + payload, + orderId, + }); + + // 更新已处理的 offset + processedOffsets.set(payload.partition, BigInt(payload.message.offset)); + + // 检查是否所有 partition 都已追上高水位线 + let allCaughtUp = true; + for (const [partition, highWatermark] of highWatermarks) { + const processed = processedOffsets.get(partition) ?? 
BigInt(-1); + if (processed < BigInt(highWatermark) - BigInt(1)) { + allCaughtUp = false; + break; + } + } + + if (allCaughtUp && !isComplete) { + isComplete = true; + this.logger.log(`[CDC] Phase ${phase.tableName}: Collected all ${collectedMessages.length} messages`); + } + }, + }); + + // 等待收集完成 + while (!isComplete) { + await new Promise(resolve => setTimeout(resolve, 100)); + + // 每秒检查一次进度 + const currentProgress = Array.from(processedOffsets.entries()) + .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`) + .join(', '); + this.logger.debug(`[CDC] Phase ${phase.tableName} collecting: ${currentProgress}, collected: ${collectedMessages.length}`); + } + + // 第二步:按 order_id 升序排序 + this.logger.log(`[CDC] Phase ${phase.tableName}: Sorting ${collectedMessages.length} messages by order_id`); + collectedMessages.sort((a, b) => { + if (a.orderId < b.orderId) return -1; + if (a.orderId > b.orderId) return 1; + return 0; + }); + + // 记录排序前后的变化(用于调试) + if (collectedMessages.length > 0) { + const firstFive = collectedMessages.slice(0, 5).map(m => m.orderId.toString()).join(', '); + const lastFive = collectedMessages.slice(-5).map(m => m.orderId.toString()).join(', '); + this.logger.log(`[CDC] Phase ${phase.tableName}: Sorted order_ids: first=[${firstFive}], last=[${lastFive}]`); + } + + // 第三步:按排序后的顺序处理消息 + this.logger.log(`[CDC] Phase ${phase.tableName}: Processing ${collectedMessages.length} messages in sorted order`); + let processedCount = 0; + for (const collected of collectedMessages) { + await this.handleMessage(collected.payload); + processedCount++; + + // 每处理 100 条记录日志一次进度 + if (processedCount % 100 === 0) { + this.logger.log(`[CDC] Phase ${phase.tableName}: Processed ${processedCount}/${collectedMessages.length} messages`); + } + } + + this.logger.log(`[CDC] Phase ${phase.tableName}: Completed processing all ${collectedMessages.length} messages in order_id order`); + } + + /** + * 从 Kafka 消息中提取 order_id(用于排序) + */ + private extractOrderIdFromPayload(payload: 
EachMessagePayload): bigint { + try { + if (!payload.message.value) { + return BigInt(0); + } + + const rawData = JSON.parse(payload.message.value.toString()); + // order_id 是源表的主键字段 + const orderId = rawData.order_id || rawData.id || 0; + + // 转换为 bigint 用于比较 + return BigInt(orderId); + } catch (error) { + this.logger.warn(`[CDC] Failed to extract order_id from message, using 0`, error); + return BigInt(0); + } + } + /** * 切换到持续消费模式(所有 topic 同时消费) */