diff --git a/backend/services/mining-wallet-service/src/application/application.module.ts b/backend/services/mining-wallet-service/src/application/application.module.ts index 047db791..f39c22af 100644 --- a/backend/services/mining-wallet-service/src/application/application.module.ts +++ b/backend/services/mining-wallet-service/src/application/application.module.ts @@ -17,6 +17,11 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r @Module({ imports: [ScheduleModule.forRoot()], + controllers: [ + // Kafka Consumers (微服务消息处理器需要是 Controller) + ContributionDistributionConsumer, + UserRegisteredConsumer, + ], providers: [ // Services SystemAccountService, @@ -26,9 +31,6 @@ import { UserRegisteredConsumer } from '../infrastructure/kafka/consumers/user-r // Schedulers OutboxScheduler, ContributionExpiryScheduler, - // Consumers - ContributionDistributionConsumer, - UserRegisteredConsumer, ], exports: [ SystemAccountService, diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts index e77b6964..745f4a85 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/contribution-distribution.consumer.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Controller, Logger, OnModuleInit } from '@nestjs/common'; import { EventPattern, Payload } from '@nestjs/microservices'; import Decimal from 'decimal.js'; import { PrismaService } from '../../persistence/prisma/prisma.service'; @@ -14,7 +14,7 @@ import { // 4小时 TTL(秒) const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; -@Injectable() +@Controller() export class ContributionDistributionConsumer implements OnModuleInit { private readonly logger = new Logger(ContributionDistributionConsumer.name); diff --git a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts index 4f2ea634..08b58144 100644 --- a/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts +++ b/backend/services/mining-wallet-service/src/infrastructure/kafka/consumers/user-registered.consumer.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Controller, Logger, OnModuleInit } from '@nestjs/common'; import { EventPattern, Payload } from '@nestjs/microservices'; import { RedisService } from '../../redis/redis.service'; import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository'; @@ -8,7 +8,7 @@ import { UserRegisteredEvent } from '../events/contribution-distribution.event'; // 4小时 TTL(秒) const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60; -@Injectable() +@Controller() export class UserRegisteredConsumer implements OnModuleInit { private readonly logger = new Logger(UserRegisteredConsumer.name); diff --git a/backend/services/mining-wallet-service/src/main.ts b/backend/services/mining-wallet-service/src/main.ts index 4c33c4e1..f8eece85 100644 --- a/backend/services/mining-wallet-service/src/main.ts +++ b/backend/services/mining-wallet-service/src/main.ts @@ -56,6 +56,10 @@ async function bootstrap() { consumer: { groupId: 'mining-wallet-service-group', }, + subscribe: { + // 显式订阅需要消费的 topics + fromBeginning: true, + }, }, });