From 0a199ae3b56c7db371f61d58a363e00d1a546e00 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 20 Jan 2026 22:38:42 -0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix(mining-admin):=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=20CDC=20=E4=BA=8B=E4=BB=B6=E7=BC=BA=E5=B0=91=20eventI?= =?UTF-8?q?d=20=E7=9A=84=E9=97=AE=E9=A2=98"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit fff56e8baafd83cf4f18aa1efcc2e57076fb3936. --- .../kafka/cdc-consumer.service.ts | 28 ++++++------------- .../infrastructure/kafka/cdc-sync.service.ts | 8 ------ 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts index cbfa671a..7611818f 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-consumer.service.ts @@ -317,35 +317,23 @@ export class CdcConsumerService implements OnModuleInit, OnModuleDestroy { * 将 Debezium outbox 的下划线格式转换为驼峰格式 */ private normalizeServiceEvent(data: any): Omit { - // 尝试从多种可能的字段名获取事件 ID - // - contribution-service 的 outbox 表主键是 outbox_id - // - mining-wallet-service 等其他服务的 outbox 表主键是 id - // - 直接发布的事件可能使用 eventId 或 id - let eventId = data.outbox_id ?? data.id ?? data.eventId; - - // 如果没有找到事件 ID,使用 aggregateId + timestamp 生成一个 - // 这可以确保幂等性仍然有效(相同的 aggregateId 在同一毫秒内只处理一次) - if (!eventId) { - const aggregateId = data.aggregateId ?? data.aggregate_id ?? 'unknown'; - const timestamp = data.created_at ?? data.createdAt ?? Date.now(); - eventId = `${aggregateId}-${typeof timestamp === 'string' ? new Date(timestamp).getTime() : timestamp}`; - this.logger.warn(`Event missing id, generated fallback: ${eventId}, eventType: ${data.eventType ?? data.event_type}`); - } - - // 如果已经是驼峰格式,确保 id 字段存在 + // 如果已经是驼峰格式,直接返回 if (data.eventType && data.aggregateType) { - return { - ...data, - id: String(eventId), - }; + return data; } // Debezium outbox 格式转换 // 原始格式:{ event_type, aggregate_type, aggregate_id, payload (JSON string), ... } + // 注意:不同服务的 outbox 表主键列名可能不同: + // - contribution-service: outbox_id (mapped from id in Prisma) + // - 其他服务可能使用: id const payload = typeof data.payload === 'string' ? JSON.parse(data.payload) : data.payload; + // 优先使用 outbox_id(contribution-service),回退到 id + const eventId = data.outbox_id ?? data.id; + return { id: String(eventId), eventType: data.event_type, diff --git a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts index b41f9db7..d3babc9a 100644 --- a/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts +++ b/backend/services/mining-admin-service/src/infrastructure/kafka/cdc-sync.service.ts @@ -51,14 +51,6 @@ export class CdcSyncService implements OnModuleInit { */ private withIdempotency(handler: TransactionalServiceEventHandler): ServiceEventHandler { return async (event: ServiceEvent) => { - // 验证事件 ID 存在 - if (!event.id) { - this.logger.error( - `Event missing id, skipping: eventType=${event.eventType}, topic=${event.sourceTopic}`, - ); - return; - } - const idempotencyKey = `${event.sourceTopic}:${event.id}`; try {