diff --git a/backend/services/reward-service/src/application/services/reward-application.service.ts b/backend/services/reward-service/src/application/services/reward-application.service.ts index 6e67a143..1b61be41 100644 --- a/backend/services/reward-service/src/application/services/reward-application.service.ts +++ b/backend/services/reward-service/src/application/services/reward-application.service.ts @@ -12,6 +12,8 @@ import { Money } from '../../domain/value-objects/money.vo'; import { Hashpower } from '../../domain/value-objects/hashpower.vo'; import { EventPublisherService } from '../../infrastructure/kafka/event-publisher.service'; import { WalletServiceClient } from '../../infrastructure/external/wallet-service/wallet-service.client'; +import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository'; +import { RewardSummary } from '../../domain/aggregates/reward-summary/reward-summary.aggregate'; // 总部社区账户ID const HEADQUARTERS_COMMUNITY_USER_ID = BigInt(1); @@ -29,6 +31,7 @@ export class RewardApplicationService { private readonly rewardSummaryRepository: IRewardSummaryRepository, private readonly eventPublisher: EventPublisherService, private readonly walletService: WalletServiceClient, + private readonly outboxRepository: OutboxRepository, ) {} /** @@ -49,8 +52,10 @@ export class RewardApplicationService { // 2. 保存奖励流水 await this.rewardLedgerEntryRepository.saveAll(rewards); - // 3. 更新各用户的汇总数据 + // 3. 更新各用户的汇总数据并收集需要同步到 wallet-service 的汇总 const userIds = [...new Set(rewards.map(r => r.userId))]; + const updatedSummaries: RewardSummary[] = []; + for (const userId of userIds) { const userRewards = rewards.filter(r => r.userId === userId); const accountSequence = userRewards[0].accountSequence; @@ -69,9 +74,13 @@ export class RewardApplicationService { } await this.rewardSummaryRepository.save(summary); + updatedSummaries.push(summary); } - // 4. 发布领域事件 + // 4. 写入 Outbox 发送到 wallet-service 同步汇总数据 + await this.publishSummaryUpdatesToOutbox(updatedSummaries); + + // 5. 发布领域事件 for (const reward of rewards) { await this.eventPublisher.publishAll(reward.domainEvents); reward.clearDomainEvents(); @@ -113,6 +122,11 @@ export class RewardApplicationService { await this.rewardSummaryRepository.save(summary); + // 写入 Outbox 同步到 wallet-service + if (claimedCount > 0) { + await this.publishSummaryUpdatesToOutbox([summary]); + } + this.logger.log(`Claimed ${claimedCount} rewards for user ${userId}, total ${totalUsdtClaimed} USDT`); return { claimedCount, totalUsdtClaimed }; @@ -182,6 +196,9 @@ export class RewardApplicationService { if (summary) { summary.settle(Money.USDT(totalUsdt), Hashpower.create(totalHashpower)); await this.rewardSummaryRepository.save(summary); + + // 写入 Outbox 同步到 wallet-service + await this.publishSummaryUpdatesToOutbox([summary]); } this.logger.log(`Settled ${totalUsdt} USDT for accountSequence ${params.accountSequence}`); @@ -223,6 +240,8 @@ export class RewardApplicationService { } // 更新每个用户的汇总数据 + const updatedSummaries: RewardSummary[] = []; + for (const [userId, rewards] of userRewardsMap) { const userIdBigInt = BigInt(userId); // 使用 userId 作为 accountSequence (过期任务中无法获取真实 accountSequence) @@ -236,6 +255,7 @@ export class RewardApplicationService { } await this.rewardSummaryRepository.save(summary); + updatedSummaries.push(summary); } // 将过期奖励转入总部社区 @@ -245,6 +265,12 @@ export class RewardApplicationService { const totalHqHashpower = expiredRewards.reduce((sum, r) => sum + r.hashpowerAmount.value, 0); hqSummary.addSettleable(Money.USDT(totalHqUsdt), Hashpower.create(totalHqHashpower)); await this.rewardSummaryRepository.save(hqSummary); + updatedSummaries.push(hqSummary); + } + + // 写入 Outbox 同步到 wallet-service + if (updatedSummaries.length > 0) { + await this.publishSummaryUpdatesToOutbox(updatedSummaries); } this.logger.log(`Expired ${expiredRewards.length} rewards, total ${totalUsdtExpired} USDT`); @@ -344,4 +370,38 @@ export class RewardApplicationService { memo: r.memo, })); } + + /** + * 发布汇总更新到 Outbox(同步到 wallet-service) + */ + private async publishSummaryUpdatesToOutbox(summaries: RewardSummary[]): Promise { + if (summaries.length === 0) return; + + const outboxEvents: OutboxEventData[] = summaries.map(summary => ({ + eventType: 'reward.summary.updated', + topic: 'reward.summary.updated', + key: summary.accountSequence.toString(), + aggregateId: summary.accountSequence.toString(), + aggregateType: 'RewardSummary', + payload: { + accountSequence: summary.accountSequence.toString(), + userId: summary.userId.toString(), + pendingUsdt: summary.pendingUsdt.amount, + pendingHashpower: summary.pendingHashpower.value, + pendingExpireAt: summary.pendingExpireAt?.toISOString() || null, + settleableUsdt: summary.settleableUsdt.amount, + settleableHashpower: summary.settleableHashpower.value, + settledTotalUsdt: summary.settledTotalUsdt.amount, + settledTotalHashpower: summary.settledTotalHashpower.value, + expiredTotalUsdt: summary.expiredTotalUsdt.amount, + expiredTotalHashpower: summary.expiredTotalHashpower.value, + }, + })); + + await this.outboxRepository.saveEvents(outboxEvents); + + this.logger.log( + `[OUTBOX] Published ${outboxEvents.length} reward.summary.updated events to outbox`, + ); + } }