fix(mining-admin): 兼容 Debezium outbox 消息格式

问题:Debezium 产生的 outbox 消息使用下划线命名(event_type,
aggregate_type),而代码期望驼峰格式(eventType, aggregateType)

解决方案:
- isServiceEvent() 同时检查两种命名格式
- 新增 normalizeServiceEvent() 转换 Debezium 格式到驼峰格式
- 解析 payload JSON 字符串

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 01:10:24 -08:00
parent 273f2f1d96
commit 0d06080760
1 changed files with 34 additions and 2 deletions

View File

@ -195,7 +195,11 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
}
private isServiceEvent(data: any): boolean {
return data.eventType && data.aggregateType;
// 支持两种格式:
// 1. 驼峰格式 (服务直接发布): eventType, aggregateType
// 2. 下划线格式 (Debezium outbox): event_type, aggregate_type
return (data.eventType && data.aggregateType) ||
(data.event_type && data.aggregate_type);
}
private async handleCdcEvent(
@ -226,8 +230,10 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
eventData: any,
sequenceNum: bigint,
): Promise<void> {
// 规范化事件格式,支持 Debezium outbox 的下划线格式
const normalizedEvent = this.normalizeServiceEvent(eventData);
const event: ServiceEvent = {
...eventData,
...normalizedEvent,
sequenceNum,
};
@ -248,4 +254,30 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
}
}
}
/**
*
* Debezium outbox 线
*/
private normalizeServiceEvent(data: any): Omit<ServiceEvent, 'sequenceNum'> {
// 如果已经是驼峰格式,直接返回
if (data.eventType && data.aggregateType) {
return data;
}
// Debezium outbox 格式转换
// 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... }
const payload = typeof data.payload === 'string'
? JSON.parse(data.payload)
: data.payload;
return {
id: String(data.id),
eventType: data.event_type,
aggregateType: data.aggregate_type,
aggregateId: data.aggregate_id,
payload,
createdAt: data.created_at ? new Date(data.created_at).toISOString() : new Date().toISOString(),
};
}
}