feat(contribution): implement sequential CDC topic consumption

Implements sequential phase consumption to ensure correct data sync order:
1. User accounts (first)
2. Referral relationships (depends on users)
3. Planting orders (depends on users and referrals)

Each phase must complete before the next starts, guaranteeing 100%
reliable data dependency ordering. After all phases complete, switches
to continuous parallel consumption for real-time updates.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 20:57:24 -08:00
parent 30949af577
commit d58e8b44ee
1 changed files with 178 additions and 22 deletions

View File

@ -53,6 +53,12 @@ export type TransactionalCDCHandlerWithResult<T> = (event: CDCEvent, tx: Transac
/** 事务提交后的回调函数 */
export type PostCommitCallback<T> = (result: T) => Promise<void>;
/** Configuration for one phase of topic consumption. */
export interface TopicPhase {
/** Name of the Kafka topic that is subscribed to for this phase. */
topic: string;
/** Source table name; used in log messages and to build the per-phase consumer group id. */
tableName: string;
}
@Injectable()
export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CDCConsumerService.name);
@ -61,6 +67,11 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
private handlers: Map<string, CDCHandler> = new Map();
private isRunning = false;
// 分阶段消费配置
private topicPhases: TopicPhase[] = [];
private currentPhaseIndex = 0;
private sequentialMode = false;
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
@ -247,7 +258,14 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
}
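/**
 * Starts the CDC consumer.
 *
 * Topics are consumed sequentially, in dependency order:
 * 1. User accounts (user_accounts)
 * 2. Referral relationships (referral_relationships) - depends on users
 * 3. Planting orders (planting_orders) - depends on users and referrals
 *
 * Each phase must complete before the next one starts. After all phases
 * complete, the consumer switches to continuous consumption of all topics.
 */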
async start(): Promise<void> {
if (this.isRunning) {
@ -259,36 +277,174 @@ export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
await this.consumer.connect();
this.logger.log('CDC consumer connected');
// 订阅 Debezium CDC topics (从1.0服务全量同步)
const topics = [
// 用户账户表 (identity-service: user_accounts)
this.configService.get<string>('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'),
// 认种订单表 (planting-service: planting_orders)
this.configService.get<string>('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'),
// 推荐关系表 (referral-service: referral_relationships)
this.configService.get<string>('CDC_TOPIC_REFERRALS', 'cdc.referral.public.referral_relationships'),
// 配置顺序消费阶段(顺序很重要!)
this.topicPhases = [
{
topic: this.configService.get<string>('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts'),
tableName: 'user_accounts',
},
{
topic: this.configService.get<string>('CDC_TOPIC_REFERRALS', 'cdc.referral.public.referral_relationships'),
tableName: 'referral_relationships',
},
{
topic: this.configService.get<string>('CDC_TOPIC_ADOPTIONS', 'cdc.planting.public.planting_orders'),
tableName: 'planting_orders',
},
];
await this.consumer.subscribe({
topics,
fromBeginning: true, // 首次启动时全量同步历史数据
});
this.logger.log(`Subscribed to topics: ${topics.join(', ')}`);
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
await this.handleMessage(payload);
},
});
this.currentPhaseIndex = 0;
this.sequentialMode = true;
this.isRunning = true;
this.logger.log('CDC consumer started with transactional idempotency protection');
// 开始顺序消费
await this.startSequentialConsumption();
this.logger.log('CDC consumer started with sequential phase consumption');
} catch (error) {
this.logger.error('Failed to start CDC consumer', error);
// 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发)
}
}
/**
 * Runs the configured topic phases one at a time, in array order, and then
 * switches the service to continuous mode.
 *
 * `currentPhaseIndex` is updated before each phase starts. A phase is awaited
 * to completion before the next one begins, so a rejection from any phase
 * propagates to the caller and the remaining phases do not run.
 */
private async startSequentialConsumption(): Promise<void> {
  let phaseIndex = 0;

  for (const phase of this.topicPhases) {
    this.currentPhaseIndex = phaseIndex;

    // Human-readable "n/total" position used by both log lines of this phase.
    const position = `${phaseIndex + 1}/${this.topicPhases.length}`;
    this.logger.log(`[CDC] Starting phase ${position}: ${phase.tableName} (${phase.topic})`);

    // Drain the current phase until it has caught up with the latest data.
    await this.consumePhaseToEnd(phase);

    this.logger.log(`[CDC] Completed phase ${position}: ${phase.tableName}`);
    phaseIndex += 1;
  }

  this.logger.log('[CDC] All phases completed. Switching to continuous mode...');

  // Every phase is done: listen to all topics at the same time from now on.
  await this.startContinuousMode();
}
/**
 * Consumes a single phase topic until every partition has reached the high
 * watermark that was captured when the phase started, then stops.
 *
 * How "caught up" is decided, per partition:
 * - A partition whose low and high watermarks are equal holds no readable
 *   messages (never written to, or fully expired) and needs no work.
 * - Otherwise the last offset to handle is `high - 1`, because the high
 *   watermark is the offset the next produced message will receive.
 * - Progress starts from the offset already committed by this phase's consumer
 *   group, so a restart with nothing new to read completes immediately instead
 *   of waiting for a message that will never be delivered.
 *
 * NOTE(review): the wait loop has no overall timeout. If the consumer stops
 * delivering messages for good (for example after a non-restartable crash),
 * this method keeps waiting — consider wiring up the consumer CRASH event.
 *
 * @param phase Topic and table name of the phase to drain.
 * @throws Rethrows any error raised while fetching offsets, connecting,
 *   subscribing, or running the phase consumer, after logging it.
 */
private async consumePhaseToEnd(phase: TopicPhase): Promise<void> {
  const groupId = `contribution-service-cdc-phase-${phase.tableName}`;

  // Temporary consumer dedicated to this phase's single topic.
  const phaseConsumer = this.kafka.consumer({ groupId });

  try {
    const highWatermarks = new Map<number, string>();
    const processedOffsets = new Map<number, bigint>();
    // Last offset that must be handled, only for partitions that hold messages.
    const targets = new Map<number, bigint>();

    const admin = this.kafka.admin();
    await admin.connect();
    try {
      // Snapshot the partition layout and watermarks of the topic.
      const topicOffsets = await admin.fetchTopicOffsets(phase.topic);
      for (const { partition, high, low } of topicOffsets) {
        highWatermarks.set(partition, high);
        processedOffsets.set(partition, BigInt(-1));
        if (BigInt(high) > BigInt(low)) {
          targets.set(partition, BigInt(high) - BigInt(1));
        }
      }

      // Seed progress with what this group committed on an earlier run.
      // A committed offset is the NEXT offset to read; '-1' means none.
      const committed = await admin.fetchOffsets({ groupId, topics: [phase.topic] });
      for (const entry of committed) {
        if (entry.topic !== phase.topic) {
          continue;
        }
        for (const { partition, offset } of entry.partitions) {
          const nextOffset = BigInt(offset);
          if (nextOffset > BigInt(0) && highWatermarks.has(partition)) {
            processedOffsets.set(partition, nextOffset - BigInt(1));
          }
        }
      }
    } finally {
      // Release the admin client even when an offset lookup fails.
      await admin.disconnect();
    }

    this.logger.log(`[CDC] Phase ${phase.tableName}: High watermarks = ${JSON.stringify(Object.fromEntries(highWatermarks))}`);

    // Partitions that still have messages to handle before the phase is done.
    const pending = new Set<number>();
    for (const [partition, target] of targets) {
      const processed = processedOffsets.get(partition) ?? BigInt(-1);
      if (processed < target) {
        pending.add(partition);
      }
    }

    if (targets.size === 0) {
      this.logger.log(`[CDC] Phase ${phase.tableName}: Topic is empty, skipping`);
      return;
    }
    if (pending.size === 0) {
      this.logger.log(`[CDC] Phase ${phase.tableName}: Already caught up with all partitions, skipping`);
      return;
    }

    await phaseConsumer.connect();

    // Subscribe to this phase's topic only.
    await phaseConsumer.subscribe({
      topic: phase.topic,
      fromBeginning: true,
    });

    await phaseConsumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        await this.handleMessage(payload);

        // Record progress only after the message was handled successfully.
        const offset = BigInt(payload.message.offset);
        processedOffsets.set(payload.partition, offset);

        const target = targets.get(payload.partition);
        if (target !== undefined && offset >= target) {
          // delete() is true only the first time, so this logs exactly once.
          if (pending.delete(payload.partition) && pending.size === 0) {
            this.logger.log(`[CDC] Phase ${phase.tableName}: Caught up with all partitions`);
          }
        }
      },
    });

    // Poll until every pending partition has reached its target offset.
    const pollIntervalMs = 100;
    const progressLogIntervalMs = 5000;
    let sinceLastProgressLog = 0;
    while (pending.size > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, pollIntervalMs));

      // Report progress every 5 seconds rather than on every poll.
      sinceLastProgressLog += pollIntervalMs;
      if (sinceLastProgressLog >= progressLogIntervalMs) {
        sinceLastProgressLog = 0;
        const currentProgress = Array.from(processedOffsets.entries())
          .map(([p, o]) => `P${p}:${o}/${highWatermarks.get(p)}`)
          .join(', ');
        this.logger.debug(`[CDC] Phase ${phase.tableName} progress: ${currentProgress}`);
      }
    }

    // Stop fetching before the consumer is disconnected below.
    await phaseConsumer.stop();
  } catch (error) {
    this.logger.error(`[CDC] Error in phase ${phase.tableName}`, error);
    throw error;
  } finally {
    // Always release the phase consumer; a failure here must not hide the
    // original error or fail a phase that already completed.
    try {
      await phaseConsumer.disconnect();
    } catch (disconnectError) {
      const reason = disconnectError instanceof Error ? disconnectError.message : String(disconnectError);
      this.logger.warn(`[CDC] Phase ${phase.tableName}: Failed to disconnect phase consumer: ${reason}`);
    }
  }
}
/**
 * Switches to continuous mode: clears the sequential-mode flag, subscribes the
 * main consumer to the topics of all phases, and starts handling messages.
 *
 * NOTE(review): the subscription uses `fromBeginning: false`. Where reading
 * starts when the main consumer group has no committed offsets is decided by
 * KafkaJS/Kafka, not by this code — confirm that messages produced between the
 * end of a phase and this subscription cannot be skipped.
 */
private async startContinuousMode(): Promise<void> {
  this.sequentialMode = false;

  const topics = this.topicPhases.map(({ topic }) => topic);
  const topicList = topics.join(', ');

  // Continue from the last consumed position (do not start from the beginning).
  await this.consumer.subscribe({ topics, fromBeginning: false });

  this.logger.log(`[CDC] Continuous mode: Subscribed to topics: ${topicList}`);

  // Every message, whatever its topic, goes through the same handler.
  const onMessage = async (payload: EachMessagePayload): Promise<void> => {
    await this.handleMessage(payload);
  };
  await this.consumer.run({ eachMessage: onMessage });

  this.logger.log('[CDC] Continuous mode started - all topics being consumed in parallel');
}
/**
*
*/