diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 9f0c7983..9186ef66 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -195,7 +195,11 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { } private isServiceEvent(data: any): boolean { - return data.eventType && data.aggregateType; + // 支持两种格式: + // 1. 驼峰格式 (服务直接发布): eventType, aggregateType + // 2. 下划线格式 (Debezium outbox): event_type, aggregate_type + return (data.eventType && data.aggregateType) || + (data.event_type && data.aggregate_type); } private async handleCdcEvent( @@ -226,8 +230,10 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { eventData: any, sequenceNum: bigint, ): Promise { + // 规范化事件格式,支持 Debezium outbox 的下划线格式 + const normalizedEvent = this.normalizeServiceEvent(eventData); const event: ServiceEvent = { - ...eventData, + ...normalizedEvent, sequenceNum, }; @@ -248,4 +254,30 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { } } } + + /** + * 规范化服务事件格式 + * 将 Debezium outbox 的下划线格式转换为驼峰格式 + */ + private normalizeServiceEvent(data: any): Omit { + // 如果已经是驼峰格式,直接返回 + if (data.eventType && data.aggregateType) { + return data; + } + + // Debezium outbox 格式转换 + // 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... } + const payload = typeof data.payload === 'string' + ? JSON.parse(data.payload) + : data.payload; + + return { + id: String(data.id), + eventType: data.event_type, + aggregateType: data.aggregate_type, + aggregateId: data.aggregate_id, + payload, + createdAt: data.created_at ? new Date(data.created_at).toISOString() : new Date().toISOString(), + }; + } }