diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index cff98b82..d97bf22c 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -72,6 +72,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private isConnected = false; + private isShuttingDown = false; private keygenCompletedHandler?: MpcEventHandler; private signingCompletedHandler?: MpcEventHandler; @@ -111,24 +112,71 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { heartbeatInterval: 3000, }); - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + // 监听 consumer crash 事件,自动重连 + // 当 Kafka topic-partition 不可用或其他运行时错误导致 consumer 崩溃时触发 + this.consumer.on(this.consumer.events.CRASH, async (event) => { + if (this.isShuttingDown) return; + this.logger.error(`[CRASH] Kafka consumer crashed: ${event.payload.error?.message || 'unknown'}, restart: ${event.payload.restart}`); + // 如果 KafkaJS 内部不自动重启(restart=false),手动触发重连 + if (!event.payload.restart) { + this.logger.warn(`[CRASH] KafkaJS will not auto-restart, triggering manual reconnect...`); + this.isConnected = false; + await this.connectWithRetry(); + } + }); - // Subscribe to MPC topics - await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); - this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + await this.connectWithRetry(); + } - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); + /** + * 带指数退避的连接重试逻辑 + * + * 解决问题:服务启动时 Kafka topic-partition 未就绪,导致 subscribe() 抛出 + * "This server does not host this topic-partition" 错误。原实现只 catch 一次就放弃, + * consumer 永久失效,后续所有 MPC 签名结果都收不到(表现为 signing timeout 300s)。 + * + * 策略:指数退避重试,2s→4s→8s→...→60s(上限),最多 10 次,总等待约 5 分钟。 + */ + private async connectWithRetry(maxRetries = 10): Promise { + for (let attempt = 1; attempt <= maxRetries; attempt++) { + if (this.isShuttingDown) return; + + try { + if (!this.isConnected) { + this.logger.log(`[CONNECT] Connecting MPC Event consumer (attempt ${attempt}/${maxRetries})...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + } + + // Subscribe to MPC topics + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + return; // 成功,退出重试循环 + } catch (error: any) { + this.logger.error(`[ERROR] Failed to connect/subscribe Kafka consumer (attempt ${attempt}/${maxRetries}): ${error.message}`); + + if (attempt < maxRetries) { + // 指数退避:2s, 4s, 8s, 16s, 32s, 60s, 60s, ... + const delay = Math.min(2000 * Math.pow(2, attempt - 1), 60000); + this.logger.log(`[RETRY] Will retry in ${delay / 1000}s...`); + await new Promise(resolve => setTimeout(resolve, delay)); + + // 断开连接以清理状态,下次循环重新建立 + try { await this.consumer.disconnect(); } catch (_) {} + this.isConnected = false; + } + } } + + this.logger.error(`[FATAL] Failed to connect Kafka consumer after ${maxRetries} attempts. MPC events will NOT be received!`); } async onModuleDestroy() { + this.isShuttingDown = true; if (this.isConnected) { await this.consumer.disconnect(); this.logger.log('MPC Event Kafka consumer disconnected'); diff --git a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index 57bc65bc..910796e3 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -81,6 +81,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private kafka: Kafka; private consumer: Consumer; private isConnected = false; + private isShuttingDown = false; private keygenStartedHandler?: MpcEventHandler; private keygenCompletedHandler?: MpcEventHandler; @@ -125,34 +126,71 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { heartbeatInterval: 3000, }); - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log( - `[CONNECT] MPC Event Kafka consumer connected successfully`, - ); + // 监听 consumer crash 事件,自动重连 + // 当 Kafka topic-partition 不可用或其他运行时错误导致 consumer 崩溃时触发 + this.consumer.on(this.consumer.events.CRASH, async (event) => { + if (this.isShuttingDown) return; + this.logger.error(`[CRASH] Kafka consumer crashed: ${event.payload.error?.message || 'unknown'}, restart: ${event.payload.restart}`); + if (!event.payload.restart) { + this.logger.warn(`[CRASH] KafkaJS will not auto-restart, triggering manual reconnect...`); + this.isConnected = false; + await this.connectWithRetry(); + } + }); - // Subscribe to MPC topics - await this.consumer.subscribe({ - topics: Object.values(MPC_TOPICS), - fromBeginning: false, - }); - this.logger.log( - `[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`, - ); + await this.connectWithRetry(); + } - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error( - `[ERROR] Failed to connect MPC Event Kafka consumer`, - error, - ); + /** + * 带指数退避的连接重试逻辑 + * + * 解决问题:服务启动时 Kafka topic-partition 未就绪,导致 subscribe() 抛出 + * "This server does not host this topic-partition" 错误。原实现只 catch 一次就放弃, + * consumer 永久失效,后续所有 MPC 事件都收不到。 + * + * 策略:指数退避重试,2s→4s→8s→...→60s(上限),最多 10 次,总等待约 5 分钟。 + */ + private async connectWithRetry(maxRetries = 10): Promise { + for (let attempt = 1; attempt <= maxRetries; attempt++) { + if (this.isShuttingDown) return; + + try { + if (!this.isConnected) { + this.logger.log(`[CONNECT] Connecting MPC Event consumer (attempt ${attempt}/${maxRetries})...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + } + + await this.consumer.subscribe({ + topics: Object.values(MPC_TOPICS), + fromBeginning: false, + }); + this.logger.log( + `[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`, + ); + + await this.startConsuming(); + return; // 成功,退出重试循环 + } catch (error: any) { + this.logger.error(`[ERROR] Failed to connect/subscribe Kafka consumer (attempt ${attempt}/${maxRetries}): ${error.message}`); + + if (attempt < maxRetries) { + const delay = Math.min(2000 * Math.pow(2, attempt - 1), 60000); + this.logger.log(`[RETRY] Will retry in ${delay / 1000}s...`); + await new Promise(resolve => setTimeout(resolve, delay)); + + try { await this.consumer.disconnect(); } catch (_) {} + this.isConnected = false; + } + } } + + this.logger.error(`[FATAL] Failed to connect Kafka consumer after ${maxRetries} attempts. MPC events will NOT be received!`); } async onModuleDestroy() { + this.isShuttingDown = true; if (this.isConnected) { await this.consumer.disconnect(); this.logger.log('MPC Event Kafka consumer disconnected');