fix(mpc-service): enable shutdown hooks for graceful Kafka disconnect
Root cause: When mpc-service restarts, the old Kafka consumer doesn't properly disconnect, causing the broker to wait for sessionTimeout (~5 minutes) before completing rebalance. This blocks app startup. Solution: Enable NestJS shutdown hooks with app.enableShutdownHooks(). This ensures onModuleDestroy() is called on SIGTERM/SIGINT, which calls consumer.disconnect() and allows immediate rebalance on next startup. Also reverted the "don't await consumer.run()" workaround since the proper fix is graceful shutdown. Sources: - https://github.com/tulios/kafkajs/issues/807 - https://kafka.js.org/docs/consuming 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
af896db36c
commit
c05722bbb3
|
|
@ -16,11 +16,7 @@ export class EventConsumerStarterService implements OnApplicationBootstrap {
|
||||||
|
|
||||||
async onApplicationBootstrap() {
|
async onApplicationBootstrap() {
|
||||||
try {
|
try {
|
||||||
// Don't await - consumer.run() never resolves (it runs continuously)
|
await this.eventConsumer.startConsuming();
|
||||||
// Just fire and forget, let it run in the background
|
|
||||||
this.eventConsumer.startConsuming().catch((error) => {
|
|
||||||
this.logger.error('Kafka consumer error', error);
|
|
||||||
});
|
|
||||||
this.logger.log('MPC event consumers started successfully');
|
this.logger.log('MPC event consumers started successfully');
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to start MPC event consumers', error);
|
this.logger.error('Failed to start MPC event consumers', error);
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,10 @@ async function bootstrap() {
|
||||||
logger: ['error', 'warn', 'log', 'debug', 'verbose'],
|
logger: ['error', 'warn', 'log', 'debug', 'verbose'],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Enable graceful shutdown hooks (calls onModuleDestroy on SIGTERM/SIGINT)
|
||||||
|
// This ensures Kafka consumer disconnects properly, allowing fast rebalance on restart
|
||||||
|
app.enableShutdownHooks();
|
||||||
|
|
||||||
// Get config service
|
// Get config service
|
||||||
const configService = app.get(ConfigService);
|
const configService = app.get(ConfigService);
|
||||||
const port = parseInt(process.env.APP_PORT || '3006', 10);
|
const port = parseInt(process.env.APP_PORT || '3006', 10);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue