rwadurian/backend/services/reward-service/src/application/services/reward-application.service.ts

412 lines
15 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Injectable, Inject, Logger } from '@nestjs/common';
import { RewardLedgerEntry } from '../../domain/aggregates/reward-ledger-entry/reward-ledger-entry.aggregate';
import { RewardCalculationService } from '../../domain/services/reward-calculation.service';
import { RewardExpirationService } from '../../domain/services/reward-expiration.service';
import type { IRewardLedgerEntryRepository } from '../../domain/repositories/reward-ledger-entry.repository.interface';
import { REWARD_LEDGER_ENTRY_REPOSITORY } from '../../domain/repositories/reward-ledger-entry.repository.interface';
import type { IRewardSummaryRepository } from '../../domain/repositories/reward-summary.repository.interface';
import { REWARD_SUMMARY_REPOSITORY } from '../../domain/repositories/reward-summary.repository.interface';
import { RewardStatus } from '../../domain/value-objects/reward-status.enum';
import { RightType } from '../../domain/value-objects/right-type.enum';
import { Money } from '../../domain/value-objects/money.vo';
import { Hashpower } from '../../domain/value-objects/hashpower.vo';
import { EventPublisherService } from '../../infrastructure/kafka/event-publisher.service';
import { WalletServiceClient } from '../../infrastructure/external/wallet-service/wallet-service.client';
import { OutboxRepository, OutboxEventData } from '../../infrastructure/persistence/repositories/outbox.repository';
import { RewardSummary } from '../../domain/aggregates/reward-summary/reward-summary.aggregate';
// 总部社区账户ID
const HEADQUARTERS_COMMUNITY_USER_ID = BigInt(1);
@Injectable()
export class RewardApplicationService {
private readonly logger = new Logger(RewardApplicationService.name);
constructor(
private readonly rewardCalculationService: RewardCalculationService,
private readonly rewardExpirationService: RewardExpirationService,
@Inject(REWARD_LEDGER_ENTRY_REPOSITORY)
private readonly rewardLedgerEntryRepository: IRewardLedgerEntryRepository,
@Inject(REWARD_SUMMARY_REPOSITORY)
private readonly rewardSummaryRepository: IRewardSummaryRepository,
private readonly eventPublisher: EventPublisherService,
private readonly walletService: WalletServiceClient,
private readonly outboxRepository: OutboxRepository,
) {}
/**
* 分配奖励 (响应认种订单支付成功事件)
*/
async distributeRewards(params: {
sourceOrderNo: string; // 订单号是字符串格式如 PLT1765391584505Q0Q6QD
sourceUserId: bigint;
treeCount: number;
provinceCode: string;
cityCode: string;
}): Promise<void> {
this.logger.log(`Distributing rewards for order ${params.sourceOrderNo}`);
// 1. 计算所有奖励
const rewards = await this.rewardCalculationService.calculateRewards(params);
// 2. 保存奖励流水
await this.rewardLedgerEntryRepository.saveAll(rewards);
// 3. 更新各用户的汇总数据并收集需要同步到 wallet-service 的汇总
const userIds = [...new Set(rewards.map(r => r.userId))];
const updatedSummaries: RewardSummary[] = [];
for (const userId of userIds) {
const userRewards = rewards.filter(r => r.userId === userId);
const accountSequence = userRewards[0].accountSequence;
const summary = await this.rewardSummaryRepository.getOrCreate(userId, accountSequence);
for (const reward of userRewards) {
if (reward.isPending) {
summary.addPending(
reward.usdtAmount,
reward.hashpowerAmount,
reward.expireAt!,
);
} else if (reward.isSettleable) {
summary.addSettleable(reward.usdtAmount, reward.hashpowerAmount);
}
}
await this.rewardSummaryRepository.save(summary);
updatedSummaries.push(summary);
}
// 4. 写入 Outbox 发送到 wallet-service 同步汇总数据
// [已屏蔽] 前端直接从 reward-service 查询,不再同步到 wallet-service
// await this.publishSummaryUpdatesToOutbox(updatedSummaries);
// 5. 发布领域事件
for (const reward of rewards) {
await this.eventPublisher.publishAll(reward.domainEvents);
reward.clearDomainEvents();
}
this.logger.log(`Distributed ${rewards.length} rewards for order ${params.sourceOrderNo}`);
}
/**
* 用户认种后,将该用户的待领取奖励转为可结算
*/
async claimPendingRewardsForUser(userId: bigint): Promise<{
claimedCount: number;
totalUsdtClaimed: number;
}> {
this.logger.log(`Claiming pending rewards for user ${userId}`);
const pendingRewards = await this.rewardLedgerEntryRepository.findPendingByUserId(userId);
// 使用 userId 作为 accountSequence (系统账户场景)
const summary = await this.rewardSummaryRepository.getOrCreate(userId, userId);
let claimedCount = 0;
let totalUsdtClaimed = 0;
for (const reward of pendingRewards) {
if (!reward.isExpiredNow()) {
reward.claim();
await this.rewardLedgerEntryRepository.save(reward);
summary.movePendingToSettleable(reward.usdtAmount, reward.hashpowerAmount);
claimedCount++;
totalUsdtClaimed += reward.usdtAmount.amount;
await this.eventPublisher.publishAll(reward.domainEvents);
reward.clearDomainEvents();
}
}
await this.rewardSummaryRepository.save(summary);
// 写入 Outbox 同步到 wallet-service
// [已屏蔽] 前端直接从 reward-service 查询,不再同步到 wallet-service
// if (claimedCount > 0) {
// await this.publishSummaryUpdatesToOutbox([summary]);
// }
this.logger.log(`Claimed ${claimedCount} rewards for user ${userId}, total ${totalUsdtClaimed} USDT`);
return { claimedCount, totalUsdtClaimed };
}
/**
* 结算可结算收益
*/
async settleRewards(params: {
accountSequence: bigint;
settleCurrency: string; // BNB/OG/USDT/DST
}): Promise<{
success: boolean;
settledUsdtAmount: number;
receivedAmount: number;
settleCurrency: string;
txHash?: string;
error?: string;
}> {
this.logger.log(`Settling rewards for accountSequence ${params.accountSequence}`);
// 1. 获取可结算奖励
const settleableRewards = await this.rewardLedgerEntryRepository.findSettleableByAccountSequence(params.accountSequence);
if (settleableRewards.length === 0) {
return {
success: false,
settledUsdtAmount: 0,
receivedAmount: 0,
settleCurrency: params.settleCurrency,
error: '没有可结算的收益',
};
}
// 2. 计算总金额
const totalUsdt = settleableRewards.reduce((sum, r) => sum + r.usdtAmount.amount, 0);
const totalHashpower = settleableRewards.reduce((sum, r) => sum + r.hashpowerAmount.value, 0);
// 3. 调用钱包服务执行SWAP (使用第一条记录的userId)
const userId = settleableRewards[0].userId;
const swapResult = await this.walletService.executeSwap({
userId: userId,
usdtAmount: totalUsdt,
targetCurrency: params.settleCurrency,
});
if (!swapResult.success) {
return {
success: false,
settledUsdtAmount: totalUsdt,
receivedAmount: 0,
settleCurrency: params.settleCurrency,
error: swapResult.error,
};
}
// 4. 更新奖励状态为已结算
for (const reward of settleableRewards) {
reward.settle(params.settleCurrency, swapResult.receivedAmount!);
await this.rewardLedgerEntryRepository.save(reward);
await this.eventPublisher.publishAll(reward.domainEvents);
reward.clearDomainEvents();
}
// 5. 更新汇总数据
const summary = await this.rewardSummaryRepository.findByAccountSequence(params.accountSequence);
if (summary) {
summary.settle(Money.USDT(totalUsdt), Hashpower.create(totalHashpower));
await this.rewardSummaryRepository.save(summary);
// 写入 Outbox 同步到 wallet-service
// [已屏蔽] 前端直接从 reward-service 查询,不再同步到 wallet-service
// await this.publishSummaryUpdatesToOutbox([summary]);
}
this.logger.log(`Settled ${totalUsdt} USDT for accountSequence ${params.accountSequence}`);
return {
success: true,
settledUsdtAmount: totalUsdt,
receivedAmount: swapResult.receivedAmount!,
settleCurrency: params.settleCurrency,
txHash: swapResult.txHash,
};
}
/**
* 过期到期的待领取奖励 (定时任务调用)
*/
async expireOverdueRewards(): Promise<{
expiredCount: number;
totalUsdtExpired: number;
}> {
this.logger.log('Processing expired rewards');
const now = new Date();
const expiredPendingRewards = await this.rewardLedgerEntryRepository.findExpiredPending(now);
const expiredRewards = this.rewardExpirationService.expireOverdueRewards(expiredPendingRewards);
let totalUsdtExpired = 0;
// 按用户分组处理
const userRewardsMap = new Map<string, RewardLedgerEntry[]>();
for (const reward of expiredRewards) {
const userId = reward.userId.toString();
if (!userRewardsMap.has(userId)) {
userRewardsMap.set(userId, []);
}
userRewardsMap.get(userId)!.push(reward);
totalUsdtExpired += reward.usdtAmount.amount;
}
// 更新每个用户的汇总数据
const updatedSummaries: RewardSummary[] = [];
for (const [userId, rewards] of userRewardsMap) {
const userIdBigInt = BigInt(userId);
// 使用 userId 作为 accountSequence (过期任务中无法获取真实 accountSequence)
const summary = await this.rewardSummaryRepository.getOrCreate(userIdBigInt, userIdBigInt);
for (const reward of rewards) {
await this.rewardLedgerEntryRepository.save(reward);
summary.movePendingToExpired(reward.usdtAmount, reward.hashpowerAmount);
await this.eventPublisher.publishAll(reward.domainEvents);
reward.clearDomainEvents();
}
await this.rewardSummaryRepository.save(summary);
updatedSummaries.push(summary);
}
// 将过期奖励转入总部社区
if (expiredRewards.length > 0) {
const hqSummary = await this.rewardSummaryRepository.getOrCreate(HEADQUARTERS_COMMUNITY_USER_ID, HEADQUARTERS_COMMUNITY_USER_ID);
const totalHqUsdt = expiredRewards.reduce((sum, r) => sum + r.usdtAmount.amount, 0);
const totalHqHashpower = expiredRewards.reduce((sum, r) => sum + r.hashpowerAmount.value, 0);
hqSummary.addSettleable(Money.USDT(totalHqUsdt), Hashpower.create(totalHqHashpower));
await this.rewardSummaryRepository.save(hqSummary);
updatedSummaries.push(hqSummary);
}
// 写入 Outbox 同步到 wallet-service
// [已屏蔽] 前端直接从 reward-service 查询,不再同步到 wallet-service
// if (updatedSummaries.length > 0) {
// await this.publishSummaryUpdatesToOutbox(updatedSummaries);
// }
this.logger.log(`Expired ${expiredRewards.length} rewards, total ${totalUsdtExpired} USDT`);
return {
expiredCount: expiredRewards.length,
totalUsdtExpired,
};
}
/**
* 获取用户奖励汇总
*/
async getRewardSummary(accountSequence: bigint) {
const summary = await this.rewardSummaryRepository.findByAccountSequence(accountSequence);
if (!summary) {
return {
pendingUsdt: 0,
pendingHashpower: 0,
pendingExpireAt: null,
settleableUsdt: 0,
settleableHashpower: 0,
settledTotalUsdt: 0,
settledTotalHashpower: 0,
expiredTotalUsdt: 0,
expiredTotalHashpower: 0,
};
}
return {
pendingUsdt: summary.pendingUsdt.amount,
pendingHashpower: summary.pendingHashpower.value,
pendingExpireAt: summary.pendingExpireAt,
settleableUsdt: summary.settleableUsdt.amount,
settleableHashpower: summary.settleableHashpower.value,
settledTotalUsdt: summary.settledTotalUsdt.amount,
settledTotalHashpower: summary.settledTotalHashpower.value,
expiredTotalUsdt: summary.expiredTotalUsdt.amount,
expiredTotalHashpower: summary.expiredTotalHashpower.value,
};
}
/**
* 获取用户奖励明细
*/
async getRewardDetails(
accountSequence: bigint,
filters?: {
status?: RewardStatus;
rightType?: RightType;
startDate?: Date;
endDate?: Date;
},
pagination?: { page: number; pageSize: number },
) {
const rewards = await this.rewardLedgerEntryRepository.findByAccountSequence(accountSequence, filters, pagination);
const total = await this.rewardLedgerEntryRepository.countByAccountSequence(accountSequence, filters?.status);
return {
data: rewards.map(r => ({
id: r.id?.toString(),
rightType: r.rewardSource.rightType,
usdtAmount: r.usdtAmount.amount,
hashpowerAmount: r.hashpowerAmount.value,
rewardStatus: r.rewardStatus,
createdAt: r.createdAt,
expireAt: r.expireAt,
remainingTimeMs: r.getRemainingTimeMs(),
claimedAt: r.claimedAt,
settledAt: r.settledAt,
expiredAt: r.expiredAt,
memo: r.memo,
})),
pagination: {
page: pagination?.page || 1,
pageSize: pagination?.pageSize || 20,
total,
},
};
}
/**
* 获取待领取奖励(含倒计时)
*/
async getPendingRewards(accountSequence: bigint) {
const rewards = await this.rewardLedgerEntryRepository.findPendingByAccountSequence(accountSequence);
return rewards.map(r => ({
id: r.id?.toString(),
rightType: r.rewardSource.rightType,
usdtAmount: r.usdtAmount.amount,
hashpowerAmount: r.hashpowerAmount.value,
createdAt: r.createdAt,
expireAt: r.expireAt,
remainingTimeMs: r.getRemainingTimeMs(),
memo: r.memo,
}));
}
/**
* 发布汇总更新到 Outbox同步到 wallet-service
*/
private async publishSummaryUpdatesToOutbox(summaries: RewardSummary[]): Promise<void> {
if (summaries.length === 0) return;
const outboxEvents: OutboxEventData[] = summaries.map(summary => ({
eventType: 'reward.summary.updated',
topic: 'reward.summary.updated',
key: summary.accountSequence.toString(),
aggregateId: summary.accountSequence.toString(),
aggregateType: 'RewardSummary',
payload: {
accountSequence: summary.accountSequence.toString(),
userId: summary.userId.toString(),
pendingUsdt: summary.pendingUsdt.amount,
pendingHashpower: summary.pendingHashpower.value,
pendingExpireAt: summary.pendingExpireAt?.toISOString() || null,
settleableUsdt: summary.settleableUsdt.amount,
settleableHashpower: summary.settleableHashpower.value,
settledTotalUsdt: summary.settledTotalUsdt.amount,
settledTotalHashpower: summary.settledTotalHashpower.value,
expiredTotalUsdt: summary.expiredTotalUsdt.amount,
expiredTotalHashpower: summary.expiredTotalHashpower.value,
},
}));
await this.outboxRepository.saveEvents(outboxEvents);
this.logger.log(
`[OUTBOX] Published ${outboxEvents.length} reward.summary.updated events to outbox`,
);
}
}