feat(wallet): 增强认种流程可靠性 - 添加事务保护和乐观锁

## 问题背景

认种流程(冻结→确认扣款→解冻)存在以下可靠性问题:
1. 余额检查与冻结操作非原子性,存在并发竞态条件
2. 钱包更新与流水记录分开执行,可能导致数据不一致
3. 缺少乐观锁机制,并发修改时可能出现余额错误
4. Kafka consumer 错误被吞掉,消费失败无法重试

## 修复内容

### wallet-application.service.ts

1. **freezeForPlanting (冻结资金)**
   - 添加 `prisma.$transaction` 事务保护
   - 添加乐观锁 (version 字段检查)
   - 添加重试机制 (最多 3 次,指数退避)
   - 幂等性检查移入事务内,避免竞态

2. **confirmPlantingDeduction (确认扣款)**
   - 添加事务保护,确保扣款与流水原子性
   - 添加乐观锁防止并发修改
   - 添加重试机制

3. **unfreezeForPlanting (解冻资金)**
   - 添加事务保护,确保解冻与流水原子性
   - 添加乐观锁防止并发修改
   - 添加重试机制

### planting-event-consumer.service.ts

- 添加 `throw error` 重新抛出错误
- 确保消费失败时 Kafka 能感知并触发重试

## 乐观锁实现

```typescript
const updateResult = await tx.walletAccount.updateMany({
  where: {
    id: walletRecord.id,
    version: currentVersion,  // 版本检查
  },
  data: {
    usdtAvailable: newAvailable,
    version: currentVersion + 1,  // 版本递增
  },
});

if (updateResult.count === 0) {
  throw new OptimisticLockError('版本冲突');
}
```

## 测试验证

- wallet-service 构建成功
- 服务重启正常,所有 handler 注册成功

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-15 19:56:49 -08:00
parent 390bb1c22b
commit be7ec87f05
2 changed files with 376 additions and 177 deletions

View File

