diff --git a/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts b/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts index 879bcf13..7eea09aa 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/reward-event-consumer.controller.ts @@ -6,23 +6,24 @@ import Decimal from 'decimal.js'; /** * 奖励事件消息结构 + * + * 注意:OutboxPublisherService 发送的消息格式是将 payload 字段直接展开在顶层 + * 所以 accountSequence 等字段直接在消息根级别 */ interface RewardSummaryUpdatedEvent { - eventType: string; - aggregateId: string; - payload: { - accountSequence: string; - userId: string; - pendingUsdt: number; - pendingHashpower: number; - pendingExpireAt: string | null; - settleableUsdt: number; - settleableHashpower: number; - settledTotalUsdt: number; - settledTotalHashpower: number; - expiredTotalUsdt: number; - expiredTotalHashpower: number; - }; + // payload 字段直接展开在顶层 + accountSequence: string; + userId: string; + pendingUsdt: number; + pendingHashpower: number; + pendingExpireAt: string | null; + settleableUsdt: number; + settleableHashpower: number; + settledTotalUsdt: number; + settledTotalHashpower: number; + expiredTotalUsdt: number; + expiredTotalHashpower: number; + // outbox 元数据 _outbox?: { id: string; aggregateId: string; @@ -56,13 +57,15 @@ export class RewardEventConsumerController { const partition = context.getPartition(); const offset = context.getMessage().offset; const outboxInfo = message._outbox; - const eventId = outboxInfo?.aggregateId || message.aggregateId; - const eventType = outboxInfo?.eventType || message.eventType; + // eventId 和 eventType 来自 _outbox 元数据,或者使用 accountSequence 作为备用 + const eventId = outboxInfo?.aggregateId || message.accountSequence; + const eventType = outboxInfo?.eventType || 'reward.summary.updated'; this.logger.log( `[REWARD-EVENT] Received reward.summary.updated for ${eventId} ` + `[partition=${partition}, offset=${offset}]`, ); + this.logger.debug(`[REWARD-EVENT] Message content: ${JSON.stringify(message)}`); try { // 1. 幂等性检查 @@ -85,23 +88,23 @@ export class RewardEventConsumerController { } // 2. 更新 wallet_accounts 表的 rewards 数据 - const payload = message.payload; - const accountSequence = BigInt(payload.accountSequence); + // 注意:payload 字段直接展开在消息顶层 + const accountSequence = BigInt(message.accountSequence); await this.prisma.$transaction(async (tx) => { // 更新钱包账户的奖励数据 await tx.walletAccount.update({ where: { accountSequence }, data: { - pendingUsdt: new Decimal(payload.pendingUsdt), - pendingHashpower: new Decimal(payload.pendingHashpower), - pendingExpireAt: payload.pendingExpireAt ? new Date(payload.pendingExpireAt) : null, - settleableUsdt: new Decimal(payload.settleableUsdt), - settleableHashpower: new Decimal(payload.settleableHashpower), - settledTotalUsdt: new Decimal(payload.settledTotalUsdt), - settledTotalHashpower: new Decimal(payload.settledTotalHashpower), - expiredTotalUsdt: new Decimal(payload.expiredTotalUsdt), - expiredTotalHashpower: new Decimal(payload.expiredTotalHashpower), + pendingUsdt: new Decimal(message.pendingUsdt), + pendingHashpower: new Decimal(message.pendingHashpower), + pendingExpireAt: message.pendingExpireAt ? new Date(message.pendingExpireAt) : null, + settleableUsdt: new Decimal(message.settleableUsdt), + settleableHashpower: new Decimal(message.settleableHashpower), + settledTotalUsdt: new Decimal(message.settledTotalUsdt), + settledTotalHashpower: new Decimal(message.settledTotalHashpower), + expiredTotalUsdt: new Decimal(message.expiredTotalUsdt), + expiredTotalHashpower: new Decimal(message.expiredTotalHashpower), }, }); @@ -117,7 +120,7 @@ export class RewardEventConsumerController { this.logger.log( `[REWARD-EVENT] ✓ Updated rewards for accountSequence ${accountSequence}: ` + - `settleable=${payload.settleableUsdt}, pending=${payload.pendingUsdt}`, + `settleable=${message.settleableUsdt}, pending=${message.pendingUsdt}`, ); // 3. 发送确认