From c05722bbb33bba81977f21f06c086572407f10f3 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 9 Dec 2025 03:12:33 -0800 Subject: [PATCH] fix(mpc-service): enable shutdown hooks for graceful Kafka disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: When mpc-service restarts, the old Kafka consumer doesn't properly disconnect, causing the broker to wait for sessionTimeout (~5 minutes) before completing rebalance. This blocks app startup. Solution: Enable NestJS shutdown hooks with app.enableShutdownHooks(). This ensures onModuleDestroy() is called on SIGTERM/SIGINT, which calls consumer.disconnect() and allows immediate rebalance on next startup. Also reverted the "don't await consumer.run()" workaround since the proper fix is graceful shutdown. Sources: - https://github.com/tulios/kafkajs/issues/807 - https://kafka.js.org/docs/consuming 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../application/services/event-consumer-starter.service.ts | 6 +----- backend/services/mpc-service/src/main.ts | 4 ++++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts b/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts index bf2b6bf6..e7d4f092 100644 --- a/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts +++ b/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts @@ -16,11 +16,7 @@ export class EventConsumerStarterService implements OnApplicationBootstrap { async onApplicationBootstrap() { try { - // Don't await - consumer.run() never resolves (it runs continuously) - // Just fire and forget, let it run in the background - this.eventConsumer.startConsuming().catch((error) => { - this.logger.error('Kafka consumer error', error); - }); + await this.eventConsumer.startConsuming(); this.logger.log('MPC event consumers started successfully'); } catch (error) { this.logger.error('Failed to start MPC event consumers', error); diff --git a/backend/services/mpc-service/src/main.ts b/backend/services/mpc-service/src/main.ts index 5c5b43a2..2c8facde 100644 --- a/backend/services/mpc-service/src/main.ts +++ b/backend/services/mpc-service/src/main.ts @@ -18,6 +18,10 @@ async function bootstrap() { logger: ['error', 'warn', 'log', 'debug', 'verbose'], }); + // Enable graceful shutdown hooks (calls onModuleDestroy on SIGTERM/SIGINT) + // This ensures Kafka consumer disconnects properly, allowing fast rebalance on restart + app.enableShutdownHooks(); + // Get config service const configService = app.get(ConfigService); const port = parseInt(process.env.APP_PORT || '3006', 10);