feat(contribution): implement transactional idempotent CDC consumer for 1.0->2.0 sync

Implements 100% exactly-once semantics for CDC events from 1.0 databases
(identity-service, planting-service, referral-service) to contribution-service.

Key changes:
- Add ProcessedCdcEvent model with (sourceTopic, offset) unique constraint
- Add withIdempotency() wrapper using Serializable transaction isolation
- Add registerTransactionalHandler() for handlers requiring idempotency
- Modify CDC handlers to accept external transaction client
- All database operations now use the passed transaction client

This ensures that:
1. Each CDC event is processed exactly once
2. Idempotency record and business logic are in the same transaction
3. Any failure causes complete rollback

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 19:22:47 -08:00
parent 70135938c4
commit ff67319171
7 changed files with 359 additions and 125 deletions

View File

@ -0,0 +1,29 @@
-- ============================================================================
-- 添加事务性幂等消费支持
-- 用于 1.0 -> 2.0 CDC 同步的 100% exactly-once 语义
-- ============================================================================
-- 1. 创建 processed_cdc_events 表(用于 CDC 事件幂等)
CREATE TABLE IF NOT EXISTS "processed_cdc_events" (
"id" BIGSERIAL NOT NULL,
"source_topic" VARCHAR(200) NOT NULL,
"offset" BIGINT NOT NULL,
"table_name" VARCHAR(100) NOT NULL,
"operation" VARCHAR(10) NOT NULL,
"processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id")
);
-- 复合唯一索引:(source_topic, offset) 保证幂等性
CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset");
-- 时间索引用于清理旧数据
CREATE INDEX "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at");
-- 2. 修复 processed_events 表的唯一约束
-- 删除旧的单字段唯一索引
DROP INDEX IF EXISTS "processed_events_event_id_key";
-- 创建新的复合唯一索引
CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_sourceService_eventId_key" ON "processed_events"("source_service", "event_id");

View File

