diff --git a/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql b/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql
new file mode 100644
index 00000000..206f5fe5
--- /dev/null
+++ b/backend/services/auth-service/prisma/migrations/0002_add_transactional_idempotency/migration.sql
@@ -0,0 +1,24 @@
+-- ============================================================================
+-- Add transactional idempotent-consumption support.
+-- Used for exactly-once processing of the 1.0 -> 2.0 CDC sync.
+-- ============================================================================
+
+-- Create the processed_cdc_events table (idempotency ledger for CDC events).
+CREATE TABLE IF NOT EXISTS "processed_cdc_events" (
+    "id" BIGSERIAL NOT NULL,
+    "source_topic" VARCHAR(200) NOT NULL,
+    "partition" INTEGER NOT NULL,
+    "offset" BIGINT NOT NULL,
+    "table_name" VARCHAR(100) NOT NULL,
+    "operation" VARCHAR(10) NOT NULL,
+    "processed_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+
+    CONSTRAINT "processed_cdc_events_pkey" PRIMARY KEY ("id")
+);
+
+-- Composite unique index: a Kafka offset is only unique within one partition,
+-- so the idempotency key is (source_topic, partition, offset).
+CREATE UNIQUE INDEX IF NOT EXISTS "processed_cdc_events_source_topic_partition_offset_key" ON "processed_cdc_events"("source_topic", "partition", "offset");
+
+-- Time index used when purging old rows.
+CREATE INDEX IF NOT EXISTS "processed_cdc_events_processed_at_idx" ON "processed_cdc_events"("processed_at");
diff --git a/backend/services/auth-service/prisma/schema.prisma b/backend/services/auth-service/prisma/schema.prisma
index aef229da..a883a798 100644
--- a/backend/services/auth-service/prisma/schema.prisma
+++ b/backend/services/auth-service/prisma/schema.prisma
@@ -256,3 +256,27 @@ enum OutboxStatus {
   PUBLISHED
   FAILED
 }
+
+// ============================================================================
+// CDC idempotent-consumption tracking
+// ============================================================================
+
+// Ledger of CDC events that have already been processed (idempotency).
+// (sourceTopic, partition, offset) is the composite unique key, because a
+// Kafka offset is only unique within one partition of a topic.
+// This is what makes the consumer transactional: the row is inserted in the
+// same transaction that runs the business logic.
+model ProcessedCdcEvent {
+  id          BigInt @id @default(autoincrement())
+  sourceTopic String @map("source_topic") @db.VarChar(200) // CDC topic name
+  partition   Int    @map("partition") // Kafka partition of the topic
+  offset      BigInt @map("offset") // Kafka offset within that partition
+
+  tableName   String   @map("table_name") @db.VarChar(100) // source table name
+  operation   String   @map("operation") @db.VarChar(10) // c/u/d/r
+  processedAt DateTime @default(now()) @map("processed_at")
+
+  @@unique([sourceTopic, partition, offset])
+  @@index([processedAt])
+  @@map("processed_cdc_events")
+}
diff --git a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts
index 26065aaa..4dd97359 100644
--- a/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts
+++ b/backend/services/auth-service/src/infrastructure/messaging/cdc/legacy-user-cdc.consumer.ts
@@ -1,10 +1,16 @@
 import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
 import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
+import { Prisma, PrismaClient } from '@prisma/client';
 import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
-import { OutboxService } from '@/application/services/outbox.service';
 import { LegacyUserMigratedEvent } from '@/domain';
 
+/** Prisma client as seen inside an interactive transaction (no connection/transaction control methods). */
+type TransactionClient = Omit<
+  PrismaClient,
+  '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends'
+>;
+
 /**
  * ExtractNewRecordState 转换后的消息格式
  * 字段来自 identity-service 的 user_accounts 表 + Debezium 元数据
@@ -29,6 +35,11 @@ interface UnwrappedCdcUser {
 /**
  * CDC Consumer - 消费 1.0 用户变更事件
  * 监听 Debezium 发送的 CDC 事件,同步到 synced_legacy_users 表
+ *
+ * Implements a transactional idempotent consumer:
+ * - every CDC event is applied at most once, keyed by (topic, partition, offset)
+ * - the idempotency row (processed_cdc_events) and the business writes share one transaction
+ * - a failure rolls the transaction back and is rethrown so the message is redelivered
  */
 @Injectable()
 export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
