fix(wallet-service): 使用事务确保 settleUserPendingRewards 原子性

- 将 pending_rewards 状态更新和 wallet_accounts 余额更新包装在 Prisma $transaction 中
- 修复 Bug 4: pending_rewards 被标记为 SETTLED 但 settleable_usdt 未更新的问题
- 添加 PrismaService 依赖注入
- 同时减少 pendingUsdt/pendingHashpower,增加 settleableUsdt/settleableHashpower
- 记录 REWARD_TO_SETTLEABLE 类型的流水

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-13 06:58:07 -08:00
parent c93f43546e
commit 846915badc
1 changed files with 61 additions and 24 deletions

View File

@ -7,6 +7,7 @@ import {
IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY,
IPendingRewardRepository, PENDING_REWARD_REPOSITORY,
} from '@/domain/repositories';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { LedgerEntry, DepositOrder, SettlementOrder, WithdrawalOrder, PendingReward, PendingRewardStatus } from '@/domain/aggregates';
import {
UserId, Money, Hashpower, LedgerEntryType, AssetType, ChainType, SettleCurrency,
@ -87,6 +88,7 @@ export class WalletApplicationService {
private readonly pendingRewardRepo: IPendingRewardRepository,
private readonly walletCacheService: WalletCacheService,
private readonly eventPublisher: EventPublisherService,
private readonly prisma: PrismaService,
) {}
// =============== Commands ===============
@ -1316,6 +1318,8 @@ export class WalletApplicationService {
*
* PENDING SETTLED
*
*
* 使 pending_rewards wallet_accounts
*/
async settleUserPendingRewards(accountSequence: string): Promise<{
settledCount: number;
@ -1331,44 +1335,77 @@ export class WalletApplicationService {
);
if (pendingRewards.length === 0) {
this.logger.debug(`[settleUserPendingRewards] No pending rewards for ${accountSequence}`);
return { settledCount: 0, totalUsdt: 0, totalHashpower: 0 };
}
let totalUsdt = 0;
let totalHashpower = 0;
// 标记为已结算
// 计算总金额
for (const reward of pendingRewards) {
reward.markAsSettled();
totalUsdt += reward.usdtAmount.value;
totalHashpower += reward.hashpowerAmount.value;
}
// 批量更新状态
await this.pendingRewardRepo.updateAll(pendingRewards);
this.logger.log(`[settleUserPendingRewards] Found ${pendingRewards.length} pending rewards, total: ${totalUsdt} USDT, ${totalHashpower} hashpower`);
// 更新钱包可结算余额
// 使用事务确保原子性pending_rewards 状态更新 + wallet_accounts 余额更新
await this.prisma.$transaction(async (tx) => {
// 1. 更新 pending_rewards 状态为 SETTLED
const now = new Date();
await tx.pendingReward.updateMany({
where: {
id: { in: pendingRewards.map(r => r.id) },
status: PendingRewardStatus.PENDING, // 双重检查,防止并发问题
},
data: {
status: PendingRewardStatus.SETTLED,
settledAt: now,
},
});
// 2. 更新 wallet_accounts 可结算余额
const walletRecord = await tx.walletAccount.findUnique({
where: { accountSequence },
});
if (walletRecord) {
await tx.walletAccount.update({
where: { accountSequence },
data: {
settleableUsdt: { increment: totalUsdt },
settleableHashpower: { increment: totalHashpower },
hashpower: { increment: totalHashpower },
// 同时减少 pending 余额
pendingUsdt: { decrement: totalUsdt },
pendingHashpower: { decrement: totalHashpower },
},
});
// 3. 记录流水
if (totalUsdt > 0) {
await tx.ledgerEntry.create({
data: {
accountSequence,
userId: walletRecord.userId,
entryType: LedgerEntryType.REWARD_TO_SETTLEABLE,
amount: totalUsdt,
assetType: 'USDT',
memo: `${pendingRewards.length} pending rewards settled`,
},
});
}
this.logger.log(`[settleUserPendingRewards] Transaction committed: ${pendingRewards.length} rewards settled for ${accountSequence}`);
} else {
this.logger.warn(`[settleUserPendingRewards] Wallet not found for ${accountSequence}, skipping wallet update`);
}
});
// 清除钱包缓存
const wallet = await this.walletRepo.findByAccountSequence(accountSequence);
if (wallet) {
// 将待领取转为可结算
wallet.addSettleableReward(
Money.USDT(totalUsdt),
Hashpower.create(totalHashpower),
);
await this.walletRepo.save(wallet);
// 记录流水
if (totalUsdt > 0) {
const ledgerEntry = LedgerEntry.create({
accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.REWARD_TO_SETTLEABLE,
amount: Money.USDT(totalUsdt),
memo: `${pendingRewards.length} pending rewards settled`,
});
await this.ledgerRepo.save(ledgerEntry);
}
await this.walletCacheService.invalidateWallet(wallet.userId.value);
}