import { Injectable, Logger, OnModuleInit, Inject } from '@nestjs/common'; import { WithdrawalEventConsumerService, WithdrawalConfirmedPayload, WithdrawalFailedPayload, } from '@/infrastructure/kafka/withdrawal-event-consumer.service'; import { IWithdrawalOrderRepository, WITHDRAWAL_ORDER_REPOSITORY, IWalletAccountRepository, WALLET_ACCOUNT_REPOSITORY, ILedgerEntryRepository, LEDGER_ENTRY_REPOSITORY, } from '@/domain/repositories'; import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service'; import { WithdrawalOrder, WalletAccount, LedgerEntry } from '@/domain/aggregates'; import { WithdrawalStatus, Money, UserId, LedgerEntryType } from '@/domain/value-objects'; import { OptimisticLockError } from '@/shared/exceptions/domain.exception'; import Decimal from 'decimal.js'; /** * Withdrawal Status Handler * * Handles withdrawal status events from blockchain-service. * Updates withdrawal order status and handles fund refunds on failure. * * IMPORTANT: * - All operations use database transactions for atomicity. * - Wallet balance updates use optimistic locking to prevent concurrent modification issues. */ @Injectable() export class WithdrawalStatusHandler implements OnModuleInit { private readonly logger = new Logger(WithdrawalStatusHandler.name); // Max retry count for optimistic lock conflicts private readonly MAX_RETRIES = 3; constructor( private readonly withdrawalEventConsumer: WithdrawalEventConsumerService, @Inject(WITHDRAWAL_ORDER_REPOSITORY) private readonly withdrawalRepo: IWithdrawalOrderRepository, @Inject(WALLET_ACCOUNT_REPOSITORY) private readonly walletRepo: IWalletAccountRepository, @Inject(LEDGER_ENTRY_REPOSITORY) private readonly ledgerRepo: ILedgerEntryRepository, private readonly prisma: PrismaService, ) {} onModuleInit() { this.withdrawalEventConsumer.onWithdrawalConfirmed( this.handleWithdrawalConfirmed.bind(this), ); this.withdrawalEventConsumer.onWithdrawalFailed( this.handleWithdrawalFailed.bind(this), ); this.logger.log(`[INIT] WithdrawalStatusHandler registered`); } /** * Handle withdrawal confirmed event * Update order status to CONFIRMED, store txHash, and deduct frozen balance * * Uses database transaction + optimistic locking to ensure atomicity and prevent race conditions. */ private async handleWithdrawalConfirmed( payload: WithdrawalConfirmedPayload, ): Promise { this.logger.log(`[CONFIRMED] Processing withdrawal confirmation`); this.logger.log(`[CONFIRMED] orderNo: ${payload.orderNo}`); this.logger.log(`[CONFIRMED] txHash: ${payload.txHash}`); let retries = 0; while (retries < this.MAX_RETRIES) { try { await this.executeWithdrawalConfirmed(payload); return; // Success, exit } catch (error) { if (this.isOptimisticLockError(error)) { retries++; this.logger.warn(`[CONFIRMED] Optimistic lock conflict for ${payload.orderNo}, retry ${retries}/${this.MAX_RETRIES}`); if (retries >= this.MAX_RETRIES) { this.logger.error(`[CONFIRMED] Max retries exceeded for ${payload.orderNo}`); throw error; } // Brief delay before retry await this.sleep(50 * retries); } else { throw error; } } } } /** * Execute the withdrawal confirmed logic within a transaction */ private async executeWithdrawalConfirmed( payload: WithdrawalConfirmedPayload, ): Promise { try { // Use transaction to ensure atomicity await this.prisma.$transaction(async (tx) => { // Find the withdrawal order const orderRecord = await tx.withdrawalOrder.findUnique({ where: { orderNo: payload.orderNo }, }); if (!orderRecord) { this.logger.error(`[CONFIRMED] Order not found: ${payload.orderNo}`); return; } // Check if already confirmed (idempotency) if (orderRecord.status === WithdrawalStatus.CONFIRMED) { this.logger.log(`[CONFIRMED] Order ${payload.orderNo} already confirmed, skipping`); return; } // Determine new status based on current status let newStatus = orderRecord.status; let txHash = orderRecord.txHash; let broadcastedAt = orderRecord.broadcastedAt; let confirmedAt = orderRecord.confirmedAt; // FROZEN -> BROADCASTED -> CONFIRMED if (orderRecord.status === WithdrawalStatus.FROZEN) { newStatus = WithdrawalStatus.BROADCASTED; txHash = payload.txHash; broadcastedAt = new Date(); } if (newStatus === WithdrawalStatus.BROADCASTED || orderRecord.status === WithdrawalStatus.BROADCASTED) { newStatus = WithdrawalStatus.CONFIRMED; confirmedAt = new Date(); } // Update order status await tx.withdrawalOrder.update({ where: { id: orderRecord.id }, data: { status: newStatus, txHash, broadcastedAt, confirmedAt, }, }); // Find wallet and deduct frozen balance with optimistic lock let walletRecord = await tx.walletAccount.findUnique({ where: { accountSequence: orderRecord.accountSequence }, }); if (!walletRecord) { walletRecord = await tx.walletAccount.findUnique({ where: { userId: orderRecord.userId }, }); } if (walletRecord) { // Deduct the total frozen amount (amount + fee) const totalAmount = new Decimal(orderRecord.amount.toString()).add(new Decimal(orderRecord.fee.toString())); const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString()); if (currentFrozen.lessThan(totalAmount)) { this.logger.error(`[CONFIRMED] Insufficient frozen balance: have ${currentFrozen}, need ${totalAmount}`); throw new Error(`Insufficient frozen balance for withdrawal ${payload.orderNo}`); } const newFrozen = currentFrozen.minus(totalAmount); const currentVersion = walletRecord.version; // Optimistic lock: update only if version matches const updateResult = await tx.walletAccount.updateMany({ where: { id: walletRecord.id, version: currentVersion, // Optimistic lock condition }, data: { usdtFrozen: newFrozen, version: currentVersion + 1, // Increment version updatedAt: new Date(), }, }); if (updateResult.count === 0) { // Version mismatch - another transaction modified the record throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`); } this.logger.log(`[CONFIRMED] Deducted ${totalAmount.toString()} USDT from frozen balance for ${orderRecord.accountSequence} (version: ${currentVersion} -> ${currentVersion + 1})`); // 记录流水:根据是否内部转账决定流水类型 if (orderRecord.isInternalTransfer && orderRecord.toAccountSequence) { // 内部转账:给转出方记录 TRANSFER_OUT await tx.ledgerEntry.create({ data: { accountSequence: orderRecord.accountSequence, userId: orderRecord.userId, entryType: LedgerEntryType.TRANSFER_OUT, amount: new Decimal(orderRecord.amount.toString()).negated(), assetType: 'USDT', balanceAfter: walletRecord.usdtAvailable, // 冻结余额扣除后可用余额不变 refOrderId: orderRecord.orderNo, refTxHash: payload.txHash, memo: `转账至 ${orderRecord.toAccountSequence}`, payloadJson: { toAccountSequence: orderRecord.toAccountSequence, toUserId: orderRecord.toUserId?.toString(), fee: orderRecord.fee.toString(), }, }, }); // 内部转账:给接收方记录 TRANSFER_IN 并增加余额 if (orderRecord.toUserId) { // 查找接收方钱包 let toWalletRecord = await tx.walletAccount.findUnique({ where: { accountSequence: orderRecord.toAccountSequence }, }); if (!toWalletRecord) { toWalletRecord = await tx.walletAccount.findUnique({ where: { userId: orderRecord.toUserId }, }); } // 如果接收方钱包不存在,自动创建(使用 upsert 避免并发问题) if (!toWalletRecord) { this.logger.log(`[CONFIRMED] Receiver wallet not found, auto-creating for: ${orderRecord.toAccountSequence}`); toWalletRecord = await tx.walletAccount.upsert({ where: { accountSequence: orderRecord.toAccountSequence }, create: { accountSequence: orderRecord.toAccountSequence, userId: orderRecord.toUserId, usdtAvailable: new Decimal(0), usdtFrozen: new Decimal(0), dstAvailable: new Decimal(0), dstFrozen: new Decimal(0), bnbAvailable: new Decimal(0), bnbFrozen: new Decimal(0), ogAvailable: new Decimal(0), ogFrozen: new Decimal(0), rwadAvailable: new Decimal(0), rwadFrozen: new Decimal(0), hashpower: new Decimal(0), pendingUsdt: new Decimal(0), pendingHashpower: new Decimal(0), settleableUsdt: new Decimal(0), settleableHashpower: new Decimal(0), settledTotalUsdt: new Decimal(0), settledTotalHashpower: new Decimal(0), expiredTotalUsdt: new Decimal(0), expiredTotalHashpower: new Decimal(0), status: 'ACTIVE', hasPlanted: false, version: 0, }, update: {}, // 如果已存在,不做任何更新 }); this.logger.log(`[CONFIRMED] Auto-created/found wallet for receiver: ${orderRecord.toAccountSequence} (id=${toWalletRecord.id})`); } const transferAmount = new Decimal(orderRecord.amount.toString()); const toCurrentAvailable = new Decimal(toWalletRecord.usdtAvailable.toString()); const toNewAvailable = toCurrentAvailable.add(transferAmount); const toCurrentVersion = toWalletRecord.version; // 更新接收方余额 const toUpdateResult = await tx.walletAccount.updateMany({ where: { id: toWalletRecord.id, version: toCurrentVersion, }, data: { usdtAvailable: toNewAvailable, version: toCurrentVersion + 1, updatedAt: new Date(), }, }); if (toUpdateResult.count === 0) { throw new OptimisticLockError(`Optimistic lock conflict for receiver wallet ${toWalletRecord.id}`); } // 给接收方记录 TRANSFER_IN 流水 await tx.ledgerEntry.create({ data: { accountSequence: orderRecord.toAccountSequence, userId: orderRecord.toUserId, entryType: LedgerEntryType.TRANSFER_IN, amount: transferAmount, assetType: 'USDT', balanceAfter: toNewAvailable, refOrderId: orderRecord.orderNo, refTxHash: payload.txHash, memo: `来自 ${orderRecord.accountSequence} 的转账`, payloadJson: { fromAccountSequence: orderRecord.accountSequence, fromUserId: orderRecord.userId.toString(), }, }, }); this.logger.log(`[CONFIRMED] Internal transfer: ${orderRecord.accountSequence} -> ${orderRecord.toAccountSequence}, amount: ${transferAmount.toString()}`); } } else { // 普通提现:记录 WITHDRAWAL await tx.ledgerEntry.create({ data: { accountSequence: orderRecord.accountSequence, userId: orderRecord.userId, entryType: LedgerEntryType.WITHDRAWAL, amount: new Decimal(orderRecord.amount.toString()).negated(), assetType: 'USDT', balanceAfter: walletRecord.usdtAvailable, refOrderId: orderRecord.orderNo, refTxHash: payload.txHash, memo: `提现至 ${orderRecord.toAddress}`, payloadJson: { toAddress: orderRecord.toAddress, chainType: orderRecord.chainType, fee: orderRecord.fee.toString(), }, }, }); } } else { this.logger.error(`[CONFIRMED] Wallet not found for accountSequence: ${orderRecord.accountSequence}, userId: ${orderRecord.userId}`); } }); this.logger.log(`[CONFIRMED] Order ${payload.orderNo} confirmed successfully`); } catch (error) { this.logger.error(`[CONFIRMED] Failed to process confirmation for ${payload.orderNo}`, error); throw error; } } /** * Handle withdrawal failed event * Update order status to FAILED and refund frozen funds (amount + fee) * * Uses database transaction + optimistic locking to ensure atomicity and prevent race conditions. */ private async handleWithdrawalFailed( payload: WithdrawalFailedPayload, ): Promise { this.logger.log(`[FAILED] Processing withdrawal failure`); this.logger.log(`[FAILED] orderNo: ${payload.orderNo}`); this.logger.log(`[FAILED] error: ${payload.error}`); let retries = 0; while (retries < this.MAX_RETRIES) { try { await this.executeWithdrawalFailed(payload); return; // Success, exit } catch (error) { if (this.isOptimisticLockError(error)) { retries++; this.logger.warn(`[FAILED] Optimistic lock conflict for ${payload.orderNo}, retry ${retries}/${this.MAX_RETRIES}`); if (retries >= this.MAX_RETRIES) { this.logger.error(`[FAILED] Max retries exceeded for ${payload.orderNo}`); throw error; } // Brief delay before retry await this.sleep(50 * retries); } else { throw error; } } } } /** * Execute the withdrawal failed logic within a transaction */ private async executeWithdrawalFailed( payload: WithdrawalFailedPayload, ): Promise { try { // Use transaction to ensure atomicity await this.prisma.$transaction(async (tx) => { // Find the withdrawal order const orderRecord = await tx.withdrawalOrder.findUnique({ where: { orderNo: payload.orderNo }, }); if (!orderRecord) { this.logger.error(`[FAILED] Order not found: ${payload.orderNo}`); return; } // Check if already in terminal state (idempotency) if (orderRecord.status === WithdrawalStatus.CONFIRMED || orderRecord.status === WithdrawalStatus.FAILED || orderRecord.status === WithdrawalStatus.CANCELLED) { this.logger.log(`[FAILED] Order ${payload.orderNo} already in terminal state: ${orderRecord.status}, skipping`); return; } // Check if needs unfreeze (was frozen) const needsUnfreeze = orderRecord.frozenAt !== null; // Update order status to FAILED await tx.withdrawalOrder.update({ where: { id: orderRecord.id }, data: { status: WithdrawalStatus.FAILED, errorMessage: payload.error, }, }); // Refund frozen funds back to available balance if needed if (needsUnfreeze) { let walletRecord = await tx.walletAccount.findUnique({ where: { accountSequence: orderRecord.accountSequence }, }); if (!walletRecord) { walletRecord = await tx.walletAccount.findUnique({ where: { userId: orderRecord.userId }, }); } if (walletRecord) { // Unfreeze the total amount (amount + fee) const totalAmount = new Decimal(orderRecord.amount.toString()).add(new Decimal(orderRecord.fee.toString())); const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString()); const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString()); const currentVersion = walletRecord.version; // Validate frozen balance let newFrozen: Decimal; let newAvailable: Decimal; if (currentFrozen.lessThan(totalAmount)) { this.logger.warn(`[FAILED] Frozen balance (${currentFrozen}) less than refund amount (${totalAmount}), refunding what's available`); // Refund whatever is frozen (shouldn't happen in normal flow) const refundAmount = Decimal.min(currentFrozen, totalAmount); newFrozen = currentFrozen.minus(refundAmount); newAvailable = currentAvailable.add(refundAmount); } else { newFrozen = currentFrozen.minus(totalAmount); newAvailable = currentAvailable.add(totalAmount); } // Optimistic lock: update only if version matches const updateResult = await tx.walletAccount.updateMany({ where: { id: walletRecord.id, version: currentVersion, // Optimistic lock condition }, data: { usdtFrozen: newFrozen, usdtAvailable: newAvailable, version: currentVersion + 1, // Increment version updatedAt: new Date(), }, }); if (updateResult.count === 0) { // Version mismatch - another transaction modified the record throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`); } this.logger.log(`[FAILED] Refunded ${totalAmount.toString()} USDT (amount + fee) to account ${orderRecord.accountSequence} (version: ${currentVersion} -> ${currentVersion + 1})`); } else { this.logger.error(`[FAILED] Wallet not found for accountSequence: ${orderRecord.accountSequence}, userId: ${orderRecord.userId}`); } } }); this.logger.log(`[FAILED] Order ${payload.orderNo} marked as failed`); } catch (error) { this.logger.error(`[FAILED] Failed to process failure for ${payload.orderNo}`, error); throw error; } } /** * Check if error is an optimistic lock error */ private isOptimisticLockError(error: unknown): boolean { return error instanceof OptimisticLockError; } /** * Sleep for specified milliseconds */ private sleep(ms: number): Promise { return new Promise(resolve => setTimeout(resolve, ms)); } }