fix(contribution-service): process CDC planting_orders phase sorted by order_id

## Problem Background

User D25122700018 has unlocked levels (unlocked_level_depth=5) but is missing TEAM_LEVEL power records.
The team power of its downstream user D25122700019 was wrongly allocated to D25122700015 (level 2) instead of D25122700018 (level 1).

## Root Cause Analysis

1. The data order in the source system is correct:
   - D25122700018: order_id=55, created_at=2026-01-09 11:57:01 (planted first)
   - D25122700019: order_id=57, created_at=2026-01-09 12:00:38 (planted later)

2. The Kafka message order is wrong:
   - D25122700019: offset=732, synced_at=10:15:32 (processed first)
   - D25122700018: offset=798, synced_at=10:15:41 (processed later)

3. Cause: the Debezium snapshot reads rows in PostgreSQL physical storage order (heap order)
   rather than primary-key order. Even though the topic has only 1 partition, the messages still arrive out of order.

4. Consequence: when D25122700019's planting orders were processed, D25122700018's unlocked_level_depth was still 0,
   so D25122700019's TEAM_LEVEL power skipped level 1 and was allocated directly to level 2 (illustrated in the sketch below).

## Solution

Implement a "collect-sort-process" pattern for the planting_orders phase:
1. First collect all messages into an in-memory array (without processing them immediately)
2. Sort them by order_id (the source system's primary key) in ascending order
3. Then process them one by one in sorted order

This guarantees that an upstream user's planting orders are processed before any downstream user's, preventing misallocated power; a condensed sketch of the pattern follows.
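Stripped of the Kafka plumbing shown in the diff below, the pattern is just this (the `Msg` shape and the function names here are illustrative placeholders):

```typescript
// Minimal sketch of collect-sort-process; types and names are placeholders.
interface Msg {
  orderId: bigint; // source-table primary key used as the sort key
  body: string;
}

async function collectSortProcess(
  collect: () => Promise<Msg[]>, // step 1: drain the topic into memory
  process: (m: Msg) => Promise<void>, // step 3: the existing per-message handler
): Promise<void> {
  const messages = await collect();
  // Step 2: ascending order_id restores the source-system insertion order.
  messages.sort((a, b) => (a.orderId < b.orderId ? -1 : a.orderId > b.orderId ? 1 : 0));
  for (const m of messages) {
    await process(m); // strictly sequential, same as before
  }
}
```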

## Affected User Case

- Upstream user: D25122700018 (order_id=55)
- Downstream user: D25122700019 (order_id=57, 58, 59)
- Wrong allocation: D25122700019's TEAM_LEVEL went to D25122700015 instead of D25122700018

## Rollback

To roll back this change, set the dispatch condition in the consumePhaseToEnd method to false:
```typescript
const needsSorting = false; // was: phase.tableName === 'planting_orders'
```
Or revert this commit directly (`git revert 37d3300b17`).

## Risk Assessment

- Business logic is completely unchanged; only the processing order changes
- The user_accounts and referral_relationships phases keep their original logic
- Memory overhead is bounded (about 5 MB for 10,000 records, roughly 500 bytes per buffered message)
- Sorting overhead is negligible (O(n log n))

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
hailin · 2026-01-23 03:41:14 -08:00
commit 37d3300b17, parent e9dea69ee9
1 changed file with 196 additions and 41 deletions


@@ -59,6 +59,15 @@ export interface TopicPhase {
tableName: string;
}
/**
* A collected message: wraps the raw Kafka payload together with the
* order_id extracted for sorting.
*/
interface CollectedMessage {
payload: EachMessagePayload;
orderId: bigint; // order_id used as the sort key
}
@Injectable()
export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CDCConsumerService.name);
@@ -334,6 +343,14 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
/**
* Consume a phase topic from the beginning until the high watermark is reached.
*
* The planting_orders phase uses the "collect-sort-process" pattern:
* 1. Collect all messages into memory
* 2. Sort them by order_id ascending
* 3. Process them in sorted order
*
* This works around the Debezium snapshot emitting rows in heap order
* rather than primary-key order.
*/
private async consumePhaseToEnd(phase: TopicPhase): Promise<void> {
const admin = this.kafka.admin();
@@ -393,48 +410,15 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
fromBeginning: true,
});
let processedOffsets: Map<number, bigint> = new Map();
let isComplete = false;
// Decide whether this phase needs the "collect-sort-process" mode
const needsSorting = phase.tableName === 'planting_orders';
for (const partition of highWatermarks.keys()) {
processedOffsets.set(partition, BigInt(-1));
}
// Start consuming
await phaseConsumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
// Record the latest processed offset
processedOffsets.set(payload.partition, BigInt(payload.message.offset));
// Check whether every partition has caught up with its high watermark
let allCaughtUp = true;
for (const [partition, highWatermark] of highWatermarks) {
const processed = processedOffsets.get(partition) ?? BigInt(-1);
// The high watermark is the next offset to be written, so the processed offset must be >= highWatermark - 1
if (processed < BigInt(highWatermark) - BigInt(1)) {
allCaughtUp = false;
break;
}
}
if (allCaughtUp && !isComplete) {
isComplete = true;
this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`);
}
},
});
// Wait until the high watermark is reached
while (!isComplete) {
await new Promise(resolve => setTimeout(resolve, 100));
// Log progress while polling
const currentProgress = Array.from(processedOffsets.entries())
.map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`)
.join(', ');
this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`);
if (needsSorting) {
// planting_orders phase: use the "collect-sort-process" mode
await this.consumePhaseWithSorting(phaseConsumer, phase, highWatermarks);
} else {
// Other phases: keep the original "process while consuming" mode
await this.consumePhaseDirectly(phaseConsumer, phase, highWatermarks);
}
// Stop consuming
@@ -450,6 +434,177 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
}
}
/**
* Direct mode: process each message as it is consumed.
* Used by the user_accounts and referral_relationships phases.
*/
private async consumePhaseDirectly(
phaseConsumer: Consumer,
phase: TopicPhase,
highWatermarks: Map<number, string>,
): Promise<void> {
let processedOffsets: Map<number, bigint> = new Map();
let isComplete = false;
for (const partition of highWatermarks.keys()) {
processedOffsets.set(partition, BigInt(-1));
}
// Start consuming
await phaseConsumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
// Record the latest processed offset
processedOffsets.set(payload.partition, BigInt(payload.message.offset));
// Check whether every partition has caught up with its high watermark
let allCaughtUp = true;
for (const [partition, highWatermark] of highWatermarks) {
const processed = processedOffsets.get(partition) ?? BigInt(-1);
// The high watermark is the next offset to be written, so the processed offset must be >= highWatermark - 1
if (processed < BigInt(highWatermark) - BigInt(1)) {
allCaughtUp = false;
break;
}
}
if (allCaughtUp && !isComplete) {
isComplete = true;
this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`);
}
},
});
// Wait until the high watermark is reached
while (!isComplete) {
await new Promise(resolve => setTimeout(resolve, 100));
// Log progress while polling
const currentProgress = Array.from(processedOffsets.entries())
.map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`)
.join(', ');
this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`);
}
}
/**
* Collect-sort-process mode: collect all planting_orders messages, sort them
* by order_id, then process them in sorted order.
*
* The Debezium snapshot emits rows in PostgreSQL heap order, so a downstream
* user's order can arrive before its upstream user's, while the upstream
* user's unlocked_level_depth is still 0.
*/
private async consumePhaseWithSorting(
phaseConsumer: Consumer,
phase: TopicPhase,
highWatermarks: Map<number, string>,
): Promise<void> {
const collectedMessages: CollectedMessage[] = [];
let processedOffsets: Map<number, bigint> = new Map();
let isComplete = false;
for (const partition of highWatermarks.keys()) {
processedOffsets.set(partition, BigInt(-1));
}
this.logger.log(`[CDC] Phase ${phase.tableName}: Using collect-sort-process mode`);
// Step 1: collect all messages (without processing them)
await phaseConsumer.run({
eachMessage: async (payload: EachMessagePayload) => {
// Parse the message to extract the order_id used as the sort key
const orderId = this.extractOrderIdFromPayload(payload);
collectedMessages.push({
payload,
orderId,
});
// Record the latest collected offset
processedOffsets.set(payload.partition, BigInt(payload.message.offset));
// Check whether every partition has caught up with its high watermark
let allCaughtUp = true;
for (const [partition, highWatermark] of highWatermarks) {
const processed = processedOffsets.get(partition) ?? BigInt(-1);
if (processed < BigInt(highWatermark) - BigInt(1)) {
allCaughtUp = false;
break;
}
}
if (allCaughtUp && !isComplete) {
isComplete = true;
this.logger.log(`[CDC] Phase ${phase.tableName}: Collected all ${collectedMessages.length} messages`);
}
},
});
// Wait until collection is complete
while (!isComplete) {
await new Promise(resolve => setTimeout(resolve, 100));
// Log progress while polling
const currentProgress = Array.from(processedOffsets.entries())
.map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`)
.join(', ');
this.logger.debug(`[CDC] Phase ${phase.tableName} collecting: ${currentProgress}, collected: ${collectedMessages.length}`);
}
// Step 2: sort by order_id ascending
this.logger.log(`[CDC] Phase ${phase.tableName}: Sorting ${collectedMessages.length} messages by order_id`);
collectedMessages.sort((a, b) => {
if (a.orderId < b.orderId) return -1;
if (a.orderId > b.orderId) return 1;
return 0;
});
// Log a sample of the sorted order_ids (for debugging)
if (collectedMessages.length > 0) {
const firstFive = collectedMessages.slice(0, 5).map(m => m.orderId.toString()).join(', ');
const lastFive = collectedMessages.slice(-5).map(m => m.orderId.toString()).join(', ');
this.logger.log(`[CDC] Phase ${phase.tableName}: Sorted order_ids: first=[${firstFive}], last=[${lastFive}]`);
}
// Step 3: process the messages in sorted order
this.logger.log(`[CDC] Phase ${phase.tableName}: Processing ${collectedMessages.length} messages in sorted order`);
let processedCount = 0;
for (const collected of collectedMessages) {
await this.handleMessage(collected.payload);
processedCount++;
// Log progress every 100 processed messages
if (processedCount % 100 === 0) {
this.logger.log(`[CDC] Phase ${phase.tableName}: Processed ${processedCount}/${collectedMessages.length} messages`);
}
}
this.logger.log(`[CDC] Phase ${phase.tableName}: Completed processing all ${collectedMessages.length} messages in order_id order`);
}
/**
* Extract the order_id used as the sort key from a Kafka message payload.
*/
private extractOrderIdFromPayload(payload: EachMessagePayload): bigint {
try {
if (!payload.message.value) {
return BigInt(0);
}
const rawData = JSON.parse(payload.message.value.toString());
// order_id is the source table's primary-key column
const orderId = rawData.order_id || rawData.id || 0;
// Convert to bigint for comparison
return BigInt(orderId);
} catch (error) {
this.logger.warn(`[CDC] Failed to extract order_id from message, using 0`, error);
return BigInt(0);
}
}
/**
* topic
*/