import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; interface CdcUserPayload { before: CdcUser | null; after: CdcUser | null; source: { sequence: string; }; op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot) } interface CdcUser { id: number; phone: string; password_hash: string; account_sequence: string; status: string; created_at: number; } /** * CDC Consumer - 消费 1.0 用户变更事件 * 监听 Debezium 发送的 CDC 事件,同步到 synced_legacy_users 表 */ @Injectable() export class LegacyUserCdcConsumer implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(LegacyUserCdcConsumer.name); private kafka: Kafka; private consumer: Consumer; private isConnected = false; constructor( private readonly configService: ConfigService, private readonly prisma: PrismaService, ) { const brokers = this.configService.get('KAFKA_BROKERS', 'localhost:9092').split(','); this.kafka = new Kafka({ clientId: 'auth-service-cdc', brokers, }); this.consumer = this.kafka.consumer({ groupId: this.configService.get('CDC_CONSUMER_GROUP', 'auth-service-cdc-group'), }); } async onModuleInit() { // 开发环境可选择不启动 CDC if (this.configService.get('CDC_ENABLED', 'true') !== 'true') { this.logger.log('CDC Consumer is disabled'); return; } try { await this.consumer.connect(); this.isConnected = true; const topic = this.configService.get('CDC_TOPIC_USERS', 'dbserver1.public.users'); await this.consumer.subscribe({ topic, fromBeginning: true }); await this.consumer.run({ eachMessage: async (payload) => { await this.handleMessage(payload); }, }); this.logger.log(`CDC Consumer started, listening to topic: ${topic}`); } catch (error) { this.logger.error('Failed to start CDC Consumer', error); } } async onModuleDestroy() { if (this.isConnected) { await this.consumer.disconnect(); this.logger.log('CDC Consumer disconnected'); } } private async handleMessage(payload: EachMessagePayload) { const { topic, partition, message } = payload; if (!message.value) return; try { const cdcEvent: CdcUserPayload = JSON.parse(message.value.toString()); await this.processCdcEvent(cdcEvent); } catch (error) { this.logger.error( `Failed to process CDC message from ${topic}[${partition}]`, error, ); } } private async processCdcEvent(event: CdcUserPayload) { const { before, after, source, op } = event; switch (op) { case 'c': // Create case 'r': // Read (snapshot) if (after) { await this.upsertLegacyUser(after, BigInt(source.sequence)); } break; case 'u': // Update if (after) { await this.upsertLegacyUser(after, BigInt(source.sequence)); } break; case 'd': // Delete if (before) { await this.deleteLegacyUser(before.id); } break; } } private async upsertLegacyUser(user: CdcUser, sequenceNum: bigint) { try { await this.prisma.syncedLegacyUser.upsert({ where: { legacyId: BigInt(user.id) }, update: { phone: user.phone, passwordHash: user.password_hash, accountSequence: user.account_sequence, status: user.status, sourceSequenceNum: sequenceNum, syncedAt: new Date(), }, create: { legacyId: BigInt(user.id), phone: user.phone, passwordHash: user.password_hash, accountSequence: user.account_sequence, status: user.status, legacyCreatedAt: new Date(user.created_at), sourceSequenceNum: sequenceNum, }, }); this.logger.debug(`Synced legacy user: ${user.account_sequence}`); } catch (error) { this.logger.error(`Failed to upsert legacy user ${user.id}`, error); throw error; } } private async deleteLegacyUser(legacyId: number) { try { // 不实际删除,只标记状态 await this.prisma.syncedLegacyUser.update({ where: { legacyId: BigInt(legacyId) }, data: { status: 'DELETED' }, }); this.logger.debug(`Marked legacy user as deleted: ${legacyId}`); } catch (error) { this.logger.error(`Failed to mark legacy user as deleted: ${legacyId}`, error); } } }