import { Controller, Logger, OnModuleInit } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import Decimal from 'decimal.js';
import { PrismaService } from '../../persistence/prisma/prisma.service';
import { RedisService } from '../../redis/redis.service';
import { ProcessedEventRepository } from '../../persistence/repositories/processed-event.repository';
import { ContributionWalletService } from '../../../application/services/contribution-wallet.service';
import { SystemAccountService } from '../../../application/services/system-account.service';
import {
  ContributionDistributionCompletedEvent,
  ContributionDistributionPayload,
  BonusClaimedEvent,
  BonusClaimedPayload,
} from '../events/contribution-distribution.event';

// Lifetime of the Redis "already processed" marker: 4 hours, in seconds.
const IDEMPOTENCY_TTL_SECONDS = 4 * 60 * 60;

/**
 * Consumes contribution events and credits the resulting amounts to user and
 * system contribution wallets.
 *
 * Two event patterns are handled:
 * - `contribution.distribution.completed`
 * - `contribution.bonus.claimed`
 *
 * Each event is de-duplicated by `eventId`: a Redis marker is consulted first,
 * then the processed-event repository. An event is recorded as processed only
 * after its payload has been applied without throwing.
 */
@Controller()
export class ContributionDistributionConsumer implements OnModuleInit {
  private readonly logger = new Logger(ContributionDistributionConsumer.name);

  constructor(
    private readonly prisma: PrismaService,
    private readonly redis: RedisService,
    private readonly processedEventRepo: ProcessedEventRepository,
    private readonly contributionWalletService: ContributionWalletService,
    private readonly systemAccountService: SystemAccountService,
  ) {}

  /** Logs that the consumer has been initialised. */
  async onModuleInit(): Promise<void> {
    this.logger.log('ContributionDistributionConsumer initialized');
  }

  /**
   * Handles a `contribution.distribution.completed` event.
   *
   * Events without an `eventId`, or whose `eventId` is already recorded as
   * processed, are skipped. Otherwise the payload is applied and the event is
   * marked as processed.
   *
   * @param message Either the event itself or an envelope carrying it in `value`.
   * @throws Rethrows any error raised while applying the payload or recording
   *   the event.
   */
  @EventPattern('contribution.distribution.completed')
  async handleDistributionCompleted(@Payload() message: any): Promise<void> {
    // Accept both the enveloped form ({ value: event }) and the bare event.
    const event: ContributionDistributionCompletedEvent =
      message.value || message;
    const eventId = event.eventId || message.eventId;

    if (!eventId) {
      this.logger.warn('Received event without eventId, skipping');
      return;
    }

    this.logger.debug(`Processing distribution event: ${eventId}`);

    // Idempotency check.
    if (await this.isEventProcessed(eventId)) {
      this.logger.debug(`Event ${eventId} already processed, skipping`);
      return;
    }

    try {
      await this.processDistribution(event.payload);

      // Record the event only after every credit has been applied.
      await this.markEventProcessed(eventId, event.eventType);

      this.logger.log(
        `Distribution for adoption ${event.payload.adoptionId} processed successfully`,
      );
    } catch (error) {
      // Optional chaining: a missing payload must not raise a second error
      // here and hide the original failure from the log.
      this.logger.error(
        `Failed to process distribution for adoption ${event.payload?.adoptionId}`,
        error instanceof Error ? error.stack : error,
      );
      // Rethrow so the transport can redeliver (original intent: Kafka retry).
      throw error;
    }
  }

  /**
   * Applies one distribution payload, in order: user contributions, system
   * account contributions, then unallocated amounts, which are credited to
   * the `HEADQUARTERS` account type.
   *
   * Credits are issued sequentially; the first failing credit aborts the rest.
   */
  private async processDistribution(
    payload: ContributionDistributionPayload,
  ): Promise<void> {
    // 1. User contributions.
    for (const userContrib of payload.userContributions) {
      await this.contributionWalletService.creditContribution({
        accountSequence: userContrib.accountSequence,
        amount: new Decimal(userContrib.amount),
        contributionType: userContrib.contributionType,
        levelDepth: userContrib.levelDepth,
        bonusTier: userContrib.bonusTier,
        effectiveDate: new Date(userContrib.effectiveDate),
        expireDate: new Date(userContrib.expireDate),
        sourceAdoptionId: userContrib.sourceAdoptionId,
        sourceAccountSequence: userContrib.sourceAccountSequence,
      });
    }

    // 2. System account contributions, attributed to the payload's adoption.
    for (const sysContrib of payload.systemContributions) {
      await this.contributionWalletService.creditSystemContribution({
        accountType: sysContrib.accountType,
        amount: new Decimal(sysContrib.amount),
        provinceCode: sysContrib.provinceCode,
        cityCode: sysContrib.cityCode,
        neverExpires: sysContrib.neverExpires,
        sourceAdoptionId: payload.adoptionId,
        sourceAccountSequence: payload.adopterAccountSequence,
      });
    }

    // 3. Unallocated contributions go to headquarters and never expire.
    for (const unalloc of payload.unallocatedToHeadquarters) {
      await this.contributionWalletService.creditSystemContribution({
        accountType: 'HEADQUARTERS',
        amount: new Decimal(unalloc.amount),
        neverExpires: true,
        sourceAdoptionId: payload.adoptionId,
        sourceAccountSequence: payload.adopterAccountSequence,
        memo: unalloc.reason,
      });
    }
  }

