diff --git a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts index 1d9b1de6..9b10f246 100644 --- a/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/contribution-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -2,28 +2,32 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, EachMessagePayload } from 'kafkajs'; +/** + * CDC 事件接口 + * + * 注意:由于 Debezium 配置了 ExtractNewRecordState 转换(unwrap), + * 消息格式是扁平化的,字段直接在根级别,元数据字段以 __ 为前缀。 + * + * 扁平化格式示例: + * { + * "order_id": 1, + * "tree_count": 1, + * "account_sequence": "D25122700015", + * "__op": "c", + * "__table": "planting_orders", + * "__source_ts_ms": 1767892060857, + * "__deleted": "false" + * } + */ export interface CDCEvent { - schema: any; + // 为了兼容性,构造一个 payload 对象 payload: { + op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot) before: any | null; after: any | null; - source: { - version: string; - connector: string; - name: string; - ts_ms: number; - snapshot: string; - db: string; - sequence: string; - schema: string; - table: string; - txId: number; - lsn: number; - xmin: number | null; - }; - op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot) - ts_ms: number; - transaction: any; + table: string; + source_ts_ms: number; + deleted: boolean; }; // 内部使用:Kafka offset 作为序列号 sequenceNum: bigint; @@ -133,21 +137,42 @@ export class CDCConsumerService implements OnModuleInit { return; } - const eventData = JSON.parse(message.value.toString()); + const rawData = JSON.parse(message.value.toString()); + + // Debezium ExtractNewRecordState 转换后的扁平化格式 + // 元数据字段: __op, __table, __source_ts_ms, __deleted + // 数据字段直接在根级别 + const op = rawData.__op || rawData.op; + const table = rawData.__table; + const sourceTsMs = rawData.__source_ts_ms || 0; + const deleted = rawData.__deleted === 'true' || rawData.__deleted === true; + + // 从原始数据中移除元数据字段,剩下的就是业务数据 + const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData; + + // 构造兼容的 CDCEvent 对象 + // 对于 create/update/read,数据在 after;对于 delete,数据在 before const event: CDCEvent = { - ...eventData, + payload: { + op: op as 'c' | 'u' | 'd' | 'r', + before: op === 'd' ? businessData : null, + after: op !== 'd' ? businessData : null, + table: table, + source_ts_ms: sourceTsMs, + deleted: deleted, + }, sequenceNum: BigInt(message.offset), }; - // 从 topic 名称提取表名 - // 格式通常是: dbserver1.schema.tablename + // 从 topic 名称提取表名作为备选 + // 格式: cdc..public. const parts = topic.split('.'); - const tableName = parts[parts.length - 1]; + const tableName = table || parts[parts.length - 1]; const handler = this.handlers.get(tableName); if (handler) { await handler(event); - this.logger.debug(`Processed CDC event for table ${tableName}, op: ${event.payload.op}`); + this.logger.debug(`Processed CDC event for table ${tableName}, op: ${op}`); } else { this.logger.warn(`No handler registered for table: ${tableName}`); }