From be7ec87f05495c038d79ced53301edfa4bd1eff7 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 15 Dec 2025 19:56:49 -0800 Subject: [PATCH] =?UTF-8?q?feat(wallet):=20=E5=A2=9E=E5=BC=BA=E8=AE=A4?= =?UTF-8?q?=E7=A7=8D=E6=B5=81=E7=A8=8B=E5=8F=AF=E9=9D=A0=E6=80=A7=20-=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=8B=E5=8A=A1=E4=BF=9D=E6=8A=A4=E5=92=8C?= =?UTF-8?q?=E4=B9=90=E8=A7=82=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 问题背景 认种流程(冻结→确认扣款→解冻)存在以下可靠性问题: 1. 余额检查与冻结操作非原子性,存在并发竞态条件 2. 钱包更新与流水记录分开执行,可能导致数据不一致 3. 缺少乐观锁机制,并发修改时可能出现余额错误 4. Kafka consumer 错误被吞掉,消费失败无法重试 ## 修复内容 ### wallet-application.service.ts 1. **freezeForPlanting (冻结资金)** - 添加 `prisma.$transaction` 事务保护 - 添加乐观锁 (version 字段检查) - 添加重试机制 (最多 3 次,指数退避) - 幂等性检查移入事务内,避免竞态 2. **confirmPlantingDeduction (确认扣款)** - 添加事务保护,确保扣款与流水原子性 - 添加乐观锁防止并发修改 - 添加重试机制 3. **unfreezeForPlanting (解冻资金)** - 添加事务保护,确保解冻与流水原子性 - 添加乐观锁防止并发修改 - 添加重试机制 ### planting-event-consumer.service.ts - 添加 `throw error` 重新抛出错误 - 确保消费失败时 Kafka 能感知并触发重试 ## 乐观锁实现 ```typescript const updateResult = await tx.walletAccount.updateMany({ where: { id: walletRecord.id, version: currentVersion, // 版本检查 }, data: { usdtAvailable: newAvailable, version: currentVersion + 1, // 版本递增 }, }); if (updateResult.count === 0) { throw new OptimisticLockError('版本冲突'); } ``` ## 测试验证 - wallet-service 构建成功 - 服务重启正常,所有 handler 注册成功 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../services/wallet-application.service.ts | 550 ++++++++++++------ .../kafka/planting-event-consumer.service.ts | 3 + 2 files changed, 376 insertions(+), 177 deletions(-) diff --git a/backend/services/wallet-service/src/application/services/wallet-application.service.ts b/backend/services/wallet-service/src/application/services/wallet-application.service.ts index 4ecd879d..5b885f74 100644 --- a/backend/services/wallet-service/src/application/services/wallet-application.service.ts +++ b/backend/services/wallet-service/src/application/services/wallet-application.service.ts @@ -303,230 +303,426 @@ export class WalletApplicationService { /** * 冻结资金用于认种 * 幂等设计:如果订单已冻结,直接返回成功 + * + * 使用数据库事务 + 乐观锁确保: + * 1. 余额检查和冻结操作的原子性 + * 2. 钱包更新和流水记录的一致性 + * 3. 并发安全(通过 version 字段) */ async freezeForPlanting(command: FreezeForPlantingCommand): Promise<{ success: boolean; frozenAmount: number; + }> { + const MAX_RETRIES = 3; + let retries = 0; + + while (retries < MAX_RETRIES) { + try { + return await this.executeFreezeForPlanting(command); + } catch (error) { + if (this.isOptimisticLockError(error)) { + retries++; + this.logger.warn(`[freezeForPlanting] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`); + if (retries >= MAX_RETRIES) { + this.logger.error(`[freezeForPlanting] Max retries exceeded for ${command.orderId}`); + throw error; + } + await this.sleep(50 * retries); + } else { + throw error; + } + } + } + throw new Error('Unexpected: exited retry loop without result'); + } + + /** + * Execute freeze logic within a transaction + */ + private async executeFreezeForPlanting(command: FreezeForPlantingCommand): Promise<{ + success: boolean; + frozenAmount: number; }> { this.logger.log(`[freezeForPlanting] ========== 开始处理 ==========`); this.logger.log(`[freezeForPlanting] userId/accountSequence: ${command.userId}`); this.logger.log(`[freezeForPlanting] amount: ${command.amount}`); this.logger.log(`[freezeForPlanting] orderId: ${command.orderId}`); - const amount = Money.USDT(command.amount); - this.logger.log(`[freezeForPlanting] 解析后 amount.value: ${amount.value}`); + const Decimal = (await import('decimal.js')).default; + const amountDecimal = new Decimal(command.amount); + let walletUserId: bigint | null = null; - // 幂等性检查:通过 orderId 检查是否已经冻结 - const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId); - this.logger.log(`[freezeForPlanting] 已存在的流水条数: ${existingEntries.length}`); - const alreadyFrozen = existingEntries.some( - (entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE, - ); - if (alreadyFrozen) { - this.logger.warn( - `Order ${command.orderId} already frozen, returning success (idempotent)`, - ); - return { success: true, frozenAmount: command.amount }; - } + await this.prisma.$transaction(async (tx) => { + // 幂等性检查:通过 orderId 检查是否已经冻结 + const existingEntry = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_FREEZE, + }, + }); - // 优先按 accountSequence 查找,如果未找到则按 userId 查找 - let wallet = await this.walletRepo.findByAccountSequence(command.userId); - if (!wallet) { - // 尝试将 userId 转换为 BigInt(如果不是 accountSequence 格式) - const isAccountSequence = command.userId.startsWith('D'); - if (!isAccountSequence) { - wallet = await this.walletRepo.findByUserId(BigInt(command.userId)); + if (existingEntry) { + this.logger.warn(`Order ${command.orderId} already frozen, returning success (idempotent)`); + return; // 幂等返回 } - } - if (!wallet) { - this.logger.error(`[freezeForPlanting] 钱包不存在: userId/accountSequence=${command.userId}`); - throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); - } - this.logger.log(`[freezeForPlanting] 钱包信息:`); - this.logger.log(`[freezeForPlanting] walletId: ${wallet.walletId}`); - this.logger.log(`[freezeForPlanting] accountSequence: ${wallet.accountSequence}`); - this.logger.log(`[freezeForPlanting] USDT可用余额: ${wallet.balances.usdt.available.value}`); - this.logger.log(`[freezeForPlanting] USDT冻结余额: ${wallet.balances.usdt.frozen.value}`); + // 查找钱包 + const isAccountSequence = command.userId.startsWith('D'); + const walletRecord = isAccountSequence + ? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } }) + : await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } }); - // 检查余额是否足够 - if (wallet.balances.usdt.available.lessThan(amount)) { - this.logger.error(`[freezeForPlanting] 余额不足!`); - this.logger.error(`[freezeForPlanting] 需要: ${command.amount} USDT`); - this.logger.error(`[freezeForPlanting] 当前可用: ${wallet.balances.usdt.available.value} USDT`); - throw new BadRequestException( - `余额不足: 需要 ${command.amount} USDT, 当前可用 ${wallet.balances.usdt.available.value} USDT`, - ); - } + if (!walletRecord) { + this.logger.error(`[freezeForPlanting] 钱包不存在: userId/accountSequence=${command.userId}`); + throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); + } - this.logger.log(`[freezeForPlanting] 余额检查通过,开始冻结...`); + walletUserId = walletRecord.userId; + const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString()); + const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString()); + const currentVersion = walletRecord.version; - // 冻结资金 - wallet.freeze(amount); - await this.walletRepo.save(wallet); - this.logger.log(`[freezeForPlanting] 钱包已保存`); + this.logger.log(`[freezeForPlanting] 钱包信息:`); + this.logger.log(`[freezeForPlanting] walletId: ${walletRecord.id}`); + this.logger.log(`[freezeForPlanting] accountSequence: ${walletRecord.accountSequence}`); + this.logger.log(`[freezeForPlanting] USDT可用余额: ${currentAvailable.toString()}`); + this.logger.log(`[freezeForPlanting] USDT冻结余额: ${currentFrozen.toString()}`); + this.logger.log(`[freezeForPlanting] version: ${currentVersion}`); - // 记录冻结流水 - const ledgerEntry = LedgerEntry.create({ - accountSequence: wallet.accountSequence, - userId: wallet.userId, - entryType: LedgerEntryType.PLANT_FREEZE, - amount: Money.signed(-command.amount, 'USDT'), // Negative: 可用余额减少 - balanceAfter: wallet.balances.usdt.available, - refOrderId: command.orderId, - memo: 'Plant freeze', + // 检查余额是否足够 + if (currentAvailable.lessThan(amountDecimal)) { + this.logger.error(`[freezeForPlanting] 余额不足!`); + throw new BadRequestException( + `余额不足: 需要 ${command.amount} USDT, 当前可用 ${currentAvailable.toString()} USDT`, + ); + } + + // 计算新余额 + const newAvailable = currentAvailable.minus(amountDecimal); + const newFrozen = currentFrozen.plus(amountDecimal); + + // 使用乐观锁更新钱包 + const updateResult = await tx.walletAccount.updateMany({ + where: { + id: walletRecord.id, + version: currentVersion, + }, + data: { + usdtAvailable: newAvailable, + usdtFrozen: newFrozen, + version: currentVersion + 1, + updatedAt: new Date(), + }, + }); + + if (updateResult.count === 0) { + throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`); + } + + // 记录冻结流水 + await tx.ledgerEntry.create({ + data: { + accountSequence: walletRecord.accountSequence, + userId: walletRecord.userId, + entryType: LedgerEntryType.PLANT_FREEZE, + amount: amountDecimal.negated(), // Negative: 可用余额减少 + assetType: 'USDT', + balanceAfter: newAvailable, + refOrderId: command.orderId, + memo: 'Plant freeze', + }, + }); + + this.logger.log(`[freezeForPlanting] 成功冻结 ${command.amount} USDT (version: ${currentVersion} -> ${currentVersion + 1})`); }); - await this.ledgerRepo.save(ledgerEntry); - this.logger.log(`[freezeForPlanting] 流水已记录`); - await this.walletCacheService.invalidateWallet(wallet.userId.value); + // 事务成功后,使缓存失效 + if (walletUserId) { + await this.walletCacheService.invalidateWallet(walletUserId); + } - this.logger.log(`[freezeForPlanting] 成功冻结 ${command.amount} USDT for order ${command.orderId}`); + this.logger.log(`[freezeForPlanting] 订单 ${command.orderId} 冻结完成`); return { success: true, frozenAmount: command.amount }; } /** * 确认认种扣款(从冻结金额中正式扣除) * 幂等设计:如果订单已确认扣款,直接返回成功 + * + * 使用数据库事务 + 乐观锁确保: + * 1. 钱包冻结余额扣除和流水记录的一致性 + * 2. 并发安全(通过 version 字段) */ async confirmPlantingDeduction(command: ConfirmPlantingDeductionCommand): Promise { - // 查找冻结记录,获取冻结金额 - const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId); + const MAX_RETRIES = 3; + let retries = 0; - // 幂等性检查:是否已经扣款 - const alreadyDeducted = existingEntries.some( - (entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT, - ); - if (alreadyDeducted) { - this.logger.warn( - `Order ${command.orderId} already confirmed deduction, returning success (idempotent)`, - ); - return true; - } - - // 查找冻结记录 - const freezeEntry = existingEntries.find( - (entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE, - ); - if (!freezeEntry) { - throw new BadRequestException(`订单 ${command.orderId} 未找到冻结记录`); - } - - // 获取冻结金额(流水中是负数,取绝对值) - const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value)); - - // 优先按 accountSequence 查找,如果未找到则按 userId 查找 - let wallet = await this.walletRepo.findByAccountSequence(command.userId); - if (!wallet) { - // 尝试将 userId 转换为 BigInt(如果不是 accountSequence 格式) - const isAccountSequence = command.userId.startsWith('D'); - if (!isAccountSequence) { - wallet = await this.walletRepo.findByUserId(BigInt(command.userId)); + while (retries < MAX_RETRIES) { + try { + return await this.executeConfirmPlantingDeduction(command); + } catch (error) { + if (this.isOptimisticLockError(error)) { + retries++; + this.logger.warn(`[confirmPlantingDeduction] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`); + if (retries >= MAX_RETRIES) { + this.logger.error(`[confirmPlantingDeduction] Max retries exceeded for ${command.orderId}`); + throw error; + } + await this.sleep(50 * retries); + } else { + throw error; + } } } - if (!wallet) { - throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); + throw new Error('Unexpected: exited retry loop without result'); + } + + /** + * Execute confirm deduction logic within a transaction + */ + private async executeConfirmPlantingDeduction(command: ConfirmPlantingDeductionCommand): Promise { + const Decimal = (await import('decimal.js')).default; + let walletUserId: bigint | null = null; + + await this.prisma.$transaction(async (tx) => { + // 幂等性检查:是否已经扣款 + const existingPayment = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_PAYMENT, + }, + }); + + if (existingPayment) { + this.logger.warn(`Order ${command.orderId} already confirmed deduction, returning success (idempotent)`); + return; // 幂等返回 + } + + // 查找冻结记录 + const freezeEntry = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_FREEZE, + }, + }); + + if (!freezeEntry) { + throw new BadRequestException(`订单 ${command.orderId} 未找到冻结记录`); + } + + // 获取冻结金额(流水中是负数,取绝对值) + const frozenAmount = new Decimal(freezeEntry.amount.toString()).abs(); + + // 查找钱包 + const isAccountSequence = command.userId.startsWith('D'); + const walletRecord = isAccountSequence + ? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } }) + : await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } }); + + if (!walletRecord) { + throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); + } + + walletUserId = walletRecord.userId; + const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString()); + const currentVersion = walletRecord.version; + + // 检查冻结余额是否足够 + if (currentFrozen.lessThan(frozenAmount)) { + throw new BadRequestException( + `冻结余额不足: 需要 ${frozenAmount.toString()} USDT, 当前冻结 ${currentFrozen.toString()} USDT`, + ); + } + + // 计算新冻结余额(从冻结扣款,不影响可用) + const newFrozen = currentFrozen.minus(frozenAmount); + + // 使用乐观锁更新钱包 + const updateResult = await tx.walletAccount.updateMany({ + where: { + id: walletRecord.id, + version: currentVersion, + }, + data: { + usdtFrozen: newFrozen, + version: currentVersion + 1, + updatedAt: new Date(), + }, + }); + + if (updateResult.count === 0) { + throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`); + } + + // 记录扣款流水 + await tx.ledgerEntry.create({ + data: { + accountSequence: walletRecord.accountSequence, + userId: walletRecord.userId, + entryType: LedgerEntryType.PLANT_PAYMENT, + amount: frozenAmount.negated(), // Negative: 冻结余额减少 + assetType: 'USDT', + balanceAfter: new Decimal(walletRecord.usdtAvailable.toString()), // 可用余额不变 + refOrderId: command.orderId, + memo: 'Plant payment (from frozen)', + }, + }); + + this.logger.log(`[confirmPlantingDeduction] 成功扣款 ${frozenAmount.toString()} USDT (version: ${currentVersion} -> ${currentVersion + 1})`); + }); + + // 事务成功后,使缓存失效 + if (walletUserId) { + await this.walletCacheService.invalidateWallet(walletUserId); } - // 从冻结金额扣款 - wallet.deductFrozen(frozenAmount, 'Plant payment confirmed', command.orderId); - await this.walletRepo.save(wallet); - - // 记录扣款流水 - const ledgerEntry = LedgerEntry.create({ - accountSequence: wallet.accountSequence, - userId: wallet.userId, - entryType: LedgerEntryType.PLANT_PAYMENT, - amount: Money.signed(-frozenAmount.value, 'USDT'), - balanceAfter: wallet.balances.usdt.available, - refOrderId: command.orderId, - memo: 'Plant payment (from frozen)', - }); - await this.ledgerRepo.save(ledgerEntry); - - await this.walletCacheService.invalidateWallet(wallet.userId.value); - - this.logger.log(`Confirmed deduction ${frozenAmount.value} USDT for order ${command.orderId}`); + this.logger.log(`[confirmPlantingDeduction] 订单 ${command.orderId} 扣款确认完成`); return true; } /** * 解冻认种资金(认种失败时回滚) * 幂等设计:如果订单已解冻或未冻结,直接返回成功 + * + * 使用数据库事务 + 乐观锁确保: + * 1. 钱包解冻和流水记录的一致性 + * 2. 并发安全(通过 version 字段) */ async unfreezeForPlanting(command: UnfreezeForPlantingCommand): Promise { - // 查找相关流水 - const existingEntries = await this.ledgerRepo.findByRefOrderId(command.orderId); + const MAX_RETRIES = 3; + let retries = 0; - // 幂等性检查:是否已经解冻 - const alreadyUnfrozen = existingEntries.some( - (entry) => entry.entryType === LedgerEntryType.PLANT_UNFREEZE, - ); - if (alreadyUnfrozen) { - this.logger.warn( - `Order ${command.orderId} already unfrozen, returning success (idempotent)`, - ); - return true; - } - - // 检查是否已经扣款(扣款后不能解冻) - const alreadyDeducted = existingEntries.some( - (entry) => entry.entryType === LedgerEntryType.PLANT_PAYMENT, - ); - if (alreadyDeducted) { - this.logger.warn( - `Order ${command.orderId} already deducted, cannot unfreeze`, - ); - throw new BadRequestException(`订单 ${command.orderId} 已扣款,无法解冻`); - } - - // 查找冻结记录 - const freezeEntry = existingEntries.find( - (entry) => entry.entryType === LedgerEntryType.PLANT_FREEZE, - ); - if (!freezeEntry) { - // 没有冻结记录,可能从未冻结,直接返回成功 - this.logger.warn( - `Order ${command.orderId} has no freeze record, returning success`, - ); - return true; - } - - // 获取冻结金额 - const frozenAmount = Money.USDT(Math.abs(freezeEntry.amount.value)); - - // 优先按 accountSequence 查找,如果未找到则按 userId 查找 - let wallet = await this.walletRepo.findByAccountSequence(command.userId); - if (!wallet) { - // 尝试将 userId 转换为 BigInt(如果不是 accountSequence 格式) - const isAccountSequence = command.userId.startsWith('D'); - if (!isAccountSequence) { - wallet = await this.walletRepo.findByUserId(BigInt(command.userId)); + while (retries < MAX_RETRIES) { + try { + return await this.executeUnfreezeForPlanting(command); + } catch (error) { + if (this.isOptimisticLockError(error)) { + retries++; + this.logger.warn(`[unfreezeForPlanting] Optimistic lock conflict for ${command.orderId}, retry ${retries}/${MAX_RETRIES}`); + if (retries >= MAX_RETRIES) { + this.logger.error(`[unfreezeForPlanting] Max retries exceeded for ${command.orderId}`); + throw error; + } + await this.sleep(50 * retries); + } else { + throw error; + } } } - if (!wallet) { - throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); + throw new Error('Unexpected: exited retry loop without result'); + } + + /** + * Execute unfreeze logic within a transaction + */ + private async executeUnfreezeForPlanting(command: UnfreezeForPlantingCommand): Promise { + const Decimal = (await import('decimal.js')).default; + let walletUserId: bigint | null = null; + + await this.prisma.$transaction(async (tx) => { + // 幂等性检查:是否已经解冻 + const existingUnfreeze = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_UNFREEZE, + }, + }); + + if (existingUnfreeze) { + this.logger.warn(`Order ${command.orderId} already unfrozen, returning success (idempotent)`); + return; // 幂等返回 + } + + // 检查是否已经扣款(扣款后不能解冻) + const existingPayment = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_PAYMENT, + }, + }); + + if (existingPayment) { + this.logger.warn(`Order ${command.orderId} already deducted, cannot unfreeze`); + throw new BadRequestException(`订单 ${command.orderId} 已扣款,无法解冻`); + } + + // 查找冻结记录 + const freezeEntry = await tx.ledgerEntry.findFirst({ + where: { + refOrderId: command.orderId, + entryType: LedgerEntryType.PLANT_FREEZE, + }, + }); + + if (!freezeEntry) { + // 没有冻结记录,可能从未冻结,直接返回成功 + this.logger.warn(`Order ${command.orderId} has no freeze record, returning success`); + return; // 幂等返回 + } + + // 获取冻结金额 + const frozenAmount = new Decimal(freezeEntry.amount.toString()).abs(); + + // 查找钱包 + const isAccountSequence = command.userId.startsWith('D'); + const walletRecord = isAccountSequence + ? await tx.walletAccount.findUnique({ where: { accountSequence: command.userId } }) + : await tx.walletAccount.findFirst({ where: { userId: BigInt(command.userId) } }); + + if (!walletRecord) { + throw new WalletNotFoundError(`userId/accountSequence: ${command.userId}`); + } + + walletUserId = walletRecord.userId; + const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString()); + const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString()); + const currentVersion = walletRecord.version; + + // 计算新余额(解冻:冻结减少,可用增加) + const newAvailable = currentAvailable.plus(frozenAmount); + const newFrozen = currentFrozen.minus(frozenAmount); + + // 使用乐观锁更新钱包 + const updateResult = await tx.walletAccount.updateMany({ + where: { + id: walletRecord.id, + version: currentVersion, + }, + data: { + usdtAvailable: newAvailable, + usdtFrozen: newFrozen, + version: currentVersion + 1, + updatedAt: new Date(), + }, + }); + + if (updateResult.count === 0) { + throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`); + } + + // 记录解冻流水 + await tx.ledgerEntry.create({ + data: { + accountSequence: walletRecord.accountSequence, + userId: walletRecord.userId, + entryType: LedgerEntryType.PLANT_UNFREEZE, + amount: frozenAmount, // Positive: 可用余额增加 + assetType: 'USDT', + balanceAfter: newAvailable, + refOrderId: command.orderId, + memo: 'Plant unfreeze (rollback)', + }, + }); + + this.logger.log(`[unfreezeForPlanting] 成功解冻 ${frozenAmount.toString()} USDT (version: ${currentVersion} -> ${currentVersion + 1})`); + }); + + // 事务成功后,使缓存失效 + if (walletUserId) { + await this.walletCacheService.invalidateWallet(walletUserId); } - // 解冻资金 - wallet.unfreeze(frozenAmount); - await this.walletRepo.save(wallet); - - // 记录解冻流水 - const ledgerEntry = LedgerEntry.create({ - accountSequence: wallet.accountSequence, - userId: wallet.userId, - entryType: LedgerEntryType.PLANT_UNFREEZE, - amount: frozenAmount, // Positive: 可用余额增加 - balanceAfter: wallet.balances.usdt.available, - refOrderId: command.orderId, - memo: 'Plant unfreeze (rollback)', - }); - await this.ledgerRepo.save(ledgerEntry); - - await this.walletCacheService.invalidateWallet(wallet.userId.value); - - this.logger.log(`Unfrozen ${frozenAmount.value} USDT for order ${command.orderId}`); + this.logger.log(`[unfreezeForPlanting] 订单 ${command.orderId} 解冻完成`); return true; } diff --git a/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts b/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts index 202ead2e..beb87b03 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/planting-event-consumer.service.ts @@ -138,6 +138,9 @@ export class PlantingEventConsumerService implements OnModuleInit, OnModuleDestr } } catch (error) { this.logger.error(`[ERROR] Error processing planting event from ${topic}`, error); + // Re-throw to trigger Kafka retry mechanism + // This ensures messages are not marked as consumed until successfully processed + throw error; } }, });