import { Controller, Logger } from '@nestjs/common'; import { MessagePattern, Payload, Ctx, KafkaContext } from '@nestjs/microservices'; import { PrismaService } from '../persistence/prisma/prisma.service'; import { EventAckPublisher } from './event-ack.publisher'; import Decimal from 'decimal.js'; /** * 奖励事件消息结构 * * 注意:OutboxPublisherService 发送的消息格式是将 payload 字段直接展开在顶层 * 所以 accountSequence 等字段直接在消息根级别 */ interface RewardSummaryUpdatedEvent { // payload 字段直接展开在顶层 accountSequence: string; userId: string; pendingUsdt: number; pendingHashpower: number; pendingExpireAt: string | null; settleableUsdt: number; settleableHashpower: number; settledTotalUsdt: number; settledTotalHashpower: number; expiredTotalUsdt: number; expiredTotalHashpower: number; // outbox 元数据 _outbox?: { id: string; aggregateId: string; eventType: string; }; } /** * 奖励事件 Kafka 控制器 * * 消费 reward-service 发布的奖励汇总更新事件 * 使用幂等性检查确保不重复处理 */ @Controller() export class RewardEventConsumerController { private readonly logger = new Logger(RewardEventConsumerController.name); constructor( private readonly prisma: PrismaService, private readonly eventAckPublisher: EventAckPublisher, ) {} /** * 处理奖励汇总更新事件 */ @MessagePattern('reward.summary.updated') async handleRewardSummaryUpdated( @Payload() message: RewardSummaryUpdatedEvent, @Ctx() context: KafkaContext, ): Promise { const partition = context.getPartition(); const offset = context.getMessage().offset; const outboxInfo = message._outbox; // eventId 和 eventType 来自 _outbox 元数据,或者使用 accountSequence 作为备用 const eventId = outboxInfo?.aggregateId || message.accountSequence; const eventType = outboxInfo?.eventType || 'reward.summary.updated'; this.logger.log( `[REWARD-EVENT] Received reward.summary.updated for ${eventId} ` + `[partition=${partition}, offset=${offset}]`, ); this.logger.debug(`[REWARD-EVENT] Message content: ${JSON.stringify(message)}`); try { // 1. 幂等性检查 const alreadyProcessed = await this.prisma.processedEvent.findUnique({ where: { eventId_eventType: { eventId, eventType, }, }, }); if (alreadyProcessed) { this.logger.warn(`[REWARD-EVENT] Event ${eventId} (${eventType}) already processed, skipping`); // 仍然发送确认,避免重复发送 if (outboxInfo) { await this.eventAckPublisher.sendSuccess(eventId, eventType); } return; } // 2. 更新 wallet_accounts 表的 rewards 数据 // 注意:payload 字段直接展开在消息顶层 const accountSequence = BigInt(message.accountSequence); await this.prisma.$transaction(async (tx) => { // 更新钱包账户的奖励数据 await tx.walletAccount.update({ where: { accountSequence }, data: { pendingUsdt: new Decimal(message.pendingUsdt), pendingHashpower: new Decimal(message.pendingHashpower), pendingExpireAt: message.pendingExpireAt ? new Date(message.pendingExpireAt) : null, settleableUsdt: new Decimal(message.settleableUsdt), settleableHashpower: new Decimal(message.settleableHashpower), settledTotalUsdt: new Decimal(message.settledTotalUsdt), settledTotalHashpower: new Decimal(message.settledTotalHashpower), expiredTotalUsdt: new Decimal(message.expiredTotalUsdt), expiredTotalHashpower: new Decimal(message.expiredTotalHashpower), }, }); // 记录已处理事件(幂等性) await tx.processedEvent.create({ data: { eventId, eventType, sourceService: 'reward-service', }, }); }); this.logger.log( `[REWARD-EVENT] ✓ Updated rewards for accountSequence ${accountSequence}: ` + `settleable=${message.settleableUsdt}, pending=${message.pendingUsdt}`, ); // 3. 发送确认 if (outboxInfo) { await this.eventAckPublisher.sendSuccess(eventId, eventType); } } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); this.logger.error(`[REWARD-EVENT] Error processing event ${eventId}: ${errorMessage}`); // 发送失败确认 if (outboxInfo) { await this.eventAckPublisher.sendFailure(eventId, eventType, errorMessage); } throw error; } } }