@@ -36,11 +47,11 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
   private kafka: Kafka;
   private consumer: Consumer;
   private isConnected = false;
+  private topic: string;
 
   constructor(
     private readonly configService: ConfigService,
     private readonly prisma: PrismaService,
-    private readonly outboxService: OutboxService,
   ) {
     const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(',');
 
@@ -52,6 +63,8 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
     this.consumer = this.kafka.consumer({
       groupId: this.configService.get('CDC_CONSUMER_GROUP', 'auth-service-cdc-group'),
     });
+
+    this.topic = this.configService.get<string>('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts');
   }
 
   async onModuleInit() {
@@ -65,10 +78,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
       await this.consumer.connect();
       this.isConnected = true;
 
-      // Topic 格式: {topic.prefix}.{schema}.{table}
-      // identity-connector.json 配置: topic.prefix = "cdc.identity"
-      const topic = this.configService.get('CDC_TOPIC_USERS', 'cdc.identity.public.user_accounts');
-      await this.consumer.subscribe({ topic, fromBeginning: true });
+      await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });
 
       await this.consumer.run({
         eachMessage: async (payload) => {
@@ -76,7 +86,7 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
         },
       });
 
-      this.logger.log(`CDC Consumer started, listening to topic: ${topic}`);
+      this.logger.log(`CDC Consumer started with transactional idempotency, listening to topic: ${this.topic}`);
     } catch (error) {
       this.logger.error('Failed to start CDC Consumer', error);
     }
@@ -94,26 +104,100 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
 
     if (!message.value) return;
 
