import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { Prisma, PrismaClient } from '@prisma/client';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { LegacyUserMigratedEvent } from '@/domain';

/** Prisma transaction client type (the client minus connection/transaction management methods). */
type TransactionClient = Omit<
  PrismaClient,
  '$connect' | '$disconnect' | '$on' | '$transaction' | '$use' | '$extends'
>;

/** Prisma error code: unique constraint violation. */
const PRISMA_UNIQUE_VIOLATION = 'P2002';

/** Prisma error code: a record required by the operation was not found. */
const PRISMA_RECORD_NOT_FOUND = 'P2025';

/**
 * Extracts a string `code` property from an unknown thrown value.
 *
 * @param error - Value caught in a `catch` clause.
 * @returns The `code` when the value is an object carrying a string `code`, otherwise `undefined`.
 */
function prismaErrorCode(error: unknown): string | undefined {
  if (typeof error !== 'object' || error === null || !('code' in error)) {
    return undefined;
  }
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' ? code : undefined;
}

/**
 * Message format after the Debezium ExtractNewRecordState transform.
 * Fields come from the identity-service `user_accounts` table plus Debezium metadata.
 */
interface UnwrappedCdcUser {
  // 1.0 identity-service user_accounts table columns
  user_id: number;
  phone_number: string;
  password_hash: string;
  account_sequence: string;
  nickname: string; // display name
  status: string;
  registered_at: number; // timestamp in milliseconds

  // Metadata fields added by Debezium ExtractNewRecordState
  __op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
  __table: string;
  __source_ts_ms: number;
  __deleted?: string; // 'true' for tombstone messages (delete with rewrite mode)
}

/**
 * CDC consumer for 1.0 user change events.
 *
 * Listens to the CDC events emitted by Debezium and mirrors them into the
 * `syncedLegacyUser` model.
 *
 * Consumption is made idempotent with a transactional idempotent consumer:
 * - the idempotency record (`processedCdcEvent`) and the business writes run in
 *   the same database transaction;
 * - any failure rolls the whole transaction back and is rethrown to KafkaJS, so
 *   the message is redelivered rather than silently dropped;
 * - a redelivered message that was already committed hits the unique constraint
 *   on the idempotency record and is skipped.
 *
 * NOTE(review): the idempotency record is keyed by topic + offset only. Kafka
 * offsets are per partition, so if the topic ever has more than one partition,
 * distinct events can share a key and be skipped as duplicates. Fixing that
 * needs a partition column in the idempotency table — confirm the topic is
 * single-partition or extend the schema.
 */
