85 lines
3.6 KiB
TypeScript
85 lines
3.6 KiB
TypeScript
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
|
||
import { CDCConsumerService, CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
|
||
import { UserSyncedHandler } from './user-synced.handler';
|
||
import { ReferralSyncedHandler } from './referral-synced.handler';
|
||
import { AdoptionSyncedHandler, AdoptionSyncResult } from './adoption-synced.handler';
|
||
|
||
/**
|
||
* CDC 事件分发器
|
||
* 负责将 Debezium CDC 事件路由到对应的处理器
|
||
*
|
||
* 使用事务性幂等模式(Transactional Idempotent Consumer)确保:
|
||
* - 每个 CDC 事件只处理一次(exactly-once 语义)
|
||
* - 幂等记录和业务逻辑在同一事务中执行
|
||
* - 任何失败都会导致整个事务回滚
|
||
*
|
||
* 对于认种事件,使用带后置回调的模式:
|
||
* - 数据同步在事务内完成
|
||
* - 算力计算在事务提交后执行(避免 Serializable 隔离级别下的可见性问题)
|
||
*/
|
||
@Injectable()
|
||
export class CDCEventDispatcher implements OnModuleInit {
|
||
private readonly logger = new Logger(CDCEventDispatcher.name);
|
||
|
||
constructor(
|
||
private readonly cdcConsumer: CDCConsumerService,
|
||
private readonly userHandler: UserSyncedHandler,
|
||
private readonly referralHandler: ReferralSyncedHandler,
|
||
private readonly adoptionHandler: AdoptionSyncedHandler,
|
||
) {}
|
||
|
||
async onModuleInit() {
|
||
// 注册各表的事务性事件处理器
|
||
// 表名需要与 Debezium topic 中的表名一致
|
||
// topic 格式: cdc.<service>.public.<table_name>
|
||
//
|
||
// 从 1.0 系统全量同步三类数据:
|
||
// - 用户数据 (identity-service: user_accounts)
|
||
// - 推荐关系 (referral-service: referral_relationships)
|
||
// - 认种订单 (planting-service: planting_orders)
|
||
|
||
// 用户和推荐关系:简单的事务性处理
|
||
this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this));
|
||
this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this));
|
||
|
||
// 认种订单:使用带后置回调的处理模式
|
||
// - 事务内:同步认种数据到 synced_adoptions 表
|
||
// - 事务后:计算算力(需要读取已提交的数据)
|
||
this.cdcConsumer.registerTransactionalHandlerWithCallback<AdoptionSyncResult | null>(
|
||
'planting_orders',
|
||
this.handleAdoptionEvent.bind(this),
|
||
this.handleAdoptionPostCommit.bind(this),
|
||
);
|
||
|
||
// 启动 CDC 消费者(非阻塞,在后台运行顺序同步)
|
||
// 不能 await,否则会阻塞服务启动,导致 HTTP 端点无法访问
|
||
this.cdcConsumer.start().then(() => {
|
||
this.logger.log('CDC event dispatcher started with transactional idempotency');
|
||
}).catch((error) => {
|
||
this.logger.error('Failed to start CDC event dispatcher', error);
|
||
// 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发)
|
||
});
|
||
}
|
||
|
||
private async handleUserEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
|
||
await this.userHandler.handle(event, tx);
|
||
}
|
||
|
||
private async handleReferralEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
|
||
await this.referralHandler.handle(event, tx);
|
||
}
|
||
|
||
private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise<AdoptionSyncResult | null> {
|
||
return await this.adoptionHandler.handle(event, tx);
|
||
}
|
||
|
||
/**
|
||
* 认种事件的后置回调 - 在事务提交后执行算力计算
|
||
*/
|
||
private async handleAdoptionPostCommit(result: AdoptionSyncResult | null): Promise<void> {
|
||
if (result) {
|
||
await this.adoptionHandler.calculateContributionAfterCommit(result);
|
||
}
|
||
}
|
||
}
|