@ -303,230 +303,426 @@ export class WalletApplicationService {
/**
*
*
*
* 使 +
* 1.
* 2.
* 3. version
*/
async freezeForPlanting(command: FreezeForPlantingCommand): Promise<{
success: boolean;
frozenAmount: number;
}> {
const MAX_RETRIES = 3;
let retries = 0;
while (retries < MAX_RETRIES) {
try {
return await this.executeFreezeForPlanting(command);
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[freezeForPlanting] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`);
if (retries >= MAX_RETRIES) {
this.logger.error(`[freezeForPlanting] Max retries exceeded for ${command.orderId}`);
throw error;
}
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
throw new Error('Unexpected: exited retry loop without result');
}
/**
* Execute freeze logic within a transaction
*/
private async executeFreezeForPlanting(command: FreezeForPlantingCommand): Promise<{
success: boolean;
frozenAmount: number;
}> {
this.logger.log(`[freezeForPlanting] ========== 开始处理 ==========`);
this.logger.log(`[freezeForPlanting] userId/accountSequence: ${command.userId}`);
this.logger.log(`[freezeForPlanting] amount: ${command.amount}`);
this.logger.log(`[freezeForPlanting] orderId: ${command.orderId}`);
const amount = Money.USDT(command.amount);
this.logger.log(`[freezeForPlanting] 解析后 amount.value: ${amount.value}`);
const Decimal = (await import('decimal.js')).default;
const amountDecimal = new Decimal(command.amount);
let walletUserId: bigint | null = null;
// 幂等性检查:通过 orderId 检查是否已经冻结
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
this.logger.log(`[freezeForPlanting] 已存在的流水条数: ${existingEntries.length}`);
const alreadyFrozen = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (alreadyFrozen) {
this.logger.warn(
`Order ${command.orderId} already frozen, returning success (idempotent)`,
);
return { success: true, frozenAmount: command.amount };
}
await this.prisma.$transaction(async (tx) => {
// 幂等性检查:通过 orderId 检查是否已经冻结
const existingEntry = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_FREEZE,
},
});
// 优先按 accountSequence 查找,如果未找到则按 userId 查找
let wallet = await this.walletRepo.findByAccountSequence(command.userId);
if (!wallet) {
// 尝试将 userId 转换为 BigInt如果不是 accountSequence 格式)
const isAccountSequence = command.userId.startsWith('D');
if (!isAccountSequence) {
wallet = await this.walletRepo.findByUserId(BigInt(command.userId));
if (existingEntry) {
this.logger.warn(`Order ${command.orderId} already frozen, returning success (idempotent)`);
return; // 幂等返回
}
}
if (!wallet) {
this.logger.error(`[freezeForPlanting] 钱包不存在: userId/accountSequence=${command.userId}`);
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
}
this.logger.log(`[freezeForPlanting] 钱包信息:`);
this.logger.log(`[freezeForPlanting] walletId: ${wallet.walletId}`);
this.logger.log(`[freezeForPlanting] accountSequence: ${wallet.accountSequence}`);
this.logger.log(`[freezeForPlanting] USDT可用余额: ${wallet.balances.usdt.available.value}`);
this.logger.log(`[freezeForPlanting] USDT冻结余额: ${wallet.balances.usdt.frozen.value}`);
// 查找钱包
const isAccountSequence = command.userId.startsWith('D');
const walletRecord = isAccountSequence
? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } })
: await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } });
// 检查余额是否足够
if (wallet.balances.usdt.available.lessThan(amount)) {
this.logger.error(`[freezeForPlanting] 余额不足!`);
this.logger.error(`[freezeForPlanting] 需要: ${command.amount} USDT`);
this.logger.error(`[freezeForPlanting] 当前可用: ${wallet.balances.usdt.available.value} USDT`);
throw new BadRequestException(
`余额不足: 需要 ${command.amount} USDT, 当前可用 ${wallet.balances.usdt.available.value} USDT`,
);
}
if (!walletRecord) {
this.logger.error(`[freezeForPlanting] 钱包不存在: userId/accountSequence=${command.userId}`);
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
}
this.logger.log(`[freezeForPlanting] 余额检查通过,开始冻结...`);
walletUserId = walletRecord.userId;
const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString());
const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString());
const currentVersion = walletRecord.version;
// 冻结资金
wallet.freeze(amount);
await this.walletRepo.save(wallet);
this.logger.log(`[freezeForPlanting] 钱包已保存`);
this.logger.log(`[freezeForPlanting] 钱包信息:`);
this.logger.log(`[freezeForPlanting] walletId: ${walletRecord.id}`);
this.logger.log(`[freezeForPlanting] accountSequence: ${walletRecord.accountSequence}`);
this.logger.log(`[freezeForPlanting] USDT可用余额: ${currentAvailable.toString()}`);
this.logger.log(`[freezeForPlanting] USDT冻结余额: ${currentFrozen.toString()}`);
this.logger.log(`[freezeForPlanting] version: ${currentVersion}`);
// 记录冻结流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.PLANT_FREEZE,
amount: Money.signed(-command.amount, 'USDT'), // Negative: 可用余额减少
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant freeze',
// 检查余额是否足够
if (currentAvailable.lessThan(amountDecimal)) {
this.logger.error(`[freezeForPlanting] 余额不足!`);
throw new BadRequestException(
`余额不足: 需要 ${command.amount} USDT, 当前可用 ${currentAvailable.toString()} USDT`,
);
}
// 计算新余额
const newAvailable = currentAvailable.minus(amountDecimal);
const newFrozen = currentFrozen.plus(amountDecimal);
// 使用乐观锁更新钱包
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion,
},
data: {
usdtAvailable: newAvailable,
usdtFrozen: newFrozen,
version: currentVersion + 1,
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
// 记录冻结流水
await tx.ledgerEntry.create({
data: {
accountSequence: walletRecord.accountSequence,
userId: walletRecord.userId,
entryType: LedgerEntryType.PLANT_FREEZE,
amount: amountDecimal.negated(), // Negative: 可用余额减少
assetType: 'USDT',
balanceAfter: newAvailable,
refOrderId: command.orderId,
memo: 'Plant freeze',
},
});
this.logger.log(`[freezeForPlanting] 成功冻结 ${command.amount} USDT (version: ${currentVersion} -> ${currentVersion + 1})`);
});
await this.ledgerRepo.save(ledgerEntry);
this.logger.log(`[freezeForPlanting] 流水已记录`);
await this.walletCacheService.invalidateWallet(wallet.userId.value);
// 事务成功后,使缓存失效
if (walletUserId) {
await this.walletCacheService.invalidateWallet(walletUserId);
}
this.logger.log(`[freezeForPlanting] 成功冻结 ${command.amount} USDT for order ${command.orderId}`);
this.logger.log(`[freezeForPlanting] 订单 ${command.orderId} 冻结完成`);
return { success: true, frozenAmount: command.amount };
}
/**
*
*
*
* 使 +
* 1.
* 2. version
*/
async confirmPlantingDeduction(command: ConfirmPlantingDeductionCommand): Promise<boolean> {
// 查找冻结记录,获取冻结金额
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
const MAX_RETRIES = 3;
let retries = 0;
// 幂等性检查:是否已经扣款
const alreadyDeducted = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT,
);
if (alreadyDeducted) {
this.logger.warn(
`Order ${command.orderId} already confirmed deduction, returning success (idempotent)`,
);
return true;
}
// 查找冻结记录
const freezeEntry = existingEntries.find(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (!freezeEntry) {
throw new BadRequestException(`订单 ${command.orderId} 未找到冻结记录`);
}
// 获取冻结金额(流水中是负数,取绝对值)
const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value));
// 优先按 accountSequence 查找,如果未找到则按 userId 查找
let wallet = await this.walletRepo.findByAccountSequence(command.userId);
if (!wallet) {
// 尝试将 userId 转换为 BigInt如果不是 accountSequence 格式)
const isAccountSequence = command.userId.startsWith('D');
if (!isAccountSequence) {
wallet = await this.walletRepo.findByUserId(BigInt(command.userId));
while (retries < MAX_RETRIES) {
try {
return await this.executeConfirmPlantingDeduction(command);
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[confirmPlantingDeduction] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`);
if (retries >= MAX_RETRIES) {
this.logger.error(`[confirmPlantingDeduction] Max retries exceeded for ${command.orderId}`);
throw error;
}
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
if (!wallet) {
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
throw new Error('Unexpected: exited retry loop without result');
}
/**
* Execute confirm deduction logic within a transaction
*/
private async executeConfirmPlantingDeduction(command: ConfirmPlantingDeductionCommand): Promise<boolean> {
const Decimal = (await import('decimal.js')).default;
let walletUserId: bigint | null = null;
await this.prisma.$transaction(async (tx) => {
// 幂等性检查:是否已经扣款
const existingPayment = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_PAYMENT,
},
});
if (existingPayment) {
this.logger.warn(`Order ${command.orderId} already confirmed deduction, returning success (idempotent)`);
return; // 幂等返回
}
// 查找冻结记录
const freezeEntry = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_FREEZE,
},
});
if (!freezeEntry) {
throw new BadRequestException(`订单 ${command.orderId} 未找到冻结记录`);
}
// 获取冻结金额(流水中是负数,取绝对值)
const frozenAmount = new Decimal(freezeEntry.amount.toString()).abs();
// 查找钱包
const isAccountSequence = command.userId.startsWith('D');
const walletRecord = isAccountSequence
? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } })
: await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } });
if (!walletRecord) {
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
}
walletUserId = walletRecord.userId;
const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString());
const currentVersion = walletRecord.version;
// 检查冻结余额是否足够
if (currentFrozen.lessThan(frozenAmount)) {
throw new BadRequestException(
`冻结余额不足: 需要 ${frozenAmount.toString()} USDT, 当前冻结 ${currentFrozen.toString()} USDT`,
);
}
// 计算新冻结余额(从冻结扣款,不影响可用)
const newFrozen = currentFrozen.minus(frozenAmount);
// 使用乐观锁更新钱包
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion,
},
data: {
usdtFrozen: newFrozen,
version: currentVersion + 1,
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
// 记录扣款流水
await tx.ledgerEntry.create({
data: {
accountSequence: walletRecord.accountSequence,
userId: walletRecord.userId,
entryType: LedgerEntryType.PLANT_PAYMENT,
amount: frozenAmount.negated(), // Negative: 冻结余额减少
assetType: 'USDT',
balanceAfter: new Decimal(walletRecord.usdtAvailable.toString()), // 可用余额不变
refOrderId: command.orderId,
memo: 'Plant payment (from frozen)',
},
});
this.logger.log(`[confirmPlantingDeduction] 成功扣款 ${frozenAmount.toString()} USDT (version: ${currentVersion} -> ${currentVersion + 1})`);
});
// 事务成功后,使缓存失效
if (walletUserId) {
await this.walletCacheService.invalidateWallet(walletUserId);
}
// 从冻结金额扣款
wallet.deductFrozen(frozenAmount, 'Plant payment confirmed', command.orderId);
await this.walletRepo.save(wallet);
// 记录扣款流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.PLANT_PAYMENT,
amount: Money.signed(-frozenAmount.value, 'USDT'),
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant payment (from frozen)',
});
await this.ledgerRepo.save(ledgerEntry);
await this.walletCacheService.invalidateWallet(wallet.userId.value);
this.logger.log(`Confirmed deduction ${frozenAmount.value} USDT for order ${command.orderId}`);
this.logger.log(`[confirmPlantingDeduction] 订单 ${command.orderId} 扣款确认完成`);
return true;
}
/**
*
*
*
* 使 +
* 1.
* 2. version
*/
async unfreezeForPlanting(command: UnfreezeForPlantingCommand): Promise<boolean> {
// 查找相关流水
const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId);
const MAX_RETRIES = 3;
let retries = 0;
// 幂等性检查:是否已经解冻
const alreadyUnfrozen = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_UNFREEZE,
);
if (alreadyUnfrozen) {
this.logger.warn(
`Order ${command.orderId} already unfrozen, returning success (idempotent)`,
);
return true;
}
// 检查是否已经扣款(扣款后不能解冻)
const alreadyDeducted = existingEntries.some(
(entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT,
);
if (alreadyDeducted) {
this.logger.warn(
`Order ${command.orderId} already deducted, cannot unfreeze`,
);
throw new BadRequestException(`订单 ${command.orderId} 已扣款,无法解冻`);
}
// 查找冻结记录
const freezeEntry = existingEntries.find(
(entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE,
);
if (!freezeEntry) {
// 没有冻结记录,可能从未冻结,直接返回成功
this.logger.warn(
`Order ${command.orderId} has no freeze record, returning success`,
);
return true;
}
// 获取冻结金额
const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value));
// 优先按 accountSequence 查找,如果未找到则按 userId 查找
let wallet = await this.walletRepo.findByAccountSequence(command.userId);
if (!wallet) {
// 尝试将 userId 转换为 BigInt如果不是 accountSequence 格式)
const isAccountSequence = command.userId.startsWith('D');
if (!isAccountSequence) {
wallet = await this.walletRepo.findByUserId(BigInt(command.userId));
while (retries < MAX_RETRIES) {
try {
return await this.executeUnfreezeForPlanting(command);
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[unfreezeForPlanting] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`);
if (retries >= MAX_RETRIES) {
this.logger.error(`[unfreezeForPlanting] Max retries exceeded for ${command.orderId}`);
throw error;
}
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
if (!wallet) {
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
throw new Error('Unexpected: exited retry loop without result');
}
/**
* Execute unfreeze logic within a transaction
*/
private async executeUnfreezeForPlanting(command: UnfreezeForPlantingCommand): Promise<boolean> {
const Decimal = (await import('decimal.js')).default;
let walletUserId: bigint | null = null;
await this.prisma.$transaction(async (tx) => {
// 幂等性检查:是否已经解冻
const existingUnfreeze = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_UNFREEZE,
},
});
if (existingUnfreeze) {
this.logger.warn(`Order ${command.orderId} already unfrozen, returning success (idempotent)`);
return; // 幂等返回
}
// 检查是否已经扣款(扣款后不能解冻)
const existingPayment = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_PAYMENT,
},
});
if (existingPayment) {
this.logger.warn(`Order ${command.orderId} already deducted, cannot unfreeze`);
throw new BadRequestException(`订单 ${command.orderId} 已扣款,无法解冻`);
}
// 查找冻结记录
const freezeEntry = await tx.ledgerEntry.findFirst({
where: {
refOrderId: command.orderId,
entryType: LedgerEntryType.PLANT_FREEZE,
},
});
if (!freezeEntry) {
// 没有冻结记录,可能从未冻结,直接返回成功
this.logger.warn(`Order ${command.orderId} has no freeze record, returning success`);
return; // 幂等返回
}
// 获取冻结金额
const frozenAmount = new Decimal(freezeEntry.amount.toString()).abs();
// 查找钱包
const isAccountSequence = command.userId.startsWith('D');
const walletRecord = isAccountSequence
? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } })
: await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } });
if (!walletRecord) {
throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`);
}
walletUserId = walletRecord.userId;
const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString());
const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString());
const currentVersion = walletRecord.version;
// 计算新余额(解冻:冻结减少,可用增加)
const newAvailable = currentAvailable.plus(frozenAmount);
const newFrozen = currentFrozen.minus(frozenAmount);
// 使用乐观锁更新钱包
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion,
},
data: {
usdtAvailable: newAvailable,
usdtFrozen: newFrozen,
version: currentVersion + 1,
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
// 记录解冻流水
await tx.ledgerEntry.create({
data: {
accountSequence: walletRecord.accountSequence,
userId: walletRecord.userId,
entryType: LedgerEntryType.PLANT_UNFREEZE,
amount: frozenAmount, // Positive: 可用余额增加
assetType: 'USDT',
balanceAfter: newAvailable,
refOrderId: command.orderId,
memo: 'Plant unfreeze (rollback)',
},
});
this.logger.log(`[unfreezeForPlanting] 成功解冻 ${frozenAmount.toString()} USDT (version: ${currentVersion} -> ${currentVersion + 1})`);
});
// 事务成功后,使缓存失效
if (walletUserId) {
await this.walletCacheService.invalidateWallet(walletUserId);
}
// 解冻资金
wallet.unfreeze(frozenAmount);
await this.walletRepo.save(wallet);
// 记录解冻流水
const ledgerEntry = LedgerEntry.create({
accountSequence: wallet.accountSequence,
userId: wallet.userId,
entryType: LedgerEntryType.PLANT_UNFREEZE,
amount: frozenAmount, // Positive: 可用余额增加
balanceAfter: wallet.balances.usdt.available,
refOrderId: command.orderId,
memo: 'Plant unfreeze (rollback)',
});
await this.ledgerRepo.save(ledgerEntry);
await this.walletCacheService.invalidateWallet(wallet.userId.value);
this.logger.log(`Unfrozen ${frozenAmount.value} USDT for order ${command.orderId}`);
this.logger.log(`[unfreezeForPlanting] 订单 ${command.orderId} 解冻完成`);
return true;
}

View File

@ -138,6 +138,9 @@ export class PlantingEventConsumerService implements OnModuleInit, OnModuleDestr
}
} catch (error) {
this.logger.error(`[ERROR] Error processing planting event from ${topic}`, error);
// Re-throw to trigger Kafka retry mechanism
// This ensures messages are not marked as consumed until successfully processed
throw error;
}
},
});