import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';

/**
 * A Debezium-style change event as delivered to registered handlers.
 *
 * The object is the JSON-decoded Kafka message value spread into a new object,
 * plus `sequenceNum`. The decoded JSON is NOT validated against this shape, so
 * handlers should treat the fields as "expected", not guaranteed.
 */
export interface CDCEvent {
  schema: any;
  payload: {
    before: any | null;
    after: any | null;
    source: {
      version: string;
      connector: string;
      name: string;
      ts_ms: number;
      snapshot: string;
      db: string;
      sequence: string;
      schema: string;
      table: string;
      txId: number;
      lsn: number;
      xmin: number | null;
    };
    op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
    ts_ms: number;
    transaction: any;
  };
  // Internal use: the Kafka message offset, used as a sequence number.
  sequenceNum: bigint;
}

/** Callback invoked for every CDC event of the table it was registered for. */
export type CDCHandler = (event: CDCEvent) => Promise<void>;

/**
 * Kafka consumer that dispatches CDC events to per-table handlers.
 *
 * Usage: call {@link registerHandler} for each table, then {@link start}.
 * The consumer is not started automatically on module init.
 */
@Injectable()
export class CDCConsumerService implements OnModuleInit {
  private readonly logger = new Logger(CDCConsumerService.name);
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  /** Handlers keyed by table name (the last dot-separated segment of the topic). */
  private readonly handlers: Map<string, CDCHandler> = new Map();
  private isRunning = false;

  constructor(private readonly configService: ConfigService) {
    // KAFKA_BROKERS is a comma-separated list. Trim whitespace and drop empty
    // entries so values like "a:9092, b:9092" or a trailing comma do not
    // produce an invalid broker address.
    const brokers = this.configService
      .get<string>('KAFKA_BROKERS', 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0);

    this.kafka = new Kafka({
      clientId: 'contribution-service-cdc',
      brokers,
    });

    this.consumer = this.kafka.consumer({
      groupId: 'contribution-service-cdc-group',
    });
  }

  async onModuleInit(): Promise<void> {
    // Intentionally not started here: start() is expected to be called after
    // the handlers have been registered.
  }

  /**
   * Registers the CDC event handler for a table. Registering again for the
   * same table replaces the previous handler.
   *
   * @param tableName Table name. It is matched against the last dot-separated
   *   segment of the Kafka topic name (e.g. `planting_orders` for the default
   *   topic `cdc.planting.public.planting_orders`).
   *   NOTE(review): the original doc listed "users", "adoptions", "referrals"
   *   as examples; those do not match the default topics' last segment —
   *   confirm what callers actually register.
   * @param handler Function invoked with each event for that table.
   */
  registerHandler(tableName: string, handler: CDCHandler): void {
    this.handlers.set(tableName, handler);
    this.logger.log(`Registered CDC handler for table: ${tableName}`);
  }

  /**
   * Starts the consumer: connects, subscribes to the configured CDC topics
   * and begins dispatching messages. Does nothing (besides a warning) when
   * the consumer is already running.
   *
   * @throws Rethrows any error raised while connecting, subscribing or
   *   starting the run loop, after logging it and attempting to disconnect.
   */
  async start(): Promise<void> {
    if (this.isRunning) {
      this.logger.warn('CDC consumer is already running');
      return;
    }

    try {
      await this.consumer.connect();
      this.logger.log('CDC consumer connected');

      // Subscribe to the Debezium CDC topics (synced from the 1.0 services).
      const topics = [
        // Adoption (planting) orders table (planting-service: planting_orders)
        this.configService.get<string>(
          'CDC_TOPIC_ADOPTIONS',
          'cdc.planting.public.planting_orders',
        ),
        // Referral relationships table (referral-service: referral_relationships)
        this.configService.get<string>(
          'CDC_TOPIC_REFERRALS',
          'cdc.referral.public.referral_relationships',
        ),
      ];

      await this.consumer.subscribe({
        topics,
        fromBeginning: false,
      });
      this.logger.log(`Subscribed to topics: ${topics.join(', ')}`);

      await this.consumer.run({
        eachMessage: async (payload: EachMessagePayload) => {
          await this.handleMessage(payload);
        },
      });

      this.isRunning = true;
      this.logger.log('CDC consumer started');
    } catch (error) {
      this.logger.error('Failed to start CDC consumer', error);
      // Do not leave a half-initialised consumer connected to the brokers.
      await this.disconnectQuietly();
      throw error;
    }
  }

  /**
   * Stops the consumer by disconnecting it. Does nothing when the consumer
   * is not running.
   *
   * @throws Rethrows any error raised by the disconnect, after logging it.
   */
  async stop(): Promise<void> {
    if (!this.isRunning) {
      return;
    }

    try {
      await this.consumer.disconnect();
      this.isRunning = false;
      this.logger.log('CDC consumer stopped');
    } catch (error) {
      this.logger.error('Failed to stop CDC consumer', error);
      throw error;
    }
  }

  /**
   * Best-effort disconnect used on the start() failure path. Never throws,
   * so the original start-up error is the one that propagates.
   */
  private async disconnectQuietly(): Promise<void> {
    try {
      await this.consumer.disconnect();
    } catch (disconnectError) {
      this.logger.warn(
        `Failed to disconnect CDC consumer after start failure: ${String(disconnectError)}`,
      );
    }
  }

  /**
   * Decodes one Kafka message and dispatches it to the handler registered
   * for its table. Messages without a value are skipped. Any error (JSON
   * parsing or handler failure) is logged and swallowed, so the message is
   * not retried by this method.
   */
  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;

    try {
      if (!message.value) {
        return;
      }

      const eventData = JSON.parse(message.value.toString());
      const event: CDCEvent = {
        ...eventData,
        sequenceNum: BigInt(message.offset),
      };

      // Extract the table name from the topic name.
      // The format is usually: dbserver1.schema.tablename
      // (a topic without any dot is used as-is).
      const tableName = topic.slice(topic.lastIndexOf('.') + 1);

      const handler = this.handlers.get(tableName);
      if (handler) {
        await handler(event);
        // The decoded JSON is unvalidated, so `payload` may be absent at
        // runtime; optional chaining keeps a successful handler call from
        // being reported as a processing error by this log line.
        this.logger.debug(
          `Processed CDC event for table ${tableName}, op: ${event.payload?.op}`,
        );
      } else {
        this.logger.warn(`No handler registered for table: ${tableName}`);
      }
    } catch (error) {
      this.logger.error(
        `Error processing CDC message from topic ${topic}, partition ${partition}`,
        error,
      );
      // Whether to retry or route to a dead-letter queue is left to be
      // decided according to business requirements.
    }
  }
}