fix(services): add Kafka microservice for @MessagePattern to work

The @MessagePattern decorator requires connectMicroservice() to work.
Without this configuration, services could not consume Kafka messages
via @MessagePattern, causing events to be lost.

Fixed services:
- planting-service: ACK consumption
- identity-service: event consumption
- reward-service: event consumption

blockchain-service already had this configured.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-10 09:55:58 -08:00
parent 70d1a8bfb8
commit f36fc0e8b7
3 changed files with 68 additions and 3 deletions

View File

@ -1,9 +1,11 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe } from '@nestjs/common';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const logger = new Logger('Bootstrap');
const app = await NestFactory.create(AppModule);
// Global prefix
@ -36,10 +38,30 @@ async function bootstrap() {
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup('api/docs', app, document);
// Kafka 微服务 - 用于 @MessagePattern 消费消息
const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'identity-service-group';
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'identity-service',
brokers: kafkaBrokers,
},
consumer: {
groupId: kafkaGroupId,
},
},
});
await app.startAllMicroservices();
logger.log('Kafka microservice started');
const port = process.env.APP_PORT || 3000;
await app.listen(port);
console.log(`Identity Service is running on port ${port}`);
console.log(`Swagger docs: http://localhost:${port}/api/docs`);
logger.log(`Identity Service is running on port ${port}`);
logger.log(`Swagger docs: http://localhost:${port}/api/docs`);
}
bootstrap();

View File

@ -1,6 +1,7 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
@ -41,6 +42,27 @@ async function bootstrap() {
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup('api/docs', app, document);
// Kafka 微服务 - 用于接收 ACK 确认消息
const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'planting-service-group';
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'planting-service-ack',
brokers: kafkaBrokers,
},
consumer: {
groupId: `${kafkaGroupId}-ack`,
},
},
});
// 启动 Kafka 微服务
await app.startAllMicroservices();
logger.log('Kafka microservice started for ACK consumption');
const port = process.env.APP_PORT || 3003;
await app.listen(port);

View File

@ -1,6 +1,7 @@
import { NestFactory } from '@nestjs/core';
import { ValidationPipe, Logger } from '@nestjs/common';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
import { AppModule } from './app.module';
@ -38,6 +39,26 @@ async function bootstrap() {
const document = SwaggerModule.createDocument(app, config);
SwaggerModule.setup('api', app, document);
// Kafka 微服务 - 用于 @MessagePattern 消费消息
const kafkaBrokers = process.env.KAFKA_BROKERS?.split(',') || ['localhost:9092'];
const kafkaGroupId = process.env.KAFKA_GROUP_ID || 'reward-service-group';
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'reward-service',
brokers: kafkaBrokers,
},
consumer: {
groupId: kafkaGroupId,
},
},
});
await app.startAllMicroservices();
logger.log('Kafka microservice started');
await app.listen(port);
logger.log(`${appName} is running on port ${port}`);
logger.log(`Swagger documentation available at http://localhost:${port}/api`);