diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 2a589639..826db48c 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -174,6 +174,11 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { const eventData = JSON.parse(message.value.toString()); const sequenceNum = BigInt(message.offset); + // 忽略 Debezium 心跳消息 (只有 ts_ms 字段) + if (this.isHeartbeatMessage(eventData)) { + return; + } + // 判断事件类型:Debezium CDC 或 服务 Outbox 事件 if (this.isDebeziumEvent(eventData)) { // Debezium outbox 事件:从 payload.after 提取服务事件 @@ -198,6 +203,15 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { } } + /** + * 判断是否为 Debezium 心跳消息 + * 心跳消息格式: { ts_ms: number } + */ + private isHeartbeatMessage(data: any): boolean { + const keys = Object.keys(data); + return keys.length === 1 && keys[0] === 'ts_ms'; + } + private isDebeziumEvent(data: any): boolean { return data.payload && data.payload.source && data.payload.op; }