diff --git a/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql b/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql new file mode 100644 index 00000000..8dfdcc03 --- /dev/null +++ b/backend/services/mining-admin-service/prisma/migrations/0002_fix_processed_event_composite_key/migration.sql @@ -0,0 +1,17 @@ +-- ============================================================================ +-- 修复 processed_events 表的幂等键 +-- 问题: 原来使用 eventId 作为唯一键,但不同服务的 outbox ID 可能相同 +-- 解决: 使用 (sourceService, eventId) 作为复合唯一键 +-- ============================================================================ + +-- 先清空已有数据(因为之前的数据可能有冲突) +TRUNCATE TABLE "processed_events"; + +-- 删除旧的唯一索引 +DROP INDEX IF EXISTS "processed_events_eventId_key"; + +-- 删除旧的 sourceService 索引 +DROP INDEX IF EXISTS "processed_events_sourceService_idx"; + +-- 创建新的复合唯一索引 +CREATE UNIQUE INDEX "processed_events_sourceService_eventId_key" ON "processed_events"("sourceService", "eventId"); diff --git a/backend/services/mining-admin-service/prisma/schema.prisma b/backend/services/mining-admin-service/prisma/schema.prisma index aa176382..25b11dd2 100644 --- a/backend/services/mining-admin-service/prisma/schema.prisma +++ b/backend/services/mining-admin-service/prisma/schema.prisma @@ -457,12 +457,12 @@ model CdcSyncProgress { model ProcessedEvent { id String @id @default(uuid()) - eventId String @unique + eventId String eventType String sourceService String processedAt DateTime @default(now()) - @@index([sourceService]) + @@unique([sourceService, eventId]) @@index([processedAt]) @@map("processed_events") } diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 826db48c..3bcd87ea 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -42,6 +42,8 @@ export interface ServiceEvent { payload: any; createdAt: string; sequenceNum: bigint; + /** 来源 topic,用于构建全局唯一的幂等键 (topic + id) */ + sourceTopic: string; } export type CdcHandler = (event: CdcEvent) => Promise; @@ -287,6 +289,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { const event: ServiceEvent = { ...normalizedEvent, sequenceNum, + sourceTopic: topic, }; const handler = this.serviceHandlers.get(event.eventType); @@ -311,7 +314,7 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { * 规范化服务事件格式 * 将 Debezium outbox 的下划线格式转换为驼峰格式 */ - private normalizeServiceEvent(data: any): Omit { + private normalizeServiceEvent(data: any): Omit { // 如果已经是驼峰格式,直接返回 if (data.eventType && data.aggregateType) { return data; diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index 7d830514..537fba8b 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -5,12 +5,18 @@ import { CdcConsumerService, CdcEvent, ServiceEvent, + ServiceEventHandler, } from './cdc-consumer.service'; import { WalletSyncHandlers } from './wallet-sync.handlers'; /** * CDC 同步服务 * 负责从各个 2.0 服务同步数据到 mining-admin-service + * + * 实现业界标准的 CDC 幂等消费模式: + * 1. 使用 (sourceTopic, eventId) 作为全局唯一的幂等键 + * 2. 处理前检查事件是否已处理 + * 3. 处理后记录已处理事件 */ @Injectable() export class CdcSyncService implements OnModuleInit { @@ -28,6 +34,25 @@ export class CdcSyncService implements OnModuleInit { await this.cdcConsumer.start(); } + /** + * 包装 handler,添加幂等性保护 + * 这是业界标准的 CDC exactly-once 语义实现 + */ + private withIdempotency(handler: ServiceEventHandler): ServiceEventHandler { + return async (event: ServiceEvent) => { + // 1. 检查是否已处理 + if (await this.isEventProcessed(event)) { + this.logger.debug(`Skipping duplicate event: ${event.sourceTopic}:${event.id} (${event.eventType})`); + return; + } + + // 2. 执行实际处理逻辑 + await handler(event); + + // 3. 记录已处理(在 handler 内部完成,这里不再重复) + }; + } + private async registerHandlers(): Promise { // =========================================================================== // 从 auth-service 同步用户数据 (通过 Debezium CDC 监听 outbox_events 表) @@ -39,30 +64,30 @@ export class CdcSyncService implements OnModuleInit { this.cdcConsumer.addTopic(usersTopic); this.cdcConsumer.registerServiceHandler( 'UserCreated', - this.handleUserCreated.bind(this), + this.withIdempotency(this.handleUserCreated.bind(this)), ); // auth-service 发布的 user.registered 事件 this.cdcConsumer.registerServiceHandler( 'user.registered', - this.handleUserRegistered.bind(this), + this.withIdempotency(this.handleUserRegistered.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'UserUpdated', - this.handleUserUpdated.bind(this), + this.withIdempotency(this.handleUserUpdated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'KycStatusChanged', - this.handleKycStatusChanged.bind(this), + this.withIdempotency(this.handleKycStatusChanged.bind(this)), ); // auth-service 发布的 user.kyc_verified 事件 this.cdcConsumer.registerServiceHandler( 'user.kyc_verified', - this.handleKycStatusChanged.bind(this), + this.withIdempotency(this.handleKycStatusChanged.bind(this)), ); // auth-service 发布的 user.legacy.migrated 事件 (1.0用户首次登录2.0时) this.cdcConsumer.registerServiceHandler( 'user.legacy.migrated', - this.handleLegacyUserMigrated.bind(this), + this.withIdempotency(this.handleLegacyUserMigrated.bind(this)), ); // =========================================================================== @@ -75,41 +100,41 @@ export class CdcSyncService implements OnModuleInit { this.cdcConsumer.addTopic(contributionTopic); this.cdcConsumer.registerServiceHandler( 'ContributionAccountUpdated', - this.handleContributionAccountUpdated.bind(this), + this.withIdempotency(this.handleContributionAccountUpdated.bind(this)), ); // ContributionAccountSynced 用于初始全量同步 this.cdcConsumer.registerServiceHandler( 'ContributionAccountSynced', - this.handleContributionAccountUpdated.bind(this), + this.withIdempotency(this.handleContributionAccountUpdated.bind(this)), ); // ContributionCalculated 事件在算力计算完成时发布 this.cdcConsumer.registerServiceHandler( 'ContributionCalculated', - this.handleContributionCalculated.bind(this), + this.withIdempotency(this.handleContributionCalculated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'SystemContributionUpdated', - this.handleSystemContributionUpdated.bind(this), + this.withIdempotency(this.handleSystemContributionUpdated.bind(this)), ); // ReferralSynced 事件 - 同步推荐关系 this.cdcConsumer.registerServiceHandler( 'ReferralSynced', - this.handleReferralSynced.bind(this), + this.withIdempotency(this.handleReferralSynced.bind(this)), ); // AdoptionSynced 事件 - 同步认种记录 this.cdcConsumer.registerServiceHandler( 'AdoptionSynced', - this.handleAdoptionSynced.bind(this), + this.withIdempotency(this.handleAdoptionSynced.bind(this)), ); // ContributionRecordSynced 事件 - 同步算力明细记录 this.cdcConsumer.registerServiceHandler( 'ContributionRecordSynced', - this.handleContributionRecordSynced.bind(this), + this.withIdempotency(this.handleContributionRecordSynced.bind(this)), ); // NetworkProgressUpdated 事件 - 同步全网算力进度 this.cdcConsumer.registerServiceHandler( 'NetworkProgressUpdated', - this.handleNetworkProgressUpdated.bind(this), + this.withIdempotency(this.handleNetworkProgressUpdated.bind(this)), ); // =========================================================================== @@ -122,15 +147,15 @@ export class CdcSyncService implements OnModuleInit { this.cdcConsumer.addTopic(miningTopic); this.cdcConsumer.registerServiceHandler( 'MiningAccountUpdated', - this.handleMiningAccountUpdated.bind(this), + this.withIdempotency(this.handleMiningAccountUpdated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'MiningConfigUpdated', - this.handleMiningConfigUpdated.bind(this), + this.withIdempotency(this.handleMiningConfigUpdated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'DailyMiningStatCreated', - this.handleDailyMiningStatCreated.bind(this), + this.withIdempotency(this.handleDailyMiningStatCreated.bind(this)), ); // =========================================================================== @@ -143,15 +168,15 @@ export class CdcSyncService implements OnModuleInit { this.cdcConsumer.addTopic(tradingTopic); this.cdcConsumer.registerServiceHandler( 'TradingAccountUpdated', - this.handleTradingAccountUpdated.bind(this), + this.withIdempotency(this.handleTradingAccountUpdated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'DayKLineCreated', - this.handleDayKLineCreated.bind(this), + this.withIdempotency(this.handleDayKLineCreated.bind(this)), ); this.cdcConsumer.registerServiceHandler( 'CirculationPoolUpdated', - this.handleCirculationPoolUpdated.bind(this), + this.withIdempotency(this.handleCirculationPoolUpdated.bind(this)), ); @@ -167,126 +192,126 @@ export class CdcSyncService implements OnModuleInit { // 区域数据 this.cdcConsumer.registerServiceHandler( 'ProvinceCreated', - this.walletHandlers.handleProvinceCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleProvinceCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'ProvinceUpdated', - this.walletHandlers.handleProvinceUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleProvinceUpdated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'CityCreated', - this.walletHandlers.handleCityCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleCityCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'CityUpdated', - this.walletHandlers.handleCityUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleCityUpdated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'UserRegionMappingCreated', - this.walletHandlers.handleUserRegionMappingCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleUserRegionMappingCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'UserRegionMappingUpdated', - this.walletHandlers.handleUserRegionMappingUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleUserRegionMappingUpdated.bind(this.walletHandlers)), ); // 系统账户 this.cdcConsumer.registerServiceHandler( 'WalletSystemAccountCreated', - this.walletHandlers.handleWalletSystemAccountCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWalletSystemAccountCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'WalletSystemAccountUpdated', - this.walletHandlers.handleWalletSystemAccountUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWalletSystemAccountUpdated.bind(this.walletHandlers)), ); // 池账户 this.cdcConsumer.registerServiceHandler( 'WalletPoolAccountCreated', - this.walletHandlers.handleWalletPoolAccountCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWalletPoolAccountCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'WalletPoolAccountUpdated', - this.walletHandlers.handleWalletPoolAccountUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWalletPoolAccountUpdated.bind(this.walletHandlers)), ); // 用户钱包 this.cdcConsumer.registerServiceHandler( 'UserWalletCreated', - this.walletHandlers.handleUserWalletCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleUserWalletCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'UserWalletUpdated', - this.walletHandlers.handleUserWalletUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleUserWalletUpdated.bind(this.walletHandlers)), ); // 提现请求 this.cdcConsumer.registerServiceHandler( 'WithdrawRequestCreated', - this.walletHandlers.handleWithdrawRequestCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWithdrawRequestCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'WithdrawRequestUpdated', - this.walletHandlers.handleWithdrawRequestUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleWithdrawRequestUpdated.bind(this.walletHandlers)), ); // 充值记录 this.cdcConsumer.registerServiceHandler( 'DepositRecordCreated', - this.walletHandlers.handleDepositRecordCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleDepositRecordCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'DepositRecordUpdated', - this.walletHandlers.handleDepositRecordUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleDepositRecordUpdated.bind(this.walletHandlers)), ); // DEX Swap this.cdcConsumer.registerServiceHandler( 'DexSwapRecordCreated', - this.walletHandlers.handleDexSwapRecordCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleDexSwapRecordCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'DexSwapRecordUpdated', - this.walletHandlers.handleDexSwapRecordUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleDexSwapRecordUpdated.bind(this.walletHandlers)), ); // 地址绑定 this.cdcConsumer.registerServiceHandler( 'BlockchainAddressBindingCreated', - this.walletHandlers.handleBlockchainAddressBindingCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleBlockchainAddressBindingCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'BlockchainAddressBindingUpdated', - this.walletHandlers.handleBlockchainAddressBindingUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleBlockchainAddressBindingUpdated.bind(this.walletHandlers)), ); // 黑洞合约 this.cdcConsumer.registerServiceHandler( 'BlackHoleContractCreated', - this.walletHandlers.handleBlackHoleContractCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleBlackHoleContractCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'BlackHoleContractUpdated', - this.walletHandlers.handleBlackHoleContractUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleBlackHoleContractUpdated.bind(this.walletHandlers)), ); // 销毁记录 this.cdcConsumer.registerServiceHandler( 'BurnToBlackHoleRecordCreated', - this.walletHandlers.handleBurnToBlackHoleRecordCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleBurnToBlackHoleRecordCreated.bind(this.walletHandlers)), ); // 费率配置 this.cdcConsumer.registerServiceHandler( 'FeeConfigCreated', - this.walletHandlers.handleFeeConfigCreated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleFeeConfigCreated.bind(this.walletHandlers)), ); this.cdcConsumer.registerServiceHandler( 'FeeConfigUpdated', - this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers), + this.withIdempotency(this.walletHandlers.handleFeeConfigUpdated.bind(this.walletHandlers)), ); - this.logger.log('CDC sync handlers registered'); + this.logger.log('CDC sync handlers registered with idempotency protection'); } // =========================================================================== @@ -925,20 +950,47 @@ export class CdcSyncService implements OnModuleInit { // 辅助方法 // =========================================================================== + /** + * 检查事件是否已处理(幂等性检查) + * 使用 sourceTopic + eventId 作为复合唯一键(全局唯一) + * 这是业界标准的 CDC 幂等消费模式 + */ + private async isEventProcessed(event: ServiceEvent): Promise { + try { + const existing = await this.prisma.processedEvent.findUnique({ + where: { + sourceService_eventId: { + sourceService: event.sourceTopic, + eventId: event.id, + }, + }, + }); + return !!existing; + } catch (error) { + this.logger.warn(`Failed to check processed event: ${event.sourceTopic}:${event.id}`); + return false; + } + } + private async recordProcessedEvent(event: ServiceEvent): Promise { try { await this.prisma.processedEvent.upsert({ - where: { eventId: event.id }, + where: { + sourceService_eventId: { + sourceService: event.sourceTopic, + eventId: event.id, + }, + }, create: { eventId: event.id, eventType: event.eventType, - sourceService: event.aggregateType, + sourceService: event.sourceTopic, }, update: {}, }); } catch (error) { // 忽略幂等性记录失败 - this.logger.warn(`Failed to record processed event: ${event.id}`); + this.logger.warn(`Failed to record processed event: ${event.sourceTopic}:${event.id}`); } } }