fix(cdc): implement idempotent consumer pattern for reliable CDC sync

- Use (sourceTopic, eventId) as composite unique key in processed_events
- Add sourceTopic to ServiceEvent for globally unique idempotency key
- Wrap all handlers with withIdempotency() for duplicate event detection
- Fix ID collision issue between different service outbox tables

This implements the industry-standard CDC exactly-once semantics pattern.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 13:31:10 -08:00
parent 82a3c7a2c3
commit 577f626972
4 changed files with 124 additions and 52 deletions

View File

@ -0,0 +1,17 @@
-- ============================================================================
-- 修复 processed_events 表的幂等键
-- 问题: 原来使用 eventId 作为唯一键,但不同服务的 outbox ID 可能相同
-- 解决: 使用 (sourceService, eventId) 作为复合唯一键
-- ============================================================================
-- 先清空已有数据(因为之前的数据可能有冲突)
TRUNCATE TABLE "processed_events";
-- 删除旧的唯一索引
DROP INDEX IF EXISTS "processed_events_eventId_key";
-- 删除旧的 sourceService 索引
DROP INDEX IF EXISTS "processed_events_sourceService_idx";
-- 创建新的复合唯一索引
CREATE UNIQUE INDEX "processed_events_sourceService_eventId_key" ON "processed_events"("sourceService", "eventId");

View File

