fix(mining-wallet): fix Kafka consumers not subscribing to topics

- Change consumers from @Injectable to @Controller for @EventPattern to work
- Move consumers from providers to controllers array in module
- Add subscribe.fromBeginning config to Kafka microservice

The consumers were not receiving messages because NestJS microservices
require @EventPattern handlers to be in @Controller classes, not just
@Injectable services.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-13 23:31:31 -08:00
parent 77b682c8a8
commit 6594845d4c
4 changed files with 13 additions and 7 deletions

View File

@ -17,6 +17,11 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r
@Module({
imports: [ScheduleModule.forRoot()],
controllers: [
// Kafka Consumers (微服务消息处理器需要是 Controller)
ContributionDistributionConsumer,
UserRegisteredConsumer,
],
providers: [
// Services
SystemAccountService,
@ -26,9 +31,6 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r
// Schedulers
OutboxScheduler,
ContributionExpiryScheduler,
// Consumers
ContributionDistributionConsumer,
UserRegisteredConsumer,
],
exports: [
SystemAccountService,

View File

@ -1,4 +1,4 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import Decimal from 'decimal.js';
import { PrismaService } from '../../persistence/prisma/prisma.service';
@ -14,7 +14,7 @@ import {
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
@Injectable()
@Controller()
export class ContributionDistributionConsumer implements OnModuleInit {
private readonly logger = new Logger(ContributionDistributionConsumer.name);

View File

@ -1,4 +1,4 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
@ -8,7 +8,7 @@ import { UserRegisteredEvent } from '../events/contribution-distribution.event';
// 4小时 TTL
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;
@Injectable()
@Controller()
export class UserRegisteredConsumer implements OnModuleInit {
private readonly logger = new Logger(UserRegisteredConsumer.name);

View File

@ -56,6 +56,10 @@ async function bootstrap() {
consumer: {
groupId: 'mining-wallet-service-group',
},
subscribe: {
// 显式订阅需要消费的 topics
fromBeginning: true,
},
},
});