import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; /** * CDC 事件结构 (Debezium 格式) */ export interface CdcEvent { schema: any; payload: { before: any | null; after: any | null; source: { version: string; connector: string; name: string; ts_ms: number; snapshot: string; db: string; sequence: string; schema: string; table: string; txId: number; lsn: number; xmin: number | null; }; op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot) ts_ms: number; transaction: any; }; sequenceNum: bigint; } /** * 2.0 服务间事件结构 (Outbox 格式) */ export interface ServiceEvent { id: string; aggregateType: string; aggregateId: string; eventType: string; payload: any; createdAt: string; sequenceNum: bigint; /** 来源 topic,用于构建全局唯一的幂等键 (topic + id) */ sourceTopic: string; } export type CdcHandler = (event: CdcEvent) => Promise; export type ServiceEventHandler = (event: ServiceEvent) => Promise; /** 支持事务的 handler 类型,tx 参数为 Prisma 事务客户端 */ export type TransactionalServiceEventHandler = (event: ServiceEvent, tx: any) => Promise; @Injectable() export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(CdcConsumerService.name); private kafka: Kafka; private consumer: Consumer; private cdcHandlers: Map = new Map(); private serviceHandlers: Map = new Map(); private isRunning = false; private topics: string[] = []; constructor(private readonly configService: ConfigService) { const brokers = this.configService .get('KAFKA_BROKERS', 'localhost:9092') .split(','); this.kafka = new Kafka({ clientId: 'mining-admin-service-cdc', brokers, }); this.consumer = this.kafka.consumer({ groupId: this.configService.get( 'CDC_CONSUMER_GROUP', 'mining-admin-service-cdc-group', ), }); } async onModuleInit() { // 启动延迟到 CdcSyncService 注册完处理器后 } async onModuleDestroy() { await this.stop(); } /** * 注册 CDC 事件处理器 (1.0 → 2.0 同步) */ registerCdcHandler(tableName: string, handler: CdcHandler): void { this.cdcHandlers.set(tableName, handler); this.logger.log(`Registered CDC handler for table: ${tableName}`); } /** * 注册服务事件处理器 (2.0 服务间同步) */ registerServiceHandler(eventType: string, handler: ServiceEventHandler): void { this.serviceHandlers.set(eventType, handler); this.logger.log(`Registered service event handler for: ${eventType}`); } /** * 添加要订阅的 topic */ addTopic(topic: string): void { if (!this.topics.includes(topic)) { this.topics.push(topic); } } /** * 启动消费者 */ async start(): Promise { if (this.isRunning) { this.logger.warn('CDC consumer is already running'); return; } if (this.topics.length === 0) { this.logger.warn('No topics to subscribe, skipping CDC consumer start'); return; } try { await this.consumer.connect(); this.logger.log('CDC consumer connected'); await this.consumer.subscribe({ topics: this.topics, fromBeginning: true, // 首次启动时全量同步历史数据 }); this.logger.log(`Subscribed to topics: ${this.topics.join(', ')}`); await this.consumer.run({ eachMessage: async (payload: EachMessagePayload) => { await this.handleMessage(payload); }, }); this.isRunning = true; this.logger.log('CDC consumer started'); } catch (error) { this.logger.error('Failed to start CDC consumer', error); // 不抛出错误,允许服务继续运行(CDC 可能暂时不可用) } } /** * 停止消费者 */ async stop(): Promise { if (!this.isRunning) { return; } try { await this.consumer.disconnect(); this.isRunning = false; this.logger.log('CDC consumer stopped'); } catch (error) { this.logger.error('Failed to stop CDC consumer', error); } } private async handleMessage(payload: EachMessagePayload): Promise { const { topic, partition, message } = payload; try { if (!message.value) { return; } const eventData = JSON.parse(message.value.toString()); const sequenceNum = BigInt(message.offset); // 忽略 Debezium 心跳消息 (只有 ts_ms 字段) if (this.isHeartbeatMessage(eventData)) { return; } // 判断事件类型:Debezium CDC 或 服务 Outbox 事件 if (this.isDebeziumEvent(eventData)) { // Debezium outbox 事件:从 payload.after 提取服务事件 if (this.isDebeziumOutboxEvent(eventData)) { await this.handleDebeziumOutboxEvent(topic, eventData, sequenceNum); } else { // 普通 Debezium CDC 事件(表变更) await this.handleCdcEvent(topic, eventData, sequenceNum); } } else if (this.isServiceEvent(eventData)) { // 直接发布的服务事件(非 Debezium) await this.handleServiceEvent(topic, eventData, sequenceNum); } else { this.logger.warn(`Unknown event format from topic: ${topic}`); this.logger.debug(`Event data: ${JSON.stringify(eventData).substring(0, 500)}`); } } catch (error) { this.logger.error( `Error processing message from topic ${topic}, partition ${partition}`, error, ); } } /** * 判断是否为 Debezium 心跳消息 * 心跳消息格式: { ts_ms: number } */ private isHeartbeatMessage(data: any): boolean { const keys = Object.keys(data); return keys.length === 1 && keys[0] === 'ts_ms'; } private isDebeziumEvent(data: any): boolean { return data.payload && data.payload.source && data.payload.op; } /** * 判断是否为 Debezium outbox 事件 * Debezium 监听 outbox_events 表产生的 CDC 消息 */ private isDebeziumOutboxEvent(data: any): boolean { const after = data.payload?.after; if (!after) return false; // outbox 表有 event_type 字段 return after.event_type && after.aggregate_type; } private isServiceEvent(data: any): boolean { // 支持两种格式: // 1. 驼峰格式 (服务直接发布): eventType, aggregateType // 2. 下划线格式 (Debezium outbox): event_type, aggregate_type return (data.eventType && data.aggregateType) || (data.event_type && data.aggregate_type); } /** * 处理 Debezium outbox 事件 * 从 payload.after 提取服务事件并处理 */ private async handleDebeziumOutboxEvent( topic: string, eventData: any, sequenceNum: bigint, ): Promise { const op = eventData.payload.op; // 只处理 create 操作(新增的 outbox 记录) if (op !== 'c' && op !== 'r') { return; } const after = eventData.payload.after; await this.handleServiceEvent(topic, after, sequenceNum); } private async handleCdcEvent( topic: string, eventData: any, sequenceNum: bigint, ): Promise { const event: CdcEvent = { ...eventData, sequenceNum, }; // 从 topic 名称提取表名 (格式: dbserver1.schema.tablename) const parts = topic.split('.'); const tableName = parts[parts.length - 1]; const handler = this.cdcHandlers.get(tableName); if (handler) { await handler(event); this.logger.debug( `Processed CDC event for table ${tableName}, op: ${event.payload.op}`, ); } } private async handleServiceEvent( topic: string, eventData: any, sequenceNum: bigint, ): Promise { // 规范化事件格式,支持 Debezium outbox 的下划线格式 const normalizedEvent = this.normalizeServiceEvent(eventData); const event: ServiceEvent = { ...normalizedEvent, sequenceNum, sourceTopic: topic, }; const handler = this.serviceHandlers.get(event.eventType); if (handler) { await handler(event); this.logger.debug(`Processed service event: ${event.eventType}`); } else { // 尝试通配符处理器 const aggregateHandler = this.serviceHandlers.get( `${event.aggregateType}.*`, ); if (aggregateHandler) { await aggregateHandler(event); this.logger.debug( `Processed service event via wildcard: ${event.eventType}`, ); } } } /** * 规范化服务事件格式 * 将 Debezium outbox 的下划线格式转换为驼峰格式 */ private normalizeServiceEvent(data: any): Omit { // 尝试从多种可能的字段名获取事件 ID // - contribution-service 的 outbox 表主键是 outbox_id // - mining-wallet-service 等其他服务的 outbox 表主键是 id // - 直接发布的事件可能使用 eventId 或 id let eventId = data.outbox_id ?? data.id ?? data.eventId; // 如果没有找到事件 ID,使用 aggregateId + timestamp 生成一个 // 这可以确保幂等性仍然有效(相同的 aggregateId 在同一毫秒内只处理一次) if (!eventId) { const aggregateId = data.aggregateId ?? data.aggregate_id ?? 'unknown'; const timestamp = data.created_at ?? data.createdAt ?? Date.now(); eventId = `${aggregateId}-${typeof timestamp === 'string' ? new Date(timestamp).getTime() : timestamp}`; this.logger.warn(`Event missing id, generated fallback: ${eventId}, eventType: ${data.eventType ?? data.event_type}`); } // 如果已经是驼峰格式,确保 id 字段存在 if (data.eventType && data.aggregateType) { return { ...data, id: String(eventId), }; } // Debezium outbox 格式转换 // 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... } const payload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload; return { id: String(eventId), eventType: data.event_type, aggregateType: data.aggregate_type, aggregateId: data.aggregate_id, payload, createdAt: data.created_at ? new Date(data.created_at).toISOString() : new Date().toISOString(), }; } }