@ -457,12 +457,12 @@ model CdcSyncProgress {
model ProcessedEvent {
id String @id @default(uuid())
eventId String @unique
eventId String
eventType String
sourceService String
processedAt DateTime @default(now())
@@index([sourceService])
@@unique([sourceService, eventId])
@@index([processedAt])
@@map("processed_events")
}

View File

@ -42,6 +42,8 @@ export interface ServiceEvent {
payload: any;
createdAt: string;
sequenceNum: bigint;
/** 来源 topic用于构建全局唯一的幂等键 (topic + id) */
sourceTopic: string;
}
export type CdcHandler = (event: CdcEvent) => Promise<void>;
@ -287,6 +289,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
const event: ServiceEvent = {
...normalizedEvent,
sequenceNum,
sourceTopic: topic,
};
const handler = this.serviceHandlers.get(event.eventType);
@ -311,7 +314,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
*
* Debezium outbox 线
*/
private normalizeServiceEvent(data: any): Omit<ServiceEvent, 'sequenceNum'> {
private normalizeServiceEvent(data: any): Omit<ServiceEvent, 'sequenceNum' | 'sourceTopic'> {
// 如果已经是驼峰格式,直接返回
if (data.eventType && data.aggregateType) {
return data;

View File

@ -5,12 +5,18 @@ import {
CdcConsumerService,
CdcEvent,
ServiceEvent,
ServiceEventHandler,
} from './cdc-consumer.service';
import { WalletSyncHandlers } from './wallet-sync.handlers';
/**
* CDC
* 2.0 mining-admin-service
*
* CDC
* 1. 使 (sourceTopic, eventId)
* 2.
* 3.
*/
@Injectable()
export class CdcSyncService implements OnModuleInit {
@ -28,6 +34,25 @@ export class CdcSyncService implements OnModuleInit {
await this.cdcConsumer.start();
}
/**
* handler
* CDC exactly-once
*/
private withIdempotency(handler: ServiceEventHandler): ServiceEventHandler {
return async (event: ServiceEvent) => {
// 1. 检查是否已处理
if (await this.isEventProcessed(event)) {
this.logger.debug(`Skipping duplicate event: ${event.sourceTopic}:${event.id} (${event.eventType})`);
return;
}
// 2. 执行实际处理逻辑
await handler(event);
// 3. 记录已处理(在 handler 内部完成,这里不再重复)
};
}
private async registerHandlers(): Promise<void> {
// ===========================================================================
// 从 auth-service 同步用户数据 (通过 Debezium CDC 监听 outbox_events 表)
@ -39,30 +64,30 @@ export class CdcSyncService implements OnModuleInit {
this.cdcConsumer.addTopic(usersTopic);
this.cdcConsumer.registerServiceHandler(
'UserCreated',
this.handleUserCreated.bind(this),
this.withIdempotency(this.handleUserCreated.bind(this)),
);
// auth-service 发布的 user.registered 事件
this.cdcConsumer.registerServiceHandler(
'user.registered',
this.handleUserRegistered.bind(this),
this.withIdempotency(this.handleUserRegistered.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'UserUpdated',
this.handleUserUpdated.bind(this),
this.withIdempotency(this.handleUserUpdated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'KycStatusChanged',
this.handleKycStatusChanged.bind(this),
this.withIdempotency(this.handleKycStatusChanged.bind(this)),
);
// auth-service 发布的 user.kyc_verified 事件
this.cdcConsumer.registerServiceHandler(
'user.kyc_verified',
this.handleKycStatusChanged.bind(this),
this.withIdempotency(this.handleKycStatusChanged.bind(this)),
);
// auth-service 发布的 user.legacy.migrated 事件 (1.0用户首次登录2.0时)
this.cdcConsumer.registerServiceHandler(
'user.legacy.migrated',
this.handleLegacyUserMigrated.bind(this),
this.withIdempotency(this.handleLegacyUserMigrated.bind(this)),
);
// ===========================================================================
@ -75,41 +100,41 @@ export class CdcSyncService implements OnModuleInit {
this.cdcConsumer.addTopic(contributionTopic);
this.cdcConsumer.registerServiceHandler(
'ContributionAccountUpdated',
this.handleContributionAccountUpdated.bind(this),
this.withIdempotency(this.handleContributionAccountUpdated.bind(this)),
);
// ContributionAccountSynced 用于初始全量同步
this.cdcConsumer.registerServiceHandler(
'ContributionAccountSynced',
this.handleContributionAccountUpdated.bind(this),
this.withIdempotency(this.handleContributionAccountUpdated.bind(this)),
);
// ContributionCalculated 事件在算力计算完成时发布
this.cdcConsumer.registerServiceHandler(
'ContributionCalculated',
this.handleContributionCalculated.bind(this),
this.withIdempotency(this.handleContributionCalculated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'SystemContributionUpdated',
this.handleSystemContributionUpdated.bind(this),
this.withIdempotency(this.handleSystemContributionUpdated.bind(this)),
);
// ReferralSynced 事件 - 同步推荐关系
this.cdcConsumer.registerServiceHandler(
'ReferralSynced',
this.handleReferralSynced.bind(this),
this.withIdempotency(this.handleReferralSynced.bind(this)),
);
// AdoptionSynced 事件 - 同步认种记录
this.cdcConsumer.registerServiceHandler(
'AdoptionSynced',
this.handleAdoptionSynced.bind(this),
this.withIdempotency(this.handleAdoptionSynced.bind(this)),
);
// ContributionRecordSynced 事件 - 同步算力明细记录
this.cdcConsumer.registerServiceHandler(
'ContributionRecordSynced',
this.handleContributionRecordSynced.bind(this),
this.withIdempotency(this.handleContributionRecordSynced.bind(this)),
);
// NetworkProgressUpdated 事件 - 同步全网算力进度
this.cdcConsumer.registerServiceHandler(
'NetworkProgressUpdated',
this.handleNetworkProgressUpdated.bind(this),
this.withIdempotency(this.handleNetworkProgressUpdated.bind(this)),
);
// ===========================================================================
@ -122,15 +147,15 @@ export class CdcSyncService implements OnModuleInit {
this.cdcConsumer.addTopic(miningTopic);
this.cdcConsumer.registerServiceHandler(
'MiningAccountUpdated',
this.handleMiningAccountUpdated.bind(this),
this.withIdempotency(this.handleMiningAccountUpdated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'MiningConfigUpdated',
this.handleMiningConfigUpdated.bind(this),
this.withIdempotency(this.handleMiningConfigUpdated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'DailyMiningStatCreated',
this.handleDailyMiningStatCreated.bind(this),
this.withIdempotency(this.handleDailyMiningStatCreated.bind(this)),
);
// ===========================================================================
@ -143,15 +168,15 @@ export class CdcSyncService implements OnModuleInit {
this.cdcConsumer.addTopic(tradingTopic);
this.cdcConsumer.registerServiceHandler(
'TradingAccountUpdated',
this.handleTradingAccountUpdated.bind(this),
this.withIdempotency(this.handleTradingAccountUpdated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'DayKLineCreated',
this.handleDayKLineCreated.bind(this),
this.withIdempotency(this.handleDayKLineCreated.bind(this)),
);
this.cdcConsumer.registerServiceHandler(
'CirculationPoolUpdated',
this.handleCirculationPoolUpdated.bind(this),
this.withIdempotency(this.handleCirculationPoolUpdated.bind(this)),
);
@ -167,126 +192,126 @@ export class CdcSyncService implements OnModuleInit {
// 区域数据
this.cdcConsumer.registerServiceHandler(
'ProvinceCreated',
this.walletHandlers.handleProvinceCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleProvinceCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'ProvinceUpdated',
this.walletHandlers.handleProvinceUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleProvinceUpdated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'CityCreated',
this.walletHandlers.handleCityCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleCityCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'CityUpdated',
this.walletHandlers.handleCityUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleCityUpdated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'UserRegionMappingCreated',
this.walletHandlers.handleUserRegionMappingCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleUserRegionMappingCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'UserRegionMappingUpdated',
this.walletHandlers.handleUserRegionMappingUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleUserRegionMappingUpdated.bind(this.walletHandlers)),
);
// 系统账户
this.cdcConsumer.registerServiceHandler(
'WalletSystemAccountCreated',
this.walletHandlers.handleWalletSystemAccountCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWalletSystemAccountCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'WalletSystemAccountUpdated',
this.walletHandlers.handleWalletSystemAccountUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWalletSystemAccountUpdated.bind(this.walletHandlers)),
);
// 池账户
this.cdcConsumer.registerServiceHandler(
'WalletPoolAccountCreated',
this.walletHandlers.handleWalletPoolAccountCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWalletPoolAccountCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'WalletPoolAccountUpdated',
this.walletHandlers.handleWalletPoolAccountUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWalletPoolAccountUpdated.bind(this.walletHandlers)),
);
// 用户钱包
this.cdcConsumer.registerServiceHandler(
'UserWalletCreated',
this.walletHandlers.handleUserWalletCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleUserWalletCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'UserWalletUpdated',
this.walletHandlers.handleUserWalletUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleUserWalletUpdated.bind(this.walletHandlers)),
);
// 提现请求
this.cdcConsumer.registerServiceHandler(
'WithdrawRequestCreated',
this.walletHandlers.handleWithdrawRequestCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWithdrawRequestCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'WithdrawRequestUpdated',
this.walletHandlers.handleWithdrawRequestUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleWithdrawRequestUpdated.bind(this.walletHandlers)),
);
// 充值记录
this.cdcConsumer.registerServiceHandler(
'DepositRecordCreated',
this.walletHandlers.handleDepositRecordCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleDepositRecordCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'DepositRecordUpdated',
this.walletHandlers.handleDepositRecordUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleDepositRecordUpdated.bind(this.walletHandlers)),
);
// DEX Swap
this.cdcConsumer.registerServiceHandler(
'DexSwapRecordCreated',
this.walletHandlers.handleDexSwapRecordCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleDexSwapRecordCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'DexSwapRecordUpdated',
this.walletHandlers.handleDexSwapRecordUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleDexSwapRecordUpdated.bind(this.walletHandlers)),
);
// 地址绑定
this.cdcConsumer.registerServiceHandler(
'BlockchainAddressBindingCreated',
this.walletHandlers.handleBlockchainAddressBindingCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleBlockchainAddressBindingCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'BlockchainAddressBindingUpdated',
this.walletHandlers.handleBlockchainAddressBindingUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleBlockchainAddressBindingUpdated.bind(this.walletHandlers)),
);
// 黑洞合约
this.cdcConsumer.registerServiceHandler(
'BlackHoleContractCreated',
this.walletHandlers.handleBlackHoleContractCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleBlackHoleContractCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'BlackHoleContractUpdated',
this.walletHandlers.handleBlackHoleContractUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleBlackHoleContractUpdated.bind(this.walletHandlers)),
);
// 销毁记录
this.cdcConsumer.registerServiceHandler(
'BurnToBlackHoleRecordCreated',
this.walletHandlers.handleBurnToBlackHoleRecordCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleBurnToBlackHoleRecordCreated.bind(this.walletHandlers)),
);
// 费率配置
this.cdcConsumer.registerServiceHandler(
'FeeConfigCreated',
this.walletHandlers.handleFeeConfigCreated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleFeeConfigCreated.bind(this.walletHandlers)),
);
this.cdcConsumer.registerServiceHandler(
'FeeConfigUpdated',
this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers),
this.withIdempotency(this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers)),
);
this.logger.log('CDC sync handlers registered');
this.logger.log('CDC sync handlers registered with idempotency protection');
}
// ===========================================================================
@ -925,20 +950,47 @@ export class CdcSyncService implements OnModuleInit {
// 辅助方法
// ===========================================================================
/**
*
* 使 sourceTopic + eventId
* CDC
*/
private async isEventProcessed(event: ServiceEvent): Promise<boolean> {
try {
const existing = await this.prisma.processedEvent.findUnique({
where: {
sourceService_eventId: {
sourceService: event.sourceTopic,
eventId: event.id,
},
},
});
return !!existing;
} catch (error) {
this.logger.warn(`Failed to check processed event: ${event.sourceTopic}:${event.id}`);
return false;
}
}
private async recordProcessedEvent(event: ServiceEvent): Promise<void> {
try {
await this.prisma.processedEvent.upsert({
where: { eventId: event.id },
where: {
sourceService_eventId: {
sourceService: event.sourceTopic,
eventId: event.id,
},
},
create: {
eventId: event.id,
eventType: event.eventType,
sourceService: event.aggregateType,
sourceService: event.sourceTopic,
},
update: {},
});
} catch (error) {
// 忽略幂等性记录失败
this.logger.warn(`Failed to record processed event: ${event.id}`);
this.logger.warn(`Failed to record processed event: ${event.sourceTopic}:${event.id}`);
}
}
}