From 7ee6d633c63e09317f8710cf93a55e7fe73f09c6 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 2 Feb 2026 19:18:10 -0800 Subject: [PATCH] =?UTF-8?q?fix(kafka):=20=E4=BF=AE=E5=A4=8D=20MPC=20Kafka?= =?UTF-8?q?=20consumer=20=E5=90=AF=E5=8A=A8=E8=AE=A2=E9=98=85=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E5=90=8E=E4=B8=8D=E9=87=8D=E8=AF=95=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题现象: - blockchain-service 启动时 Kafka consumer subscribe() 抛出 "This server does not host this topic-partition" 错误 - 原实现只 try-catch 一次就放弃,consumer 永久失效 - 后续所有 MPC 签名结果都收不到,表现为 signing timeout 300s - 需要手动 docker restart 才能恢复 根因分析: - 服务启动时 Kafka broker 可能尚未完成 topic-partition 分配 (特别是容器编排环境中服务启动顺序不确定) - onModuleInit() 中的 connect/subscribe/run 是一次性调用 - KafkaJS 的 retry 配置只作用于内部操作,不覆盖初始连接流程 修复方案: - 新增 connectWithRetry() 方法:指数退避重试(2s→4s→8s→...→60s) 最多 10 次,总等待约 5 分钟 - 每次重试前断开连接清理状态,避免 KafkaJS 内部状态残留 - 监听 consumer CRASH 事件:当 KafkaJS 不自动重启时(restart=false) 手动触发 connectWithRetry() 重连 - 新增 isShuttingDown 标志:防止 onModuleDestroy 时触发无意义的重连 - 同时修复 blockchain-service 和 identity-service 两个 consumer 影响范围: - blockchain-service: 影响 MPC 签名结果接收(热钱包转账) - identity-service: 影响 MPC 密钥生成结果接收(用户钱包创建) Co-Authored-By: Claude Opus 4.5 --- .../kafka/mpc-event-consumer.service.ts | 72 +++++++++++++--- .../kafka/mpc-event-consumer.service.ts | 82 ++++++++++++++----- 2 files changed, 120 insertions(+), 34 deletions(-) diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index cff98b82..d97bf22c 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -72,6 +72,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private isConnected = false; + private isShuttingDown = false; private keygenCompletedHandler?: MpcEventHandler; private signingCompletedHandler?: MpcEventHandler; @@ -111,24 +112,71 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { heartbeatInterval: 3000, }); - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + // 监听 consumer crash 事件,自动重连 + // 当 Kafka topic-partition 不可用或其他运行时错误导致 consumer 崩溃时触发 + this.consumer.on(this.consumer.events.CRASH, async (event) => { + if (this.isShuttingDown) return; + this.logger.error(`[CRASH] Kafka consumer crashed: ${event.payload.error?.message || 'unknown'}, restart: ${event.payload.restart}`); + // 如果 KafkaJS 内部不自动重启(restart=false),手动触发重连 + if (!event.payload.restart) { + this.logger.warn(`[CRASH] KafkaJS will not auto-restart, triggering manual reconnect...`); + this.isConnected = false; + await this.connectWithRetry(); + } + }); - // Subscribe to MPC topics - await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); - this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + await this.connectWithRetry(); + } - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); + /** + * 带指数退避的连接重试逻辑 + * + * 解决问题:服务启动时 Kafka topic-partition 未就绪,导致 subscribe() 抛出 + * "This server does not host this topic-partition" 错误。原实现只 catch 一次就放弃, + * consumer 永久失效,后续所有 MPC 签名结果都收不到(表现为 signing timeout 300s)。 + * + * 策略:指数退避重试,2s→4s→8s→...→60s(上限),最多 10 次,总等待约 5 分钟。 + */ + private async connectWithRetry(maxRetries = 10): Promise { + for (let attempt = 1; attempt <= maxRetries; attempt++) { + if (this.isShuttingDown) return; + + try { + if (!this.isConnected) { + this.logger.log(`[CONNECT] Connecting MPC Event consumer (attempt ${attempt}/${maxRetries})...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + } + + // Subscribe to MPC topics + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + return; // 成功,退出重试循环 + } catch (error: any) { + this.logger.error(`[ERROR] Failed to connect/subscribe Kafka consumer (attempt ${attempt}/${maxRetries}): ${error.message}`); + + if (attempt < maxRetries) { + // 指数退避:2s, 4s, 8s, 16s, 32s, 60s, 60s, ... + const delay = Math.min(2000 * Math.pow(2, attempt - 1), 60000); + this.logger.log(`[RETRY] Will retry in ${delay / 1000}s...`); + await new Promise(resolve => setTimeout(resolve, delay)); + + // 断开连接以清理状态,下次循环重新建立 + try { await this.consumer.disconnect(); } catch (_) {} + this.isConnected = false; + } + } } + + this.logger.error(`[FATAL] Failed to connect Kafka consumer after ${maxRetries} attempts. MPC events will NOT be received!`); } async onModuleDestroy() { + this.isShuttingDown = true; if (this.isConnected) { await this.consumer.disconnect(); this.logger.log('MPC Event Kafka consumer disconnected'); diff --git a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index 57bc65bc..910796e3 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -81,6 +81,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private isConnected = false; + private isShuttingDown = false; private keygenStartedHandler?: MpcEventHandler; private keygenCompletedHandler?: MpcEventHandler; @@ -125,34 +126,71 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { heartbeatInterval: 3000, }); - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log( - `[CONNECT] MPC Event Kafka consumer connected successfully`, - ); + // 监听 consumer crash 事件,自动重连 + // 当 Kafka topic-partition 不可用或其他运行时错误导致 consumer 崩溃时触发 + this.consumer.on(this.consumer.events.CRASH, async (event) => { + if (this.isShuttingDown) return; + this.logger.error(`[CRASH] Kafka consumer crashed: ${event.payload.error?.message || 'unknown'}, restart: ${event.payload.restart}`); + if (!event.payload.restart) { + this.logger.warn(`[CRASH] KafkaJS will not auto-restart, triggering manual reconnect...`); + this.isConnected = false; + await this.connectWithRetry(); + } + }); - // Subscribe to MPC topics - await this.consumer.subscribe({ - topics: Object.values(MPC_TOPICS), - fromBeginning: false, - }); - this.logger.log( - `[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`, - ); + await this.connectWithRetry(); + } - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error( - `[ERROR] Failed to connect MPC Event Kafka consumer`, - error, - ); + /** + * 带指数退避的连接重试逻辑 + * + * 解决问题:服务启动时 Kafka topic-partition 未就绪,导致 subscribe() 抛出 + * "This server does not host this topic-partition" 错误。原实现只 catch 一次就放弃, + * consumer 永久失效,后续所有 MPC 事件都收不到。 + * + * 策略:指数退避重试,2s→4s→8s→...→60s(上限),最多 10 次,总等待约 5 分钟。 + */ + private async connectWithRetry(maxRetries = 10): Promise { + for (let attempt = 1; attempt <= maxRetries; attempt++) { + if (this.isShuttingDown) return; + + try { + if (!this.isConnected) { + this.logger.log(`[CONNECT] Connecting MPC Event consumer (attempt ${attempt}/${maxRetries})...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + } + + await this.consumer.subscribe({ + topics: Object.values(MPC_TOPICS), + fromBeginning: false, + }); + this.logger.log( + `[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`, + ); + + await this.startConsuming(); + return; // 成功,退出重试循环 + } catch (error: any) { + this.logger.error(`[ERROR] Failed to connect/subscribe Kafka consumer (attempt ${attempt}/${maxRetries}): ${error.message}`); + + if (attempt < maxRetries) { + const delay = Math.min(2000 * Math.pow(2, attempt - 1), 60000); + this.logger.log(`[RETRY] Will retry in ${delay / 1000}s...`); + await new Promise(resolve => setTimeout(resolve, delay)); + + try { await this.consumer.disconnect(); } catch (_) {} + this.isConnected = false; + } + } } + + this.logger.error(`[FATAL] Failed to connect Kafka consumer after ${maxRetries} attempts. MPC events will NOT be received!`); } async onModuleDestroy() { + this.isShuttingDown = true; if (this.isConnected) { await this.consumer.disconnect(); this.logger.log('MPC Event Kafka consumer disconnected');