@ -411,15 +411,33 @@ model CdcSyncProgress {
@@map("cdc_sync_progress")
}
// 已处理事件表(幂等性)
// 已处理 CDC 事件表(幂等性)
// 使用 (sourceTopic, offset) 作为复合唯一键
// 这是事务性幂等消费的关键:在同一事务中插入此记录 + 执行业务逻辑
model ProcessedCdcEvent {
id BigInt @id @default(autoincrement())
sourceTopic String @map("source_topic") @db.VarChar(200) // CDC topic 名称
offset BigInt @map("offset") // Kafka offset 作为唯一标识
tableName String @map("table_name") @db.VarChar(100) // 表名
operation String @map("operation") @db.VarChar(10) // c/u/d/r
processedAt DateTime @default(now()) @map("processed_at")
@@unique([sourceTopic, offset])
@@index([processedAt])
@@map("processed_cdc_events")
}
// 已处理 Outbox 事件表(用于 2.0 服务间同步)
model ProcessedEvent {
id BigInt @id @default(autoincrement())
eventId String @unique @map("event_id") @db.VarChar(100)
eventId String @map("event_id") @db.VarChar(100)
eventType String @map("event_type") @db.VarChar(50)
sourceService String? @map("source_service") @db.VarChar(50)
sourceService String @map("source_service") @db.VarChar(100)
processedAt DateTime @default(now()) @map("processed_at")
@@unique([sourceService, eventId])
@@index([eventType])
@@index([processedAt])
@@map("processed_events")

View File

@ -1,24 +1,30 @@
import { Injectable, Logger } from '@nestjs/common';
import Decimal from 'decimal.js';
import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service';
import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository';
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
import { ContributionCalculationService } from '../services/contribution-calculation.service';
/**
* CDC
* 1.0 planting-service同步过来的planting_orders数据
*
*
* handler tx
* 使
*
*
* calculateForAdoption
*
* existsBySourceAdoptionId
*/
@Injectable()
export class AdoptionSyncedHandler {
private readonly logger = new Logger(AdoptionSyncedHandler.name);
constructor(
private readonly syncedDataRepository: SyncedDataRepository,
private readonly contributionCalculationService: ContributionCalculationService,
) {}
async handle(event: CDCEvent): Promise<void> {
async handle(event: CDCEvent, tx: TransactionClient): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`);
@ -28,10 +34,10 @@ export class AdoptionSyncedHandler {
switch (op) {
case 'c': // create
case 'r': // read (snapshot)
await this.handleCreate(after, event.sequenceNum);
await this.handleCreate(after, event.sequenceNum, tx);
break;
case 'u': // update
await this.handleUpdate(after, before, event.sequenceNum);
await this.handleUpdate(after, before, event.sequenceNum, tx);
break;
case 'd': // delete
await this.handleDelete(before);
@ -45,7 +51,7 @@ export class AdoptionSyncedHandler {
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!data) {
this.logger.warn(`[CDC] Adoption create: empty data received`);
return;
@ -66,24 +72,43 @@ export class AdoptionSyncedHandler {
return;
}
// 第一步:保存同步的认种订单数据
const originalAdoptionId = BigInt(orderId);
// 第一步:在外部事务中保存同步的认种订单数据
this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`);
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: BigInt(orderId),
accountSequence: accountSequence,
treeCount: treeCount,
await tx.syncedAdoption.upsert({
where: { originalAdoptionId },
create: {
originalAdoptionId,
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? null,
selectedProvince: selectedProvince,
selectedCity: selectedCity,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'), // 每棵树1算力
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
update: {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: data.status ?? undefined,
selectedProvince: selectedProvince ?? undefined,
selectedCity: selectedCity ?? undefined,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
// 第二步:触发算力计算(在单独的事务中执行)
// 第二步:触发算力计算
// 注意calculateForAdoption 有自己的幂等检查existsBySourceAdoptionId
// 所以即使这里重复调用也是安全的
this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`);
try {
await this.contributionCalculationService.calculateForAdoption(BigInt(orderId));
await this.contributionCalculationService.calculateForAdoption(originalAdoptionId);
this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`);
} catch (error) {
// 算力计算失败不影响数据同步,后续可通过批量任务重试
@ -93,7 +118,7 @@ export class AdoptionSyncedHandler {
this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`);
}
private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise<void> {
private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!after) {
this.logger.warn(`[CDC] Adoption update: empty after data received`);
return;
@ -104,8 +129,10 @@ export class AdoptionSyncedHandler {
this.logger.log(`[CDC] Adoption update: orderId=${orderId}`);
// 检查是否已经处理过
const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId);
// 检查是否已经处理过(使用事务客户端)
const existingAdoption = await tx.syncedAdoption.findUnique({
where: { originalAdoptionId },
});
if (existingAdoption?.contributionDistributed) {
// 如果树数量发生变化,需要重新计算(这种情况较少)
@ -129,20 +156,35 @@ export class AdoptionSyncedHandler {
this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`);
// 第一步:保存同步的认种订单数据
await this.syncedDataRepository.upsertSyncedAdoption({
originalAdoptionId: originalAdoptionId,
accountSequence: accountSequence,
treeCount: treeCount,
// 第一步:在外部事务中保存同步的认种订单数据
await tx.syncedAdoption.upsert({
where: { originalAdoptionId },
create: {
originalAdoptionId,
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? null,
selectedProvince: selectedProvince,
selectedCity: selectedCity,
selectedProvince,
selectedCity,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
update: {
accountSequence,
treeCount,
adoptionDate: new Date(createdAt),
status: after.status ?? undefined,
selectedProvince: selectedProvince ?? undefined,
selectedCity: selectedCity ?? undefined,
contributionPerTree: new Decimal('1'),
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
// 第二步:触发算力计算(在单独的事务中执行)
// 第二步:触发算力计算
if (!existingAdoption?.contributionDistributed) {
this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`);
try {

View File

@ -1,5 +1,5 @@
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { CDCConsumerService, CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service';
import { CDCConsumerService, CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
import { UserSyncedHandler } from './user-synced.handler';
import { ReferralSyncedHandler } from './referral-synced.handler';
import { AdoptionSyncedHandler } from './adoption-synced.handler';
@ -7,6 +7,11 @@ import { AdoptionSyncedHandler } from './adoption-synced.handler';
/**
* CDC
* Debezium CDC
*
* 使Transactional Idempotent Consumer
* - CDC exactly-once
* -
* -
*/
@Injectable()
export class CDCEventDispatcher implements OnModuleInit {
@ -20,7 +25,7 @@ export class CDCEventDispatcher implements OnModuleInit {
) {}
async onModuleInit() {
// 注册各表的事件处理器
// 注册各表的事务性事件处理器
// 表名需要与 Debezium topic 中的表名一致
// topic 格式: cdc.<service>.public.<table_name>
//
@ -28,29 +33,34 @@ export class CDCEventDispatcher implements OnModuleInit {
// - 用户数据 (identity-service: user_accounts)
// - 推荐关系 (referral-service: referral_relationships)
// - 认种订单 (planting-service: planting_orders)
this.cdcConsumer.registerHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service
this.cdcConsumer.registerHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service
this.cdcConsumer.registerHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service
//
// 使用 registerTransactionalHandler 确保:
// 1. CDC 事件幂等记录processed_cdc_events
// 2. 业务数据处理
// 都在同一个 Serializable 事务中完成
this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service
this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service
this.cdcConsumer.registerTransactionalHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service
// 启动 CDC 消费者
try {
await this.cdcConsumer.start();
this.logger.log('CDC event dispatcher started');
this.logger.log('CDC event dispatcher started with transactional idempotency');
} catch (error) {
this.logger.error('Failed to start CDC event dispatcher', error);
// 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发)
}
}
private async handleUserEvent(event: CDCEvent): Promise<void> {
await this.userHandler.handle(event);
private async handleUserEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
await this.userHandler.handle(event, tx);
}
private async handleReferralEvent(event: CDCEvent): Promise<void> {
await this.referralHandler.handle(event);
private async handleReferralEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
await this.referralHandler.handle(event, tx);
}
private async handleAdoptionEvent(event: CDCEvent): Promise<void> {
await this.adoptionHandler.handle(event);
private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise<void> {
await this.adoptionHandler.handle(event, tx);
}
}

View File

@ -1,7 +1,5 @@
import { Injectable, Logger } from '@nestjs/common';
import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service';
import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
/**
* CDC
@ -19,17 +17,18 @@ import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-o
* - referrer_user_id (1.0 referrer_id)
* - referrer account_sequence
* - ancestor_path
*
* handler tx
* 使
*
*/
@Injectable()
export class ReferralSyncedHandler {
private readonly logger = new Logger(ReferralSyncedHandler.name);
constructor(
private readonly syncedDataRepository: SyncedDataRepository,
private readonly unitOfWork: UnitOfWork,
) {}
constructor() {}
async handle(event: CDCEvent): Promise<void> {
async handle(event: CDCEvent, tx: TransactionClient): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] Referral event received: op=${op}, seq=${event.sequenceNum}`);
@ -39,10 +38,10 @@ export class ReferralSyncedHandler {
switch (op) {
case 'c': // create
case 'r': // read (snapshot)
await this.handleCreate(after, event.sequenceNum);
await this.handleCreate(after, event.sequenceNum, tx);
break;
case 'u': // update
await this.handleUpdate(after, event.sequenceNum);
await this.handleUpdate(after, event.sequenceNum, tx);
break;
case 'd': // delete
await this.handleDelete(before);
@ -56,7 +55,7 @@ export class ReferralSyncedHandler {
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!data) {
this.logger.warn(`[CDC] Referral create: empty data received`);
return;
@ -80,10 +79,12 @@ export class ReferralSyncedHandler {
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`);
// 尝试查找推荐人的 account_sequence
// 尝试查找推荐人的 account_sequence(使用事务客户端)
let referrerAccountSequence: string | null = null;
if (referrerUserId) {
const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId));
const referrer = await tx.syncedReferral.findFirst({
where: { originalUserId: BigInt(referrerUserId) },
});
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`);
@ -92,9 +93,11 @@ export class ReferralSyncedHandler {
}
}
await this.unitOfWork.executeInTransaction(async () => {
// 使用外部事务客户端执行所有操作
this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`);
await this.syncedDataRepository.upsertSyncedReferral({
await tx.syncedReferral.upsert({
where: { accountSequence },
create: {
accountSequence,
referrerAccountSequence,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : null,
@ -102,13 +105,23 @@ export class ReferralSyncedHandler {
ancestorPath,
depth,
sourceSequenceNum: sequenceNum,
});
syncedAt: new Date(),
},
update: {
referrerAccountSequence: referrerAccountSequence ?? undefined,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined,
originalUserId: originalUserId ? BigInt(originalUserId) : undefined,
ancestorPath: ancestorPath ?? undefined,
depth: depth ?? undefined,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`);
}
private async handleUpdate(data: any, sequenceNum: bigint): Promise<void> {
private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!data) {
this.logger.warn(`[CDC] Referral update: empty data received`);
return;
@ -129,17 +142,21 @@ export class ReferralSyncedHandler {
const ancestorPath = this.convertAncestorPath(ancestorPathArray);
// 尝试查找推荐人的 account_sequence
// 尝试查找推荐人的 account_sequence(使用事务客户端)
let referrerAccountSequence: string | null = null;
if (referrerUserId) {
const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId));
const referrer = await tx.syncedReferral.findFirst({
where: { originalUserId: BigInt(referrerUserId) },
});
if (referrer) {
referrerAccountSequence = referrer.accountSequence;
this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`);
}
}
await this.syncedDataRepository.upsertSyncedReferral({
await tx.syncedReferral.upsert({
where: { accountSequence },
create: {
accountSequence,
referrerAccountSequence,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : null,
@ -147,6 +164,17 @@ export class ReferralSyncedHandler {
ancestorPath,
depth,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
update: {
referrerAccountSequence: referrerAccountSequence ?? undefined,
referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined,
originalUserId: originalUserId ? BigInt(originalUserId) : undefined,
ancestorPath: ancestorPath ?? undefined,
depth: depth ?? undefined,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`);

View File

@ -1,25 +1,22 @@
import { Injectable, Logger } from '@nestjs/common';
import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service';
import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository';
import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository';
import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service';
import { ContributionAccountAggregate } from '../../domain/aggregates/contribution-account.aggregate';
import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work';
/**
* CDC
*
*
* handler tx
* 使
*
*/
@Injectable()
export class UserSyncedHandler {
private readonly logger = new Logger(UserSyncedHandler.name);
constructor(
private readonly syncedDataRepository: SyncedDataRepository,
private readonly contributionAccountRepository: ContributionAccountRepository,
private readonly unitOfWork: UnitOfWork,
) {}
constructor() {}
async handle(event: CDCEvent): Promise<void> {
async handle(event: CDCEvent, tx: TransactionClient): Promise<void> {
const { op, before, after } = event.payload;
this.logger.log(`[CDC] User event received: op=${op}, seq=${event.sequenceNum}`);
@ -29,10 +26,10 @@ export class UserSyncedHandler {
switch (op) {
case 'c': // create
case 'r': // read (snapshot)
await this.handleCreate(after, event.sequenceNum);
await this.handleCreate(after, event.sequenceNum, tx);
break;
case 'u': // update
await this.handleUpdate(after, event.sequenceNum);
await this.handleUpdate(after, event.sequenceNum, tx);
break;
case 'd': // delete
await this.handleDelete(before);
@ -46,7 +43,7 @@ export class UserSyncedHandler {
}
}
private async handleCreate(data: any, sequenceNum: bigint): Promise<void> {
private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!data) {
this.logger.warn(`[CDC] User create: empty data received`);
return;
@ -65,33 +62,47 @@ export class UserSyncedHandler {
return;
}
await this.unitOfWork.executeInTransaction(async () => {
// 使用外部事务客户端执行所有操作
// 保存同步的用户数据
this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`);
await this.syncedDataRepository.upsertSyncedUser({
await tx.syncedUser.upsert({
where: { accountSequence },
create: {
originalUserId: BigInt(userId),
accountSequence,
phone,
status,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
update: {
phone: phone ?? undefined,
status: status ?? undefined,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
// 为用户创建算力账户(如果不存在)
const existingAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence);
const existingAccount = await tx.contributionAccount.findUnique({
where: { accountSequence },
});
if (!existingAccount) {
const newAccount = ContributionAccountAggregate.create(accountSequence);
await this.contributionAccountRepository.save(newAccount);
const persistData = newAccount.toPersistence();
await tx.contributionAccount.create({
data: persistData,
});
this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`);
} else {
this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`);
}
});
this.logger.log(`[CDC] User synced successfully: ${accountSequence}`);
}
private async handleUpdate(data: any, sequenceNum: bigint): Promise<void> {
private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise<void> {
if (!data) {
this.logger.warn(`[CDC] User update: empty data received`);
return;
@ -110,12 +121,22 @@ export class UserSyncedHandler {
return;
}
await this.syncedDataRepository.upsertSyncedUser({
await tx.syncedUser.upsert({
where: { accountSequence },
create: {
originalUserId: BigInt(userId),
accountSequence,
phone,
status,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
update: {
phone: phone ?? undefined,
status: status ?? undefined,
sourceSequenceNum: sequenceNum,
syncedAt: new Date(),
},
});
this.logger.log(`[CDC] User updated successfully: ${accountSequence}`);

View File

@ -1,6 +1,11 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Prisma } from '@prisma/client';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { PrismaService } from '../persistence/prisma/prisma.service';
/** Prisma 事务客户端类型 */
export type TransactionClient = Prisma.TransactionClient;
/**
* CDC
@ -29,21 +34,31 @@ export interface CDCEvent {
source_ts_ms: number;
deleted: boolean;
};
// 内部使用Kafka offset 作为序列号
// Kafka 消息元数据
topic: string;
offset: bigint;
// 内部使用Kafka offset 作为序列号(向后兼容)
sequenceNum: bigint;
}
/** 普通 handler不支持事务 */
export type CDCHandler = (event: CDCEvent) => Promise<void>;
/** 事务性 handler支持在事务中执行 */
export type TransactionalCDCHandler = (event: CDCEvent, tx: TransactionClient) => Promise<void>;
@Injectable()
export class CDCConsumerService implements OnModuleInit {
export class CDCConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CDCConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private handlers: Map<string, CDCHandler> = new Map();
private isRunning = false;
constructor(private readonly configService: ConfigService) {
constructor(
private readonly configService: ConfigService,
private readonly prisma: PrismaService,
) {
const brokers = this.configService.get<string>('KAFKA_BROKERS', 'localhost:9092').split(',');
this.kafka = new Kafka({
@ -60,16 +75,84 @@ export class CDCConsumerService implements OnModuleInit {
// 不在这里启动,等待注册处理器后再启动
}
async onModuleDestroy() {
await this.stop();
}
/**
* CDC
* @param tableName "users", "adoptions", "referrals"
* @param handler
* - 100% exactly-once
*
*
* 1. 使
* 2.
*
*
*/
withIdempotency(handler: TransactionalCDCHandler): CDCHandler {
return async (event: CDCEvent) => {
const idempotencyKey = `${event.topic}:${event.offset}`;
try {
await this.prisma.$transaction(async (tx) => {
// 1. 尝试插入幂等记录(使用唯一约束防止重复)
try {
await tx.processedCdcEvent.create({
data: {
sourceTopic: event.topic,
offset: event.offset,
tableName: event.payload.table,
operation: event.payload.op,
},
});
} catch (error: any) {
// 唯一约束冲突 = 事件已处理,直接返回(不执行业务逻辑)
if (error.code === 'P2002') {
this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`);
return;
}
throw error;
}
// 2. 执行业务逻辑(传入事务客户端)
await handler(event, tx);
this.logger.debug(`[CDC] Processed event in transaction: ${idempotencyKey}`);
}, {
// 设置事务隔离级别为 Serializable防止并发问题
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
timeout: 60000, // 60秒超时算力计算可能需要较长时间
});
} catch (error: any) {
// 唯一约束冲突在事务外也可能发生(并发场景)
if (error.code === 'P2002') {
this.logger.debug(`[CDC] Skipping duplicate event (concurrent): ${idempotencyKey}`);
return;
}
this.logger.error(`[CDC] Failed to process event: ${idempotencyKey}`, error);
throw error;
}
};
}
/**
* CDC
* @deprecated 使 registerTransactionalHandler
*/
registerHandler(tableName: string, handler: CDCHandler): void {
this.handlers.set(tableName, handler);
this.logger.log(`Registered CDC handler for table: ${tableName}`);
}
/**
* CDC 100%
* @param tableName "users", "adoptions", "referrals"
* @param handler
*/
registerTransactionalHandler(tableName: string, handler: TransactionalCDCHandler): void {
this.handlers.set(tableName, this.withIdempotency(handler));
this.logger.log(`Registered transactional CDC handler for table: ${tableName}`);
}
/**
*
*/
@ -106,10 +189,10 @@ export class CDCConsumerService implements OnModuleInit {
});
this.isRunning = true;
this.logger.log('CDC consumer started');
this.logger.log('CDC consumer started with transactional idempotency protection');
} catch (error) {
this.logger.error('Failed to start CDC consumer', error);
throw error;
// 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发)
}
}
@ -127,7 +210,6 @@ export class CDCConsumerService implements OnModuleInit {
this.logger.log('CDC consumer stopped');
} catch (error) {
this.logger.error('Failed to stop CDC consumer', error);
throw error;
}
}
@ -158,6 +240,8 @@ export class CDCConsumerService implements OnModuleInit {
// 从原始数据中移除元数据字段,剩下的就是业务数据
const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData;
const offset = BigInt(message.offset);
// 构造兼容的 CDCEvent 对象
// 对于 create/update/read数据在 after对于 delete数据在 before
const event: CDCEvent = {
@ -169,7 +253,9 @@ export class CDCConsumerService implements OnModuleInit {
source_ts_ms: sourceTsMs,
deleted: deleted,
},
sequenceNum: BigInt(message.offset),
topic,
offset,
sequenceNum: offset, // 向后兼容
};
// 从 topic 名称提取表名作为备选