  /**
   * Handles a `contribution.bonus.claimed` (bonus back-payment) event.
   *
   * Per the original author's note: when a user unlocks a new bonus tier, the
   * bonuses for all of that user's earlier adoptions are paid retroactively.
   *
   * @param message Either the event itself or an envelope carrying it in `value`.
   * @throws Rethrows any error raised while applying the payload or recording
   *   the event.
   */
  @EventPattern('contribution.bonus.claimed')
  async handleBonusClaimed(@Payload() message: any): Promise<void> {
    const event: BonusClaimedEvent = message.value || message;
    const eventId = event.eventId || message.eventId;

    if (!eventId) {
      this.logger.warn('Received BonusClaimed event without eventId, skipping');
      return;
    }

    this.logger.debug(`Processing bonus claim event: ${eventId}`);

    // Idempotency check.
    if (await this.isEventProcessed(eventId)) {
      this.logger.debug(`Event ${eventId} already processed, skipping`);
      return;
    }

    try {
      await this.processBonusClaim(event.payload);

      // Record the event only after every credit has been applied.
      await this.markEventProcessed(eventId, event.eventType);

      this.logger.log(
        `Bonus claim for ${event.payload.accountSequence} T${event.payload.bonusTier} processed: ` +
          `${event.payload.claimedCount} records`,
      );
    } catch (error) {
      // Optional chaining: a missing payload must not raise a second error
      // here and hide the original failure from the log.
      this.logger.error(
        `Failed to process bonus claim for ${event.payload?.accountSequence}`,
        error instanceof Error ? error.stack : error,
      );
      // Rethrow so the transport can redeliver (original intent: Kafka retry).
      throw error;
    }
  }

  /**
   * Credits every user contribution listed in a bonus-claim payload.
   *
   * Credits are issued sequentially; the first failing credit aborts the rest.
   */
  private async processBonusClaim(payload: BonusClaimedPayload): Promise<void> {
    for (const contrib of payload.userContributions) {
      await this.contributionWalletService.creditContribution({
        accountSequence: contrib.accountSequence,
        amount: new Decimal(contrib.amount),
        contributionType: contrib.contributionType,
        bonusTier: contrib.bonusTier,
        effectiveDate: new Date(contrib.effectiveDate),
        expireDate: new Date(contrib.expireDate),
        sourceAdoptionId: contrib.sourceAdoptionId,
        sourceAccountSequence: contrib.sourceAccountSequence,
      });
    }
  }

  /**
   * Reports whether `eventId` has already been recorded as processed.
   *
   * Redis is checked first as the fast path. On a miss the processed-event
   * repository is consulted, and a hit there is written back to Redis.
   *
   * @returns `true` when a marker exists in Redis or in the repository.
   */
  private async isEventProcessed(eventId: string): Promise<boolean> {
    // 1. Fast path: Redis marker.
    const cached = await this.redis.get(this.processedEventKey(eventId));
    if (cached) return true;

    // 2. Fall back to the database.
    const dbRecord = await this.processedEventRepo.findByEventId(eventId);
    if (dbRecord) {
      // Backfill the cache so the next duplicate takes the fast path.
      await this.cacheProcessedMarker(eventId);
      return true;
    }

    return false;
  }

  /**
   * Records `eventId` as processed: first in the processed-event repository,
   * then as a Redis marker with a 4-hour TTL.
   *
   * @throws Propagates repository errors. A failed Redis write is logged and
   *   swallowed, because the repository row alone is enough for
   *   `isEventProcessed` to detect a duplicate.
   */
  private async markEventProcessed(
    eventId: string,
    eventType: string,
  ): Promise<void> {
    // 1. Persist to the database.
    await this.processedEventRepo.create({
      eventId,
      eventType,
      sourceService: 'contribution-service',
    });

    // 2. Populate the Redis cache (4-hour TTL).
    await this.cacheProcessedMarker(eventId);
  }

  /** Builds the Redis key under which an event's processed marker is stored. */
  private processedEventKey(eventId: string): string {
    return `processed-event:${eventId}`;
  }

  /**
   * Best-effort write of the Redis processed marker.
   *
   * Redis is only a cache in front of the processed-event repository, so a
   * failure here is logged instead of being allowed to fail an event whose
   * credits have already been applied.
   */
  private async cacheProcessedMarker(eventId: string): Promise<void> {
    try {
      await this.redis.set(
        this.processedEventKey(eventId),
        '1',
        IDEMPOTENCY_TTL_SECONDS,
      );
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      this.logger.warn(
        `Failed to cache processed marker for event ${eventId}: ${reason}`,
      );
    }
  }
}