diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 9186ef66..2a589639 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -176,11 +176,19 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { // 判断事件类型:Debezium CDC 或 服务 Outbox 事件 if (this.isDebeziumEvent(eventData)) { - await this.handleCdcEvent(topic, eventData, sequenceNum); + // Debezium outbox 事件:从 payload.after 提取服务事件 + if (this.isDebeziumOutboxEvent(eventData)) { + await this.handleDebeziumOutboxEvent(topic, eventData, sequenceNum); + } else { + // 普通 Debezium CDC 事件(表变更) + await this.handleCdcEvent(topic, eventData, sequenceNum); + } } else if (this.isServiceEvent(eventData)) { + // 直接发布的服务事件(非 Debezium) await this.handleServiceEvent(topic, eventData, sequenceNum); } else { this.logger.warn(`Unknown event format from topic: ${topic}`); + this.logger.debug(`Event data: ${JSON.stringify(eventData).substring(0, 500)}`); } } catch (error) { this.logger.error( @@ -194,6 +202,17 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { return data.payload && data.payload.source && data.payload.op; } + /** + * 判断是否为 Debezium outbox 事件 + * Debezium 监听 outbox_events 表产生的 CDC 消息 + */ + private isDebeziumOutboxEvent(data: any): boolean { + const after = data.payload?.after; + if (!after) return false; + // outbox 表有 event_type 字段 + return after.event_type && after.aggregate_type; + } + private isServiceEvent(data: any): boolean { // 支持两种格式: // 1. 驼峰格式 (服务直接发布): eventType, aggregateType @@ -202,6 +221,25 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { (data.event_type && data.aggregate_type); } + /** + * 处理 Debezium outbox 事件 + * 从 payload.after 提取服务事件并处理 + */ + private async handleDebeziumOutboxEvent( + topic: string, + eventData: any, + sequenceNum: bigint, + ): Promise { + const op = eventData.payload.op; + // 只处理 create 操作(新增的 outbox 记录) + if (op !== 'c' && op !== 'r') { + return; + } + + const after = eventData.payload.after; + await this.handleServiceEvent(topic, after, sequenceNum); + } + private async handleCdcEvent( topic: string, eventData: any,