From f36fc0e8b78c9fb4aa693433f59b135fa3d0645b Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 10 Dec 2025 09:55:58 -0800 Subject: [PATCH] fix(services): add Kafka microservice for @MessagePattern to work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The @MessagePattern decorator requires connectMicroservice() to work. Without this configuration, services could not consume Kafka messages via @MessagePattern, causing events to be lost. Fixed services: - planting-service: ACK consumption - identity-service: event consumption - reward-service: event consumption blockchain-service already had this configured. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- backend/services/identity-service/src/main.ts | 28 +++++++++++++++++-- backend/services/planting-service/src/main.ts | 22 +++++++++++++++ backend/services/reward-service/src/main.ts | 21 ++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/backend/services/identity-service/src/main.ts b/backend/services/identity-service/src/main.ts index c118ad84..28df7f6d 100644 --- a/backend/services/identity-service/src/main.ts +++ b/backend/services/identity-service/src/main.ts @@ -1,9 +1,11 @@ import { NestFactory } from '@nestjs/core'; -import { ValidationPipe } from '@nestjs/common'; +import { ValidationPipe, Logger } from '@nestjs/common'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { + const logger = new Logger('Bootstrap'); const app = await NestFactory.create(AppModule); // Global prefix @@ -36,10 +38,30 @@ async function bootstrap() { const document = SwaggerModule.createDocument(app, config); SwaggerModule.setup('api/docs', app, document); + // Kafka 微服务 - 用于 @MessagePattern 消费消息 + const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092']; + const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'identity-service-group'; + + app.connectMicroservice({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'identity-service', + brokers: kafkaBrokers, + }, + consumer: { + groupId: kafkaGroupId, + }, + }, + }); + + await app.startAllMicroservices(); + logger.log('Kafka microservice started'); + const port = process.env.APP_PORT || 3000; await app.listen(port); - console.log(`Identity Service is running on port ${port}`); - console.log(`Swagger docs: http://localhost:${port}/api/docs`); + logger.log(`Identity Service is running on port ${port}`); + logger.log(`Swagger docs: http://localhost:${port}/api/docs`); } bootstrap(); diff --git a/backend/services/planting-service/src/main.ts b/backend/services/planting-service/src/main.ts index 50290aa1..327a44f1 100644 --- a/backend/services/planting-service/src/main.ts +++ b/backend/services/planting-service/src/main.ts @@ -1,6 +1,7 @@ import { NestFactory } from '@nestjs/core'; import { ValidationPipe, Logger } from '@nestjs/common'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { @@ -41,6 +42,27 @@ async function bootstrap() { const document = SwaggerModule.createDocument(app, config); SwaggerModule.setup('api/docs', app, document); + // Kafka 微服务 - 用于接收 ACK 确认消息 + const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092']; + const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'planting-service-group'; + + app.connectMicroservice({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'planting-service-ack', + brokers: kafkaBrokers, + }, + consumer: { + groupId: `${kafkaGroupId}-ack`, + }, + }, + }); + + // 启动 Kafka 微服务 + await app.startAllMicroservices(); + logger.log('Kafka microservice started for ACK consumption'); + const port = process.env.APP_PORT || 3003; await app.listen(port); diff --git a/backend/services/reward-service/src/main.ts b/backend/services/reward-service/src/main.ts index c5754f11..235af613 100644 --- a/backend/services/reward-service/src/main.ts +++ b/backend/services/reward-service/src/main.ts @@ -1,6 +1,7 @@ import { NestFactory } from '@nestjs/core'; import { ValidationPipe, Logger } from '@nestjs/common'; import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; import { ConfigService } from '@nestjs/config'; import { AppModule } from './app.module'; @@ -38,6 +39,26 @@ async function bootstrap() { const document = SwaggerModule.createDocument(app, config); SwaggerModule.setup('api', app, document); + // Kafka 微服务 - 用于 @MessagePattern 消费消息 + const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092']; + const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'reward-service-group'; + + app.connectMicroservice({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'reward-service', + brokers: kafkaBrokers, + }, + consumer: { + groupId: kafkaGroupId, + }, + }, + }); + + await app.startAllMicroservices(); + logger.log('Kafka microservice started'); + await app.listen(port); logger.log(`${appName} is running on port ${port}`); logger.log(`Swagger documentation available at http://localhost:${port}/api`);