fix(mining-admin-service): properly handle Debezium outbox CDC events

- Add isDebeziumOutboxEvent to detect outbox table CDC messages
- Add handleDebeziumOutboxEvent to extract service event from payload.after
- Fix CDC consumer not recognizing events from contribution-service outbox

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-12 07:39:27 -08:00
parent 52c573d507
commit 30e1867eb0
1 changed files with 39 additions and 1 deletions

View File

@ -176,11 +176,19 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
// 判断事件类型Debezium CDC 或 服务 Outbox 事件
if (this.isDebeziumEvent(eventData)) {
await this.handleCdcEvent(topic, eventData, sequenceNum);
// Debezium outbox 事件:从 payload.after 提取服务事件
if (this.isDebeziumOutboxEvent(eventData)) {
await this.handleDebeziumOutboxEvent(topic, eventData, sequenceNum);
} else {
// 普通 Debezium CDC 事件(表变更)
await this.handleCdcEvent(topic, eventData, sequenceNum);
}
} else if (this.isServiceEvent(eventData)) {
// 直接发布的服务事件(非 Debezium
await this.handleServiceEvent(topic, eventData, sequenceNum);
} else {
this.logger.warn(`Unknown event format from topic: ${topic}`);
this.logger.debug(`Event data: ${JSON.stringify(eventData).substring(0, 500)}`);
}
} catch (error) {
this.logger.error(
@ -194,6 +202,17 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
return data.payload && data.payload.source && data.payload.op;
}
/**
* Debezium outbox
* Debezium outbox_events CDC
*/
private isDebeziumOutboxEvent(data: any): boolean {
const after = data.payload?.after;
if (!after) return false;
// outbox 表有 event_type 字段
return after.event_type && after.aggregate_type;
}
private isServiceEvent(data: any): boolean {
// 支持两种格式:
// 1. 驼峰格式 (服务直接发布): eventType, aggregateType
@ -202,6 +221,25 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy {
(data.event_type && data.aggregate_type);
}
/**
* Debezium outbox
* payload.after
*/
private async handleDebeziumOutboxEvent(
topic: string,
eventData: any,
sequenceNum: bigint,
): Promise<void> {
const op = eventData.payload.op;
// 只处理 create 操作(新增的 outbox 记录)
if (op !== 'c' && op !== 'r') {
return;
}
const after = eventData.payload.after;
await this.handleServiceEvent(topic, after, sequenceNum);
}
private async handleCdcEvent(
topic: string,
eventData: any,