From fff56e8baafd83cf4f18aa1efcc2e57076fb3936 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 20 Jan 2026 22:09:34 -0800 Subject: [PATCH] =?UTF-8?q?fix(mining-admin):=20=E4=BF=AE=E5=A4=8D=20CDC?= =?UTF-8?q?=20=E4=BA=8B=E4=BB=B6=E7=BC=BA=E5=B0=91=20eventId=20=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 normalizeServiceEvent 中添加对多种 id 字段的支持 - 当事件缺少 id 时,使用 aggregateId + timestamp 生成备用 ID - 在 withIdempotency 中添加 event.id 验证,避免创建无效记录 - 修复驼峰格式事件可能没有 id 字段的问题 Co-Authored-By: Claude Opus 4.5 --- .../kafka/cdc-consumer.service.ts | 28 +++++++++++++------ .../infrastructure/kafka/cdc-sync.service.ts | 8 ++++++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index 7611818f..cbfa671a 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -317,23 +317,35 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { * 将 Debezium outbox 的下划线格式转换为驼峰格式 */ private normalizeServiceEvent(data: any): Omit { - // 如果已经是驼峰格式,直接返回 + // 尝试从多种可能的字段名获取事件 ID + // - contribution-service 的 outbox 表主键是 outbox_id + // - mining-wallet-service 等其他服务的 outbox 表主键是 id + // - 直接发布的事件可能使用 eventId 或 id + let eventId = data.outbox_id ?? data.id ?? data.eventId; + + // 如果没有找到事件 ID,使用 aggregateId + timestamp 生成一个 + // 这可以确保幂等性仍然有效(相同的 aggregateId 在同一毫秒内只处理一次) + if (!eventId) { + const aggregateId = data.aggregateId ?? data.aggregate_id ?? 'unknown'; + const timestamp = data.created_at ?? data.createdAt ?? Date.now(); + eventId = `${aggregateId}-${typeof timestamp === 'string' ? new Date(timestamp).getTime() : timestamp}`; + this.logger.warn(`Event missing id, generated fallback: ${eventId}, eventType: ${data.eventType ?? data.event_type}`); + } + + // 如果已经是驼峰格式,确保 id 字段存在 if (data.eventType && data.aggregateType) { - return data; + return { + ...data, + id: String(eventId), + }; } // Debezium outbox 格式转换 // 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... } - // 注意:不同服务的 outbox 表主键列名可能不同: - // - contribution-service: outbox_id (mapped from id in Prisma) - // - 其他服务可能使用: id const payload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload; - // 优先使用 outbox_id(contribution-service),回退到 id - const eventId = data.outbox_id ?? data.id; - return { id: String(eventId), eventType: data.event_type, diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index d3babc9a..b41f9db7 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -51,6 +51,14 @@ export class CdcSyncService implements OnModuleInit { */ private withIdempotency(handler: TransactionalServiceEventHandler): ServiceEventHandler { return async (event: ServiceEvent) => { + // 验证事件 ID 存在 + if (!event.id) { + this.logger.error( + `Event missing id, skipping: eventType=${event.eventType}, topic=${event.sourceTopic}`, + ); + return; + } + const idempotencyKey = `${event.sourceTopic}:${event.id}`; try {