+    const offset = BigInt(message.offset);
+    const idempotencyKey = `${topic}[${partition}]:${offset}`;
+
+    // A payload that cannot be parsed will never succeed on retry: log it and skip.
+    let cdcEvent: UnwrappedCdcUser;
     try {
-      const cdcEvent: UnwrappedCdcUser = JSON.parse(message.value.toString());
-      // 使用 Kafka offset 作为序列号
-      const sequenceNum = BigInt(message.offset);
-      await this.processCdcEvent(cdcEvent, sequenceNum);
+      cdcEvent = JSON.parse(message.value.toString());
     } catch (error) {
+      this.logger.error(`[CDC] Skipping unparseable message: ${idempotencyKey}`, error);
+      return;
+    }
+
+    const op = cdcEvent.__op;
+    const tableName = cdcEvent.__table || 'user_accounts';
+
+    this.logger.log(`[CDC] Processing event: topic=${topic}, partition=${partition}, offset=${offset}, op=${op}`);
+
+    try {
+      // Claim the event and apply it atomically; false means it was already applied.
+      const applied = await this.processWithIdempotency(topic, partition, offset, tableName, op, cdcEvent);
+      if (applied) {
+        this.logger.log(`[CDC] Successfully processed event: ${idempotencyKey}`);
+      } else {
+        this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`);
+      }
+    } catch (error) {
       this.logger.error(
-        `Failed to process CDC message from ${topic}[${partition}]`,
+        `[CDC] Failed to process message from ${topic}[${partition}], offset=${offset}`,
         error,
       );
+      // Rethrow so kafkajs does not commit this offset and redelivers the message.
+      throw error;
     }
   }
 
-  private async processCdcEvent(event: UnwrappedCdcUser, sequenceNum: bigint) {
+  /**
+   * Applies one CDC event inside a single database transaction.
+   *
+   * Within the transaction:
+   * 1. Claim the event by inserting its idempotency row with
+   *    `createMany({ skipDuplicates: true })` (ON CONFLICT DO NOTHING), so a
+   *    duplicate neither throws nor aborts the PostgreSQL transaction.
+   * 2. Run the business logic on the same transaction client.
+   *
+   * Any error propagates to the caller and rolls back both the claim and
+   * the business writes.
+   *
+   * @returns true when the event was applied, false when it was already processed
+   */
+  private async processWithIdempotency(
+    topic: string,
+    partition: number,
+    offset: bigint,
+    tableName: string,
+    operation: string,
+    event: UnwrappedCdcUser,
+  ): Promise<boolean> {
+    return this.prisma.$transaction(
+      async (tx) => {
+        // 1. Claim the event; count is 0 when the unique key already exists.
+        const claim = await tx.processedCdcEvent.createMany({
+          data: [{ sourceTopic: topic, partition, offset, tableName, operation }],
+          skipDuplicates: true,
+        });
+        if (claim.count === 0) {
+          return false;
+        }
+
+        // 2. Run the business logic with the transaction client.
+        await this.processCdcEvent(event, offset, tx);
+        return true;
+      },
+      {
+        isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
+        timeout: 30000,
+      },
+    );
+  }
+
+  /**
+   * Routes one CDC event to the delete or upsert handler.
+   * `sequenceNum` is the Kafka offset; `tx` is the open transaction client.
+   */
+  private async processCdcEvent(
+    event: UnwrappedCdcUser,
+    sequenceNum: bigint,
+    tx: TransactionClient,
+  ): Promise<void> {
     const op = event.__op;
     const isDeleted = event.__deleted === 'true';
 
-    // 处理删除操作(通过 rewrite mode,删除消息包含 __deleted: 'true')
+    // Handle deletes (either __deleted === 'true' or op 'd').
     if (isDeleted || op === 'd') {
-      await this.deleteLegacyUser(event.user_id);
+      await this.deleteLegacyUser(event.user_id, tx);
       return;
     }
 
@@ -122,73 +206,95 @@ export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
       case 'c': // Create
       case 'r': // Read (snapshot)
       case 'u': // Update
-        await this.upsertLegacyUser(event, sequenceNum, op);
+        await this.upsertLegacyUser(event, sequenceNum, op, tx);
         break;
     }
   }
 
-  private async upsertLegacyUser(user: UnwrappedCdcUser, sequenceNum: bigint, op: string) {
-    try {
-      // 检查是否是新用户(不存在于数据库中)
-      const existingUser = await this.prisma.syncedLegacyUser.findUnique({
-        where: { legacyId: BigInt(user.user_id) },
-      });
-      const isNewUser = !existingUser;
+  /**
+   * Upserts the synced copy of a legacy user and, for a newly created user,
+   * queues a LegacyUserMigratedEvent in the outbox. All writes go through `tx`,
+   * so they commit or roll back together with the idempotency row.
+   */
+  private async upsertLegacyUser(
+    user: UnwrappedCdcUser,
+    sequenceNum: bigint,
+    op: string,
+    tx: TransactionClient,
+  ): Promise<void> {
+    // Check whether the user is new (not yet present in the database).
+    const existingUser = await tx.syncedLegacyUser.findUnique({
+      where: { legacyId: BigInt(user.user_id) },
+    });
+    const isNewUser = !existingUser;
 
-      await this.prisma.syncedLegacyUser.upsert({
-        where: { legacyId: BigInt(user.user_id) },
-        update: {
-          phone: user.phone_number ?? undefined,
-          passwordHash: user.password_hash ?? undefined,
-          nickname: user.nickname ?? undefined,
-          accountSequence: user.account_sequence,
-          status: user.status,
-          sourceSequenceNum: sequenceNum,
-          syncedAt: new Date(),
-        },
-        create: {
-          legacyId: BigInt(user.user_id),
-          phone: user.phone_number ?? null,
-          passwordHash: user.password_hash ?? null,
-          nickname: user.nickname ?? null,
-          accountSequence: user.account_sequence,
-          status: user.status,
-          legacyCreatedAt: new Date(user.registered_at),
-          sourceSequenceNum: sequenceNum,
+    await tx.syncedLegacyUser.upsert({
+      where: { legacyId: BigInt(user.user_id) },
+      update: {
+        phone: user.phone_number ?? undefined,
+        passwordHash: user.password_hash ?? undefined,
+        nickname: user.nickname ?? undefined,
+        accountSequence: user.account_sequence,
+        status: user.status,
+        sourceSequenceNum: sequenceNum,
+        syncedAt: new Date(),
+      },
+      create: {
+        legacyId: BigInt(user.user_id),
+        phone: user.phone_number ?? null,
+        passwordHash: user.password_hash ?? null,
+        nickname: user.nickname ?? null,
+        accountSequence: user.account_sequence,
+        status: user.status,
+        legacyCreatedAt: new Date(user.registered_at),
+        sourceSequenceNum: sequenceNum,
+      },
+    });
+
+    // Only newly created users publish an event to the outbox (consumed by mining-admin-service).
+    // Snapshot reads (r) do not publish: on a full reset the publish-all-legacy-users API publishes them in bulk.
+    // The outbox row is created in the same transaction, so it is atomic with the upsert.
+    if (isNewUser && op === 'c') {
+      const migratedEvent = new LegacyUserMigratedEvent(
+        user.account_sequence,
+        user.phone_number || '',
+        user.nickname || '',
+        new Date(user.registered_at),
+      );
+      // Write the outbox row directly on the transaction client.
+      await tx.outboxEvent.create({
+        data: {
+          aggregateType: 'User',
+          aggregateId: user.account_sequence,
+          eventType: LegacyUserMigratedEvent.EVENT_TYPE,
+          payload: migratedEvent.toPayload() as any,
+          topic: 'auth-events',
+          key: user.account_sequence,
+          status: 'PENDING',
         },
       });
-
-      // 只有新创建的用户才发布事件到 outbox(供 mining-admin-service 消费)
-      // 快照读取 (r) 不发布事件,因为 full-reset 时会通过 publish-all-legacy-users API 统一发布
-      if (isNewUser && op === 'c') {
-        const event = new LegacyUserMigratedEvent(
-          user.account_sequence,
-          user.phone_number || '',
-          user.nickname || '',
-          new Date(user.registered_at),
-        );
-        await this.outboxService.publish(event);
-        this.logger.log(`Published LegacyUserMigratedEvent for new user: ${user.account_sequence}`);
-      }
-
-      this.logger.debug(`Synced legacy user: ${user.account_sequence}`);
-    } catch (error) {
-      this.logger.error(`Failed to upsert legacy user ${user.user_id}`, error);
-      throw error;
+      this.logger.log(`[CDC] Created outbox event for new user: ${user.account_sequence}`);
     }
+
+    this.logger.debug(`[CDC] Synced legacy user: ${user.account_sequence}`);
   }
 
-  private async deleteLegacyUser(legacyId: number) {
-    try {
-      // 不实际删除,只标记状态
-      await this.prisma.syncedLegacyUser.update({
-        where: { legacyId: BigInt(legacyId) },
-        data: { status: 'DELETED' },
-      });
-
-      this.logger.debug(`Marked legacy user as deleted: ${legacyId}`);
-    } catch (error) {
-      this.logger.error(`Failed to mark legacy user as deleted: ${legacyId}`, error);
-    }
+  /**
+   * Soft-deletes a synced legacy user by setting its status to 'DELETED'.
+   * Uses updateMany so a user that was never synced is reported instead of
+   * throwing inside the transaction; real database errors propagate and roll back.
+   */
+  private async deleteLegacyUser(legacyId: number, tx: TransactionClient): Promise<void> {
+    // The row is kept; only its status changes.
+    const { count } = await tx.syncedLegacyUser.updateMany({
+      where: { legacyId: BigInt(legacyId) },
+      data: { status: 'DELETED' },
+    });
+
+    if (count === 0) {
+      this.logger.warn(`[CDC] No synced legacy user to mark as deleted: ${legacyId}`);
+      return;
+    }
+    this.logger.debug(`[CDC] Marked legacy user as deleted: ${legacyId}`);
   }
 }