@Injectable()
export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(LegacyUserCdcConsumer.name);
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private readonly topic: string;
  private isConnected = false;

  constructor(
    private readonly configService: ConfigService,
    private readonly prisma: PrismaService,
  ) {
    // Tolerate whitespace and stray commas in the comma-separated broker list.
    const brokers = this.configService
      .get<string>('KAFKA_BROKERS', 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0);

    this.kafka = new Kafka({
      clientId: 'auth-service-cdc',
      brokers,
    });
    this.consumer = this.kafka.consumer({
      groupId: this.configService.get<string>('CDC_CONSUMER_GROUP', 'auth-service-cdc-group'),
    });
    this.topic = this.configService.get<string>(
      'CDC_TOPIC_USERS',
      'cdc.identity.public.user_accounts',
    );
  }

  /**
   * Connects the consumer, subscribes to the CDC topic from the beginning and
   * starts the message loop. Does nothing when `CDC_ENABLED` is not `'true'`.
   * Start-up failures are logged and not rethrown.
   */
  async onModuleInit(): Promise<void> {
    // CDC can optionally be switched off, e.g. in development environments.
    if (this.configService.get<string>('CDC_ENABLED', 'true') !== 'true') {
      this.logger.log('CDC Consumer is disabled');
      return;
    }

    try {
      await this.consumer.connect();
      this.isConnected = true;

      await this.consumer.subscribe({ topic: this.topic, fromBeginning: true });
      await this.consumer.run({
        eachMessage: async (payload) => {
          await this.handleMessage(payload);
        },
      });

      this.logger.log(
        `CDC Consumer started with transactional idempotency, listening to topic: ${this.topic}`,
      );
    } catch (error: unknown) {
      this.logger.error('Failed to start CDC Consumer', error);
    }
  }

  /** Disconnects the consumer if it was connected. */
  async onModuleDestroy(): Promise<void> {
    if (this.isConnected) {
      await this.consumer.disconnect();
      this.isConnected = false;
      this.logger.log('CDC Consumer disconnected');
    }
  }

  /**
   * Handles one Kafka message: parses it and processes it idempotently.
   *
   * Messages without a value and messages that cannot be parsed are skipped,
   * because retrying them cannot succeed. Processing failures are logged and
   * rethrown so that the offset is not committed and the message is retried.
   *
   * @param payload - The KafkaJS per-message payload.
   * @throws Any processing error other than a unique-constraint violation.
   */
  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;
    if (!message.value) return;

    const offset = BigInt(message.offset);
    const idempotencyKey = `${topic}:${offset}`;

    const cdcEvent = this.parseCdcEvent(message.value.toString(), topic, partition, offset);
    if (!cdcEvent) return;

    const op = cdcEvent.__op;
    const tableName = cdcEvent.__table || 'user_accounts';

    try {
      this.logger.log(`[CDC] Processing event: topic=${topic}, offset=${offset}, op=${op}`);

      // Transactional idempotent consumption.
      await this.processWithIdempotency(topic, offset, tableName, op, cdcEvent);

      this.logger.log(`[CDC] Successfully processed event: ${idempotencyKey}`);
    } catch (error: unknown) {
      // Unique constraint violation = event already processed, skip it.
      // NOTE(review): a unique violation raised by the business upsert would
      // also land here and be skipped — confirm that is acceptable.
      if (prismaErrorCode(error) === PRISMA_UNIQUE_VIOLATION) {
        this.logger.debug(`[CDC] Skipping duplicate event: ${idempotencyKey}`);
        return;
      }

      this.logger.error(
        `[CDC] Failed to process message from ${topic}[${partition}], offset=${offset}`,
        error,
      );
      // Rethrow so KafkaJS does not commit the offset; the transaction has been
      // rolled back, so a retry is safe and the event is not lost.
      throw error;
    }
  }

  /**
   * Parses a raw message value into a CDC event.
   *
   * @returns The parsed event, or `undefined` (after logging) when the value is
   *   not valid JSON or is not a JSON object.
   */
  private parseCdcEvent(
    raw: string,
    topic: string,
    partition: number,
    offset: bigint,
  ): UnwrappedCdcUser | undefined {
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch (error: unknown) {
      this.logger.error(
        `[CDC] Failed to process message from ${topic}[${partition}], offset=${offset}`,
        error,
      );
      return undefined;
    }

    if (typeof parsed !== 'object' || parsed === null) {
      this.logger.error(
        `[CDC] Failed to process message from ${topic}[${partition}], offset=${offset}`,
        new Error('CDC message value is not a JSON object'),
      );
      return undefined;
    }

    // Only the object shape is checked here; individual fields are not validated.
    return parsed as UnwrappedCdcUser;
  }

  /**
   * Transactional idempotent processing.
   *
   * Inside a single database transaction:
   * 1. insert the idempotency record (a unique constraint rejects repeats);
   * 2. run the business logic.
   *
   * A failure in either step rolls back the whole transaction.
   *
   * @param topic - Source Kafka topic, stored on the idempotency record.
   * @param offset - Message offset, stored on the idempotency record.
   * @param tableName - Source table name reported by the event.
   * @param operation - CDC operation code reported by the event.
   * @param event - The parsed CDC event.
   */
  private async processWithIdempotency(
    topic: string,
    offset: bigint,
    tableName: string,
    operation: string,
    event: UnwrappedCdcUser,
  ): Promise<void> {
    await this.prisma.$transaction(
      async (tx) => {
        // 1. Insert the idempotency record (unique constraint prevents repeats).
        try {
          await tx.processedCdcEvent.create({
            data: {
              sourceTopic: topic,
              offset: offset,
              tableName: tableName,
              operation: operation,
            },
          });
        } catch (error: unknown) {
          // Unique constraint violation = already processed; skip business logic.
          if (prismaErrorCode(error) === PRISMA_UNIQUE_VIOLATION) {
            this.logger.debug(`[CDC] Event already processed: ${topic}:${offset}`);
            return;
          }
          throw error;
        }

        // 2. Run the business logic on the transaction client.
        await this.processCdcEvent(event, offset, tx);
      },
      {
        isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
        timeout: 30000,
      },
    );
  }

  /**
   * Dispatches a CDC event to the delete or upsert handler.
   *
   * @param event - The parsed CDC event.
   * @param sequenceNum - Message offset, recorded as the source sequence number.
   * @param tx - Transaction client to run the writes on.
   */
  private async processCdcEvent(
    event: UnwrappedCdcUser,
    sequenceNum: bigint,
    tx: TransactionClient,
  ): Promise<void> {
    const op = event.__op;
    const isDeleted = event.__deleted === 'true';

    // Deletes: either the rewrite-mode marker or an explicit delete operation.
    if (isDeleted || op === 'd') {
      await this.deleteLegacyUser(event.user_id, tx);
      return;
    }

    // Creates, updates and snapshot reads are all upserts.
    switch (op) {
      case 'c': // Create
      case 'r': // Read (snapshot)
      case 'u': // Update
        await this.upsertLegacyUser(event, sequenceNum, op, tx);
        break;
      default:
        // The payload is unvalidated JSON, so an unexpected code can reach here.
        this.logger.warn(`[CDC] Ignoring unsupported operation: ${String(op)}`);
        break;
    }
  }

  /**
   * Creates or updates the synced copy of a legacy user and, for a newly
   * created user, writes an outbox event on the same transaction client.
   *
   * @param user - The parsed CDC event carrying the user columns.
   * @param sequenceNum - Message offset, stored as `sourceSequenceNum`.
   * @param op - CDC operation code of the event.
   * @param tx - Transaction client to run the writes on.
   */
  private async upsertLegacyUser(
    user: UnwrappedCdcUser,
    sequenceNum: bigint,
    op: string,
    tx: TransactionClient,
  ): Promise<void> {
    const legacyId = BigInt(user.user_id);

    // Check whether this user is new (not yet present in the database).
    const existingUser = await tx.syncedLegacyUser.findUnique({
      where: { legacyId },
    });
    const isNewUser = !existingUser;

    await tx.syncedLegacyUser.upsert({
      where: { legacyId },
      update: {
        phone: user.phone_number ?? undefined,
        passwordHash: user.password_hash ?? undefined,
        nickname: user.nickname ?? undefined,
        accountSequence: user.account_sequence,
        status: user.status,
        sourceSequenceNum: sequenceNum,
        syncedAt: new Date(),
      },
      create: {
        legacyId,
        phone: user.phone_number ?? null,
        passwordHash: user.password_hash ?? null,
        nickname: user.nickname ?? null,
        accountSequence: user.account_sequence,
        status: user.status,
        legacyCreatedAt: new Date(user.registered_at),
        sourceSequenceNum: sequenceNum,
      },
    });

    // Only newly created users publish an event to the outbox (consumed by
    // mining-admin-service). Snapshot reads (r) do not publish, because a
    // full reset publishes everything through the publish-all-legacy-users API.
    // The outbox row is written on the same transaction client for atomicity.
    if (isNewUser && op === 'c') {
      const migratedEvent = new LegacyUserMigratedEvent(
        user.account_sequence,
        user.phone_number || '',
        user.nickname || '',
        new Date(user.registered_at),
      );

      await tx.outboxEvent.create({
        data: {
          aggregateType: 'User',
          aggregateId: user.account_sequence,
          eventType: LegacyUserMigratedEvent.EVENT_TYPE,
          payload: migratedEvent.toPayload() as any,
          topic: 'auth-events',
          key: user.account_sequence,
          status: 'PENDING',
        },
      });

      this.logger.log(`[CDC] Created outbox event for new user: ${user.account_sequence}`);
    }

    this.logger.debug(`[CDC] Synced legacy user: ${user.account_sequence}`);
  }

  /**
   * Soft-deletes a synced legacy user by setting its status to `DELETED`.
   *
   * A missing row is tolerated, since there is nothing to mark. Any other
   * failure is rethrown so the surrounding transaction rolls back instead of
   * recording the event as processed.
   *
   * @param legacyId - Identifier of the user in the legacy system.
   * @param tx - Transaction client to run the update on.
   * @throws Any error other than Prisma "record not found".
   */
  private async deleteLegacyUser(legacyId: number, tx: TransactionClient): Promise<void> {
    try {
      // Do not physically delete; only mark the status.
      await tx.syncedLegacyUser.update({
        where: { legacyId: BigInt(legacyId) },
        data: { status: 'DELETED' },
      });

      this.logger.debug(`[CDC] Marked legacy user as deleted: ${legacyId}`);
    } catch (error: unknown) {
      if (prismaErrorCode(error) === PRISMA_RECORD_NOT_FOUND) {
        this.logger.warn(`[CDC] Legacy user to delete was not found, skipping: ${legacyId}`);
        return;
      }

      this.logger.error(`[CDC] Failed to mark legacy user as deleted: ${legacyId}`, error);
      throw error;
    }
  }
}