From 0d0608076079953f20a0aedf0669f5556b86cbf3 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 12 Jan 2026 01:10:24 -0800 Subject: [PATCH] =?UTF-8?q?fix(mining-admin):=20=E5=85=BC=E5=AE=B9=20Debez?= =?UTF-8?q?ium=20outbox=20=E6=B6=88=E6=81=AF=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:Debezium 产生的 outbox 消息使用下划线命名(event_type, aggregate_type),而代码期望驼峰格式(eventType, aggregateType) 解决方案: - isServiceEvent() 同时检查两种命名格式 - 新增 normalizeServiceEvent() 转换 Debezium 格式到驼峰格式 - 解析 payload JSON 字符串 Co-Authored-By: Claude Opus 4.5 --- .../kafka/cdc-consumer.service.ts | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 9f0c7983..9186ef66 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -195,7 +195,11 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { } private isServiceEvent(data: any): boolean { - return data.eventType && data.aggregateType; + // 支持两种格式: + // 1. 驼峰格式 (服务直接发布): eventType, aggregateType + // 2. 下划线格式 (Debezium outbox): event_type, aggregate_type + return (data.eventType && data.aggregateType) || + (data.event_type && data.aggregate_type); } private async handleCdcEvent( @@ -226,8 +230,10 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { eventData: any, sequenceNum: bigint, ): Promise { + // 规范化事件格式,支持 Debezium outbox 的下划线格式 + const normalizedEvent = this.normalizeServiceEvent(eventData); const event: ServiceEvent = { - ...eventData, + ...normalizedEvent, sequenceNum, }; @@ -248,4 +254,30 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { } } } + + /** + * 规范化服务事件格式 + * 将 Debezium outbox 的下划线格式转换为驼峰格式 + */ + private normalizeServiceEvent(data: any): Omit { + // 如果已经是驼峰格式,直接返回 + if (data.eventType && data.aggregateType) { + return data; + } + + // Debezium outbox 格式转换 + // 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... } + const payload = typeof data.payload === 'string' + ? JSON.parse(data.payload) + : data.payload; + + return { + id: String(data.id), + eventType: data.event_type, + aggregateType: data.aggregate_type, + aggregateId: data.aggregate_id, + payload, + createdAt: data.created_at ? new Date(data.created_at).toISOString() : new Date().toISOString(), + }; + } }