fix(contribution-service): 修复CDC消息解析以支持Debezium扁平化格式
Debezium配置了ExtractNewRecordState转换,消息格式是扁平化的: - 元数据字段使用__前缀(__op, __table, __source_ts_ms, __deleted) - 业务数据字段直接在根级别 - 修改handleMessage方法正确解析扁平化格式 - 更新CDCEvent接口以匹配实际消息结构 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
4b55c63e71
commit
5f76108579
|
|
@ -2,28 +2,32 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||||
import { ConfigService } from '@nestjs/config';
|
import { ConfigService } from '@nestjs/config';
|
||||||
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
|
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CDC 事件接口
|
||||||
|
*
|
||||||
|
* 注意:由于 Debezium 配置了 ExtractNewRecordState 转换(unwrap),
|
||||||
|
* 消息格式是扁平化的,字段直接在根级别,元数据字段以 __ 为前缀。
|
||||||
|
*
|
||||||
|
* 扁平化格式示例:
|
||||||
|
* {
|
||||||
|
* "order_id": 1,
|
||||||
|
* "tree_count": 1,
|
||||||
|
* "account_sequence": "D25122700015",
|
||||||
|
* "__op": "c",
|
||||||
|
* "__table": "planting_orders",
|
||||||
|
* "__source_ts_ms": 1767892060857,
|
||||||
|
* "__deleted": "false"
|
||||||
|
* }
|
||||||
|
*/
|
||||||
export interface CDCEvent {
|
export interface CDCEvent {
|
||||||
schema: any;
|
// 为了兼容性,构造一个 payload 对象
|
||||||
payload: {
|
payload: {
|
||||||
|
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
|
||||||
before: any | null;
|
before: any | null;
|
||||||
after: any | null;
|
after: any | null;
|
||||||
source: {
|
table: string;
|
||||||
version: string;
|
source_ts_ms: number;
|
||||||
connector: string;
|
deleted: boolean;
|
||||||
name: string;
|
|
||||||
ts_ms: number;
|
|
||||||
snapshot: string;
|
|
||||||
db: string;
|
|
||||||
sequence: string;
|
|
||||||
schema: string;
|
|
||||||
table: string;
|
|
||||||
txId: number;
|
|
||||||
lsn: number;
|
|
||||||
xmin: number | null;
|
|
||||||
};
|
|
||||||
op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
|
|
||||||
ts_ms: number;
|
|
||||||
transaction: any;
|
|
||||||
};
|
};
|
||||||
// 内部使用:Kafka offset 作为序列号
|
// 内部使用:Kafka offset 作为序列号
|
||||||
sequenceNum: bigint;
|
sequenceNum: bigint;
|
||||||
|
|
@ -133,21 +137,42 @@ export class CDCConsumerService implements OnModuleInit {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const eventData = JSON.parse(message.value.toString());
|
const rawData = JSON.parse(message.value.toString());
|
||||||
|
|
||||||
|
// Debezium ExtractNewRecordState 转换后的扁平化格式
|
||||||
|
// 元数据字段: __op, __table, __source_ts_ms, __deleted
|
||||||
|
// 数据字段直接在根级别
|
||||||
|
const op = rawData.__op || rawData.op;
|
||||||
|
const table = rawData.__table;
|
||||||
|
const sourceTsMs = rawData.__source_ts_ms || 0;
|
||||||
|
const deleted = rawData.__deleted === 'true' || rawData.__deleted === true;
|
||||||
|
|
||||||
|
// 从原始数据中移除元数据字段,剩下的就是业务数据
|
||||||
|
const { __op, __table, __source_ts_ms, __deleted, ...businessData } = rawData;
|
||||||
|
|
||||||
|
// 构造兼容的 CDCEvent 对象
|
||||||
|
// 对于 create/update/read,数据在 after;对于 delete,数据在 before
|
||||||
const event: CDCEvent = {
|
const event: CDCEvent = {
|
||||||
...eventData,
|
payload: {
|
||||||
|
op: op as 'c' | 'u' | 'd' | 'r',
|
||||||
|
before: op === 'd' ? businessData : null,
|
||||||
|
after: op !== 'd' ? businessData : null,
|
||||||
|
table: table,
|
||||||
|
source_ts_ms: sourceTsMs,
|
||||||
|
deleted: deleted,
|
||||||
|
},
|
||||||
sequenceNum: BigInt(message.offset),
|
sequenceNum: BigInt(message.offset),
|
||||||
};
|
};
|
||||||
|
|
||||||
// 从 topic 名称提取表名
|
// 从 topic 名称提取表名作为备选
|
||||||
// 格式通常是: dbserver1.schema.tablename
|
// 格式: cdc.<service>.public.<table_name>
|
||||||
const parts = topic.split('.');
|
const parts = topic.split('.');
|
||||||
const tableName = parts[parts.length - 1];
|
const tableName = table || parts[parts.length - 1];
|
||||||
|
|
||||||
const handler = this.handlers.get(tableName);
|
const handler = this.handlers.get(tableName);
|
||||||
if (handler) {
|
if (handler) {
|
||||||
await handler(event);
|
await handler(event);
|
||||||
this.logger.debug(`Processed CDC event for table ${tableName}, op: ${event.payload.op}`);
|
this.logger.debug(`Processed CDC event for table ${tableName}, op: ${op}`);
|
||||||
} else {
|
} else {
|
||||||
this.logger.warn(`No handler registered for table: ${tableName}`);
|
this.logger.warn(`No handler registered for table: ${tableName}`);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue