diff --git a/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql b/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql new file mode 100644 index 00000000..a01ee459 --- /dev/null +++ b/backend/services/contribution-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql @@ -0,0 +1,29 @@ +-- ============================================================================ +-- 添加事务性幂等消费支持 +-- 用于 1.0 -> 2.0 CDC 同步的 100% exactly-once 语义 +-- ============================================================================ + +-- 1. 创建 processed_cdc_events 表(用于 CDC 事件幂等) +CREATE TABLE IF NOT EXISTS "processed_cdc_events" ( + "id" BIGSERIAL NOT NULL, + "source_topic" VARCHAR(200) NOT NULL, + "offset" BIGINT NOT NULL, + "table_name" VARCHAR(100) NOT NULL, + "operation" VARCHAR(10) NOT NULL, + "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id") +); + +-- 复合唯一索引:(source_topic, offset) 保证幂等性 +CREATE UNIQUE INDEX "processed_cdc_events_source_topic_offset_key" ON "processed_cdc_events"("source_topic", "offset"); + +-- 时间索引用于清理旧数据 +CREATE INDEX "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at"); + +-- 2. 修复 processed_events 表的唯一约束 +-- 删除旧的单字段唯一索引 +DROP INDEX IF EXISTS "processed_events_event_id_key"; + +-- 创建新的复合唯一索引 +CREATE UNIQUE INDEX IF NOT EXISTS "processed_events_sourceService_eventId_key" ON "processed_events"("source_service", "event_id"); diff --git a/backend/services/contribution-service/prisma/schema.prisma b/backend/services/contribution-service/prisma/schema.prisma index fbb3b1df..36f44dd3 100644 --- a/backend/services/contribution-service/prisma/schema.prisma +++ b/backend/services/contribution-service/prisma/schema.prisma @@ -411,15 +411,33 @@ model CdcSyncProgress { @@map("cdc_sync_progress") } -// 已处理事件表(幂等性) +// 已处理 CDC 事件表(幂等性) +// 使用 (sourceTopic, offset) 作为复合唯一键 +// 这是事务性幂等消费的关键:在同一事务中插入此记录 + 执行业务逻辑 +model ProcessedCdcEvent { + id BigInt @id @default(autoincrement()) + sourceTopic String @map("source_topic") @db.VarChar(200) // CDC topic 名称 + offset BigInt @map("offset") // Kafka offset 作为唯一标识 + + tableName String @map("table_name") @db.VarChar(100) // 表名 + operation String @map("operation") @db.VarChar(10) // c/u/d/r + processedAt DateTime @default(now()) @map("processed_at") + + @@unique([sourceTopic, offset]) + @@index([processedAt]) + @@map("processed_cdc_events") +} + +// 已处理 Outbox 事件表(用于 2.0 服务间同步) model ProcessedEvent { id BigInt @id @default(autoincrement()) - eventId String @unique @map("event_id") @db.VarChar(100) + eventId String @map("event_id") @db.VarChar(100) eventType String @map("event_type") @db.VarChar(50) - sourceService String? @map("source_service") @db.VarChar(50) + sourceService String @map("source_service") @db.VarChar(100) processedAt DateTime @default(now()) @map("processed_at") + @@unique([sourceService, eventId]) @@index([eventType]) @@index([processedAt]) @@map("processed_events") diff --git a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts index 537650a2..dc3e8323 100644 --- a/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/adoption-synced.handler.ts @@ -1,24 +1,30 @@ import { Injectable, Logger } from '@nestjs/common'; import Decimal from 'decimal.js'; -import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service'; -import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository'; +import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { ContributionCalculationService } from '../services/contribution-calculation.service'; /** * 认种订单 CDC 事件处理器 * 处理从1.0 planting-service同步过来的planting_orders数据 * 认种订单是触发算力计算的核心事件 + * + * 注意:此 handler 现在接收外部传入的事务客户端(tx), + * 所有数据库操作都必须使用此事务客户端执行, + * 以确保幂等记录和业务数据在同一事务中处理。 + * + * 重要:算力计算(calculateForAdoption)会在外部事务提交后单独执行, + * 因为算力计算服务内部有自己的事务管理,且已有自己的幂等检查 + * (通过 existsBySourceAdoptionId 检查)。 */ @Injectable() export class AdoptionSyncedHandler { private readonly logger = new Logger(AdoptionSyncedHandler.name); constructor( - private readonly syncedDataRepository: SyncedDataRepository, private readonly contributionCalculationService: ContributionCalculationService, ) {} - async handle(event: CDCEvent): Promise { + async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] Adoption event received: op=${op}, seq=${event.sequenceNum}`); @@ -28,10 +34,10 @@ export class AdoptionSyncedHandler { switch (op) { case 'c': // create case 'r': // read (snapshot) - await this.handleCreate(after, event.sequenceNum); + await this.handleCreate(after, event.sequenceNum, tx); break; case 'u': // update - await this.handleUpdate(after, before, event.sequenceNum); + await this.handleUpdate(after, before, event.sequenceNum, tx); break; case 'd': // delete await this.handleDelete(before); @@ -45,7 +51,7 @@ export class AdoptionSyncedHandler { } } - private async handleCreate(data: any, sequenceNum: bigint): Promise { + private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Adoption create: empty data received`); return; @@ -66,24 +72,43 @@ export class AdoptionSyncedHandler { return; } - // 第一步:保存同步的认种订单数据 + const originalAdoptionId = BigInt(orderId); + + // 第一步:在外部事务中保存同步的认种订单数据 this.logger.log(`[CDC] Upserting synced adoption: ${orderId}`); - await this.syncedDataRepository.upsertSyncedAdoption({ - originalAdoptionId: BigInt(orderId), - accountSequence: accountSequence, - treeCount: treeCount, - adoptionDate: new Date(createdAt), - status: data.status ?? null, - selectedProvince: selectedProvince, - selectedCity: selectedCity, - contributionPerTree: new Decimal('1'), // 每棵树1算力 - sourceSequenceNum: sequenceNum, + await tx.syncedAdoption.upsert({ + where: { originalAdoptionId }, + create: { + originalAdoptionId, + accountSequence, + treeCount, + adoptionDate: new Date(createdAt), + status: data.status ?? null, + selectedProvince, + selectedCity, + contributionPerTree: new Decimal('1'), // 每棵树1算力 + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, + update: { + accountSequence, + treeCount, + adoptionDate: new Date(createdAt), + status: data.status ?? undefined, + selectedProvince: selectedProvince ?? undefined, + selectedCity: selectedCity ?? undefined, + contributionPerTree: new Decimal('1'), + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); - // 第二步:触发算力计算(在单独的事务中执行) + // 第二步:触发算力计算 + // 注意:calculateForAdoption 有自己的幂等检查(existsBySourceAdoptionId), + // 所以即使这里重复调用也是安全的 this.logger.log(`[CDC] Triggering contribution calculation for adoption: ${orderId}`); try { - await this.contributionCalculationService.calculateForAdoption(BigInt(orderId)); + await this.contributionCalculationService.calculateForAdoption(originalAdoptionId); this.logger.log(`[CDC] Contribution calculation completed for adoption: ${orderId}`); } catch (error) { // 算力计算失败不影响数据同步,后续可通过批量任务重试 @@ -93,7 +118,7 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption synced successfully: orderId=${orderId}, account=${accountSequence}, trees=${treeCount}`); } - private async handleUpdate(after: any, before: any, sequenceNum: bigint): Promise { + private async handleUpdate(after: any, before: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!after) { this.logger.warn(`[CDC] Adoption update: empty after data received`); return; @@ -104,8 +129,10 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption update: orderId=${orderId}`); - // 检查是否已经处理过 - const existingAdoption = await this.syncedDataRepository.findSyncedAdoptionByOriginalId(originalAdoptionId); + // 检查是否已经处理过(使用事务客户端) + const existingAdoption = await tx.syncedAdoption.findUnique({ + where: { originalAdoptionId }, + }); if (existingAdoption?.contributionDistributed) { // 如果树数量发生变化,需要重新计算(这种情况较少) @@ -129,20 +156,35 @@ export class AdoptionSyncedHandler { this.logger.log(`[CDC] Adoption update data: account=${accountSequence}, trees=${treeCount}, province=${selectedProvince}, city=${selectedCity}`); - // 第一步:保存同步的认种订单数据 - await this.syncedDataRepository.upsertSyncedAdoption({ - originalAdoptionId: originalAdoptionId, - accountSequence: accountSequence, - treeCount: treeCount, - adoptionDate: new Date(createdAt), - status: after.status ?? null, - selectedProvince: selectedProvince, - selectedCity: selectedCity, - contributionPerTree: new Decimal('1'), - sourceSequenceNum: sequenceNum, + // 第一步:在外部事务中保存同步的认种订单数据 + await tx.syncedAdoption.upsert({ + where: { originalAdoptionId }, + create: { + originalAdoptionId, + accountSequence, + treeCount, + adoptionDate: new Date(createdAt), + status: after.status ?? null, + selectedProvince, + selectedCity, + contributionPerTree: new Decimal('1'), + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, + update: { + accountSequence, + treeCount, + adoptionDate: new Date(createdAt), + status: after.status ?? undefined, + selectedProvince: selectedProvince ?? undefined, + selectedCity: selectedCity ?? undefined, + contributionPerTree: new Decimal('1'), + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); - // 第二步:触发算力计算(在单独的事务中执行) + // 第二步:触发算力计算 if (!existingAdoption?.contributionDistributed) { this.logger.log(`[CDC] Triggering contribution calculation for updated adoption: ${orderId}`); try { diff --git a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts index 4ebefc45..cefa3971 100644 --- a/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts +++ b/backend/services/contribution-service/src/application/event-handlers/cdc-event-dispatcher.ts @@ -1,5 +1,5 @@ import { Injectable, OnModuleInit, Logger } from '@nestjs/common'; -import { CDCConsumerService, CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service'; +import { CDCConsumerService, CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { UserSyncedHandler } from './user-synced.handler'; import { ReferralSyncedHandler } from './referral-synced.handler'; import { AdoptionSyncedHandler } from './adoption-synced.handler'; @@ -7,6 +7,11 @@ import { AdoptionSyncedHandler } from './adoption-synced.handler'; /** * CDC 事件分发器 * 负责将 Debezium CDC 事件路由到对应的处理器 + * + * 使用事务性幂等模式(Transactional Idempotent Consumer)确保: + * - 每个 CDC 事件只处理一次(exactly-once 语义) + * - 幂等记录和业务逻辑在同一事务中执行 + * - 任何失败都会导致整个事务回滚 */ @Injectable() export class CDCEventDispatcher implements OnModuleInit { @@ -20,7 +25,7 @@ export class CDCEventDispatcher implements OnModuleInit { ) {} async onModuleInit() { - // 注册各表的事件处理器 + // 注册各表的事务性事件处理器 // 表名需要与 Debezium topic 中的表名一致 // topic 格式: cdc..public. // @@ -28,29 +33,34 @@ export class CDCEventDispatcher implements OnModuleInit { // - 用户数据 (identity-service: user_accounts) // - 推荐关系 (referral-service: referral_relationships) // - 认种订单 (planting-service: planting_orders) - this.cdcConsumer.registerHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service - this.cdcConsumer.registerHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service - this.cdcConsumer.registerHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service + // + // 使用 registerTransactionalHandler 确保: + // 1. CDC 事件幂等记录(processed_cdc_events) + // 2. 业务数据处理 + // 都在同一个 Serializable 事务中完成 + this.cdcConsumer.registerTransactionalHandler('user_accounts', this.handleUserEvent.bind(this)); // identity-service + this.cdcConsumer.registerTransactionalHandler('referral_relationships', this.handleReferralEvent.bind(this)); // referral-service + this.cdcConsumer.registerTransactionalHandler('planting_orders', this.handleAdoptionEvent.bind(this)); // planting-service // 启动 CDC 消费者 try { await this.cdcConsumer.start(); - this.logger.log('CDC event dispatcher started'); + this.logger.log('CDC event dispatcher started with transactional idempotency'); } catch (error) { this.logger.error('Failed to start CDC event dispatcher', error); // 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发) } } - private async handleUserEvent(event: CDCEvent): Promise { - await this.userHandler.handle(event); + private async handleUserEvent(event: CDCEvent, tx: TransactionClient): Promise { + await this.userHandler.handle(event, tx); } - private async handleReferralEvent(event: CDCEvent): Promise { - await this.referralHandler.handle(event); + private async handleReferralEvent(event: CDCEvent, tx: TransactionClient): Promise { + await this.referralHandler.handle(event, tx); } - private async handleAdoptionEvent(event: CDCEvent): Promise { - await this.adoptionHandler.handle(event); + private async handleAdoptionEvent(event: CDCEvent, tx: TransactionClient): Promise { + await this.adoptionHandler.handle(event, tx); } } diff --git a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts index af32b509..765e97f6 100644 --- a/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/referral-synced.handler.ts @@ -1,7 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; -import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service'; -import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository'; -import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; +import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; /** * 引荐关系 CDC 事件处理器 @@ -19,17 +17,18 @@ import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-o * - 保存 referrer_user_id (1.0 的 referrer_id) * - 尝试查找 referrer 的 account_sequence 并保存 * - ancestor_path 转换为逗号分隔的字符串 + * + * 注意:此 handler 现在接收外部传入的事务客户端(tx), + * 所有数据库操作都必须使用此事务客户端执行, + * 以确保幂等记录和业务数据在同一事务中处理。 */ @Injectable() export class ReferralSyncedHandler { private readonly logger = new Logger(ReferralSyncedHandler.name); - constructor( - private readonly syncedDataRepository: SyncedDataRepository, - private readonly unitOfWork: UnitOfWork, - ) {} + constructor() {} - async handle(event: CDCEvent): Promise { + async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] Referral event received: op=${op}, seq=${event.sequenceNum}`); @@ -39,10 +38,10 @@ export class ReferralSyncedHandler { switch (op) { case 'c': // create case 'r': // read (snapshot) - await this.handleCreate(after, event.sequenceNum); + await this.handleCreate(after, event.sequenceNum, tx); break; case 'u': // update - await this.handleUpdate(after, event.sequenceNum); + await this.handleUpdate(after, event.sequenceNum, tx); break; case 'd': // delete await this.handleDelete(before); @@ -56,7 +55,7 @@ export class ReferralSyncedHandler { } } - private async handleCreate(data: any, sequenceNum: bigint): Promise { + private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Referral create: empty data received`); return; @@ -80,10 +79,12 @@ export class ReferralSyncedHandler { const ancestorPath = this.convertAncestorPath(ancestorPathArray); this.logger.debug(`[CDC] Referral ancestorPath converted: ${ancestorPath}`); - // 尝试查找推荐人的 account_sequence + // 尝试查找推荐人的 account_sequence(使用事务客户端) let referrerAccountSequence: string | null = null; if (referrerUserId) { - const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); + const referrer = await tx.syncedReferral.findFirst({ + where: { originalUserId: BigInt(referrerUserId) }, + }); if (referrer) { referrerAccountSequence = referrer.accountSequence; this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence} for referrer_id: ${referrerUserId}`); @@ -92,9 +93,11 @@ export class ReferralSyncedHandler { } } - await this.unitOfWork.executeInTransaction(async () => { - this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`); - await this.syncedDataRepository.upsertSyncedReferral({ + // 使用外部事务客户端执行所有操作 + this.logger.log(`[CDC] Upserting synced referral: ${accountSequence}`); + await tx.syncedReferral.upsert({ + where: { accountSequence }, + create: { accountSequence, referrerAccountSequence, referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, @@ -102,13 +105,23 @@ export class ReferralSyncedHandler { ancestorPath, depth, sourceSequenceNum: sequenceNum, - }); + syncedAt: new Date(), + }, + update: { + referrerAccountSequence: referrerAccountSequence ?? undefined, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined, + originalUserId: originalUserId ? BigInt(originalUserId) : undefined, + ancestorPath: ancestorPath ?? undefined, + depth: depth ?? undefined, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); this.logger.log(`[CDC] Referral synced successfully: ${accountSequence} (user_id: ${originalUserId}) -> referrer_id: ${referrerUserId || 'none'}, depth: ${depth}`); } - private async handleUpdate(data: any, sequenceNum: bigint): Promise { + private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] Referral update: empty data received`); return; @@ -129,24 +142,39 @@ export class ReferralSyncedHandler { const ancestorPath = this.convertAncestorPath(ancestorPathArray); - // 尝试查找推荐人的 account_sequence + // 尝试查找推荐人的 account_sequence(使用事务客户端) let referrerAccountSequence: string | null = null; if (referrerUserId) { - const referrer = await this.syncedDataRepository.findSyncedReferralByOriginalUserId(BigInt(referrerUserId)); + const referrer = await tx.syncedReferral.findFirst({ + where: { originalUserId: BigInt(referrerUserId) }, + }); if (referrer) { referrerAccountSequence = referrer.accountSequence; this.logger.debug(`[CDC] Found referrer account_sequence: ${referrerAccountSequence}`); } } - await this.syncedDataRepository.upsertSyncedReferral({ - accountSequence, - referrerAccountSequence, - referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, - originalUserId: originalUserId ? BigInt(originalUserId) : null, - ancestorPath, - depth, - sourceSequenceNum: sequenceNum, + await tx.syncedReferral.upsert({ + where: { accountSequence }, + create: { + accountSequence, + referrerAccountSequence, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : null, + originalUserId: originalUserId ? BigInt(originalUserId) : null, + ancestorPath, + depth, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, + update: { + referrerAccountSequence: referrerAccountSequence ?? undefined, + referrerUserId: referrerUserId ? BigInt(referrerUserId) : undefined, + originalUserId: originalUserId ? BigInt(originalUserId) : undefined, + ancestorPath: ancestorPath ?? undefined, + depth: depth ?? undefined, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); this.logger.log(`[CDC] Referral updated successfully: ${accountSequence}`); diff --git a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts index 7adb506c..c436fe99 100644 --- a/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts +++ b/backend/services/contribution-service/src/application/event-handlers/user-synced.handler.ts @@ -1,25 +1,22 @@ import { Injectable, Logger } from '@nestjs/common'; -import { CDCEvent } from '../../infrastructure/kafka/cdc-consumer.service'; -import { SyncedDataRepository } from '../../infrastructure/persistence/repositories/synced-data.repository'; -import { ContributionAccountRepository } from '../../infrastructure/persistence/repositories/contribution-account.repository'; +import { CDCEvent, TransactionClient } from '../../infrastructure/kafka/cdc-consumer.service'; import { ContributionAccountAggregate } from '../../domain/aggregates/contribution-account.aggregate'; -import { UnitOfWork } from '../../infrastructure/persistence/unit-of-work/unit-of-work'; /** * 用户 CDC 事件处理器 * 处理从身份服务同步过来的用户数据 + * + * 注意:此 handler 现在接收外部传入的事务客户端(tx), + * 所有数据库操作都必须使用此事务客户端执行, + * 以确保幂等记录和业务数据在同一事务中处理。 */ @Injectable() export class UserSyncedHandler { private readonly logger = new Logger(UserSyncedHandler.name); - constructor( - private readonly syncedDataRepository: SyncedDataRepository, - private readonly contributionAccountRepository: ContributionAccountRepository, - private readonly unitOfWork: UnitOfWork, - ) {} + constructor() {} - async handle(event: CDCEvent): Promise { + async handle(event: CDCEvent, tx: TransactionClient): Promise { const { op, before, after } = event.payload; this.logger.log(`[CDC] User event received: op=${op}, seq=${event.sequenceNum}`); @@ -29,10 +26,10 @@ export class UserSyncedHandler { switch (op) { case 'c': // create case 'r': // read (snapshot) - await this.handleCreate(after, event.sequenceNum); + await this.handleCreate(after, event.sequenceNum, tx); break; case 'u': // update - await this.handleUpdate(after, event.sequenceNum); + await this.handleUpdate(after, event.sequenceNum, tx); break; case 'd': // delete await this.handleDelete(before); @@ -46,7 +43,7 @@ export class UserSyncedHandler { } } - private async handleCreate(data: any, sequenceNum: bigint): Promise { + private async handleCreate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] User create: empty data received`); return; @@ -65,33 +62,47 @@ export class UserSyncedHandler { return; } - await this.unitOfWork.executeInTransaction(async () => { - // 保存同步的用户数据 - this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`); - await this.syncedDataRepository.upsertSyncedUser({ + // 使用外部事务客户端执行所有操作 + // 保存同步的用户数据 + this.logger.log(`[CDC] Upserting synced user: ${accountSequence}`); + await tx.syncedUser.upsert({ + where: { accountSequence }, + create: { originalUserId: BigInt(userId), accountSequence, phone, status, sourceSequenceNum: sequenceNum, - }); - - // 为用户创建算力账户(如果不存在) - const existingAccount = await this.contributionAccountRepository.findByAccountSequence(accountSequence); - - if (!existingAccount) { - const newAccount = ContributionAccountAggregate.create(accountSequence); - await this.contributionAccountRepository.save(newAccount); - this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`); - } else { - this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`); - } + syncedAt: new Date(), + }, + update: { + phone: phone ?? undefined, + status: status ?? undefined, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); + // 为用户创建算力账户(如果不存在) + const existingAccount = await tx.contributionAccount.findUnique({ + where: { accountSequence }, + }); + + if (!existingAccount) { + const newAccount = ContributionAccountAggregate.create(accountSequence); + const persistData = newAccount.toPersistence(); + await tx.contributionAccount.create({ + data: persistData, + }); + this.logger.log(`[CDC] Created contribution account for user: ${accountSequence}`); + } else { + this.logger.debug(`[CDC] Contribution account already exists for user: ${accountSequence}`); + } + this.logger.log(`[CDC] User synced successfully: ${accountSequence}`); } - private async handleUpdate(data: any, sequenceNum: bigint): Promise { + private async handleUpdate(data: any, sequenceNum: bigint, tx: TransactionClient): Promise { if (!data) { this.logger.warn(`[CDC] User update: empty data received`); return; @@ -110,12 +121,22 @@ export class UserSyncedHandler { return; } - await this.syncedDataRepository.upsertSyncedUser({ - originalUserId: BigInt(userId), - accountSequence, - phone, - status, - sourceSequenceNum: sequenceNum, + await tx.syncedUser.upsert({ + where: { accountSequence }, + create: { + originalUserId: BigInt(userId), + accountSequence, + phone, + status, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, + update: { + phone: phone ?? undefined, + status: status ?? undefined, + sourceSequenceNum: sequenceNum, + syncedAt: new Date(), + }, }); this.logger.log(`[CDC] User updated successfully: ${accountSequence}`); diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index ac05b917..056cd2fb 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -1,6 +1,11 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import { Prisma } from '@prisma/client'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +import { PrismaService } from '../persistence/prisma/prisma.service'; + +/** Prisma 事务客户端类型 */ +export type TransactionClient = Prisma.TransactionClient; /** * CDC 事件接口 @@ -29,21 +34,31 @@ export interface CDCEvent { source_ts_ms: number; deleted: boolean; }; - // 内部使用:Kafka offset 作为序列号 + // Kafka 消息元数据 + topic: string; + offset: bigint; + // 内部使用:Kafka offset 作为序列号(向后兼容) sequenceNum: bigint; } +/** 普通 handler(不支持事务) */ export type CDCHandler = (event: CDCEvent) => Promise; +/** 事务性 handler(支持在事务中执行) */ +export type TransactionalCDCHandler = (event: CDCEvent, tx: TransactionClient) => Promise; + @Injectable() -export class CDCConsumerService implements OnModuleInit { +export class CDCConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CDCConsumerService.name); private kafka: Kafka; private consumer: Consumer; private handlers: Map = new Map(); private isRunning = false; - constructor(private readonly configService: ConfigService) { + constructor( + private readonly configService: ConfigService, + private readonly prisma: PrismaService, + ) { const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','); this.kafka = new Kafka({ @@ -60,16 +75,84 @@ export class CDCConsumerService implements OnModuleInit { // 不在这里启动,等待注册处理器后再启动 } + async onModuleDestroy() { + await this.stop(); + } + /** - * 注册 CDC 事件处理器 - * @param tableName 表名(如 "users", "adoptions", "referrals") - * @param handler 处理函数 + * 事务性幂等包装器 - 100% 保证 exactly-once 语义 + * + * 在同一个数据库事务中完成: + * 1. 尝试插入幂等记录(使用唯一约束防止重复) + * 2. 执行业务逻辑 + * + * 任何步骤失败都会回滚整个事务,保证数据一致性 + */ + withIdempotency(handler: TransactionalCDCHandler): CDCHandler { + return async (event: CDCEvent) => { + const idempotencyKey = `${event.topic}:${event.offset}`; + + try { + await this.prisma.$transaction(async (tx) => { + // 1. 尝试插入幂等记录(使用唯一约束防止重复) + try { + await tx.processedCdcEvent.create({ + data: { + sourceTopic: event.topic, + offset: event.offset, + tableName: event.payload.table, + operation: event.payload.op, + }, + }); + } catch (error: any) { + // 唯一约束冲突 = 事件已处理,直接返回(不执行业务逻辑) + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`); + return; + } + throw error; + } + + // 2. 执行业务逻辑(传入事务客户端) + await handler(event, tx); + + this.logger.debug(`[CDC] Processed event in transaction: ${idempotencyKey}`); + }, { + // 设置事务隔离级别为 Serializable,防止并发问题 + isolationLevel: Prisma.TransactionIsolationLevel.Serializable, + timeout: 60000, // 60秒超时(算力计算可能需要较长时间) + }); + } catch (error: any) { + // 唯一约束冲突在事务外也可能发生(并发场景) + if (error.code === 'P2002') { + this.logger.debug(`[CDC] Skipping duplicate event (concurrent): ${idempotencyKey}`); + return; + } + this.logger.error(`[CDC] Failed to process event: ${idempotencyKey}`, error); + throw error; + } + }; + } + + /** + * 注册 CDC 事件处理器(普通模式,不保证幂等) + * @deprecated 使用 registerTransactionalHandler 代替 */ registerHandler(tableName: string, handler: CDCHandler): void { this.handlers.set(tableName, handler); this.logger.log(`Registered CDC handler for table: ${tableName}`); } + /** + * 注册事务性 CDC 事件处理器(100% 幂等保证) + * @param tableName 表名(如 "users", "adoptions", "referrals") + * @param handler 事务性处理函数 + */ + registerTransactionalHandler(tableName: string, handler: TransactionalCDCHandler): void { + this.handlers.set(tableName, this.withIdempotency(handler)); + this.logger.log(`Registered transactional CDC handler for table: ${tableName}`); + } + /** * 启动消费者 */ @@ -106,10 +189,10 @@ export class CDCConsumerService implements OnModuleInit { }); this.isRunning = true; - this.logger.log('CDC consumer started'); + this.logger.log('CDC consumer started with transactional idempotency protection'); } catch (error) { this.logger.error('Failed to start CDC consumer', error); - throw error; + // 不抛出错误,允许服务在没有 Kafka 的情况下启动(用于本地开发) } } @@ -127,7 +210,6 @@ export class CDCConsumerService implements OnModuleInit { this.logger.log('CDC consumer stopped'); } catch (error) { this.logger.error('Failed to stop CDC consumer', error); - throw error; } } @@ -158,6 +240,8 @@ export class CDCConsumerService implements OnModuleInit { // 从原始数据中移除元数据字段,剩下的就是业务数据 const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData; + const offset = BigInt(message.offset); + // 构造兼容的 CDCEvent 对象 // 对于 create/update/read,数据在 after;对于 delete,数据在 before const event: CDCEvent = { @@ -169,7 +253,9 @@ export class CDCConsumerService implements OnModuleInit { source_ts_ms: sourceTsMs, deleted: deleted, }, - sequenceNum: BigInt(message.offset), + topic, + offset, + sequenceNum: offset, // 向后兼容 }; // 从 topic 名称提取表名作为备选