feat(reward): add outbox event publishing for reward summary updates

Add publishSummaryUpdatesToOutbox method to write reward.summary.updated
events to outbox table for synchronization to wallet-service.

Events are published after:
- distributeRewards: when rewards are distributed to users
- claimPendingRewardsForUser: when pending rewards are claimed
- settleRewards: when rewards are settled
- expireOverdueRewards: when rewards expire

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-10 23:40:35 -08:00
parent 538aae4ef0
commit 1acb314e7b
1 changed files with 62 additions and 2 deletions

View File

@ -12,6 +12,8 @@ import { Money } from '../../domain/value-objects/money.vo';
import { Hashpower } from '../../domain/value-objects/hashpower.vo';
import { EventPublisherService } from '../../infrastructure/kafka/event-publisher.service';
import { WalletServiceClient } from '../../infrastructure/external/wallet-service/wallet-service.client';
import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository';
import { RewardSummary } from '../../domain/aggregates/reward-summary/reward-summary.aggregate';
// 总部社区账户ID
const HEADQUARTERS_COMMUNITY_USER_ID = BigInt(1);
@ -29,6 +31,7 @@ export class RewardApplicationService {
private readonly rewardSummaryRepository: IRewardSummaryRepository,
private readonly eventPublisher: EventPublisherService,
private readonly walletService: WalletServiceClient,
private readonly outboxRepository: OutboxRepository,
) {}
/**
@ -49,8 +52,10 @@ export class RewardApplicationService {
// 2. 保存奖励流水
await this.rewardLedgerEntryRepository.saveAll(rewards);
// 3. 更新各用户的汇总数据
// 3. 更新各用户的汇总数据并收集需要同步到 wallet-service 的汇总
const userIds = [...new Set(rewards.map(r => r.userId))];
const updatedSummaries: RewardSummary[] = [];
for (const userId of userIds) {
const userRewards = rewards.filter(r => r.userId === userId);
const accountSequence = userRewards[0].accountSequence;
@ -69,9 +74,13 @@ export class RewardApplicationService {
}
await this.rewardSummaryRepository.save(summary);
updatedSummaries.push(summary);
}
// 4. 发布领域事件
// 4. 写入 Outbox 发送到 wallet-service 同步汇总数据
await this.publishSummaryUpdatesToOutbox(updatedSummaries);
// 5. 发布领域事件
for (const reward of rewards) {
await this.eventPublisher.publishAll(reward.domainEvents);
reward.clearDomainEvents();
@ -113,6 +122,11 @@ export class RewardApplicationService {
await this.rewardSummaryRepository.save(summary);
// 写入 Outbox 同步到 wallet-service
if (claimedCount > 0) {
await this.publishSummaryUpdatesToOutbox([summary]);
}
this.logger.log(`Claimed ${claimedCount} rewards for user ${userId}, total ${totalUsdtClaimed} USDT`);
return { claimedCount, totalUsdtClaimed };
@ -182,6 +196,9 @@ export class RewardApplicationService {
if (summary) {
summary.settle(Money.USDT(totalUsdt), Hashpower.create(totalHashpower));
await this.rewardSummaryRepository.save(summary);
// 写入 Outbox 同步到 wallet-service
await this.publishSummaryUpdatesToOutbox([summary]);
}
this.logger.log(`Settled ${totalUsdt} USDT for accountSequence ${params.accountSequence}`);
@ -223,6 +240,8 @@ export class RewardApplicationService {
}
// 更新每个用户的汇总数据
const updatedSummaries: RewardSummary[] = [];
for (const [userId, rewards] of userRewardsMap) {
const userIdBigInt = BigInt(userId);
// 使用 userId 作为 accountSequence (过期任务中无法获取真实 accountSequence)
@ -236,6 +255,7 @@ export class RewardApplicationService {
}
await this.rewardSummaryRepository.save(summary);
updatedSummaries.push(summary);
}
// 将过期奖励转入总部社区
@ -245,6 +265,12 @@ export class RewardApplicationService {
const totalHqHashpower = expiredRewards.reduce((sum, r) => sum + r.hashpowerAmount.value, 0);
hqSummary.addSettleable(Money.USDT(totalHqUsdt), Hashpower.create(totalHqHashpower));
await this.rewardSummaryRepository.save(hqSummary);
updatedSummaries.push(hqSummary);
}
// 写入 Outbox 同步到 wallet-service
if (updatedSummaries.length > 0) {
await this.publishSummaryUpdatesToOutbox(updatedSummaries);
}
this.logger.log(`Expired ${expiredRewards.length} rewards, total ${totalUsdtExpired} USDT`);
@ -344,4 +370,38 @@ export class RewardApplicationService {
memo: r.memo,
}));
}
/**
* Outbox wallet-service
*/
private async publishSummaryUpdatesToOutbox(summaries: RewardSummary[]): Promise<void> {
if (summaries.length === 0) return;
const outboxEvents: OutboxEventData[] = summaries.map(summary => ({
eventType: 'reward.summary.updated',
topic: 'reward.summary.updated',
key: summary.accountSequence.toString(),
aggregateId: summary.accountSequence.toString(),
aggregateType: 'RewardSummary',
payload: {
accountSequence: summary.accountSequence.toString(),
userId: summary.userId.toString(),
pendingUsdt: summary.pendingUsdt.amount,
pendingHashpower: summary.pendingHashpower.value,
pendingExpireAt: summary.pendingExpireAt?.toISOString() || null,
settleableUsdt: summary.settleableUsdt.amount,
settleableHashpower: summary.settleableHashpower.value,
settledTotalUsdt: summary.settledTotalUsdt.amount,
settledTotalHashpower: summary.settledTotalHashpower.value,
expiredTotalUsdt: summary.expiredTotalUsdt.amount,
expiredTotalHashpower: summary.expiredTotalHashpower.value,
},
}));
await this.outboxRepository.saveEvents(outboxEvents);
this.logger.log(
`[OUTBOX] Published ${outboxEvents.length} reward.summary.updated events to outbox`,
);
}
}