feat(wallet/mpc): 增强提现和充值流程可靠性

## 主要改进

### MPC 签名系统 (mpc-system)
- 添加签名缓存机制,避免重复签名请求
- 修复 yParity 恢复逻辑,确保签名格式正确
- 优化签名完成报告流程

### 区块链服务 (blockchain-service)
- EIP-1559 降级为 Legacy 交易(KAVA 测试网兼容)
- 修复 gas 估算逻辑

### 钱包服务 (wallet-service)
- 添加乐观锁机制 (version 字段) 防止并发修改
- 提现确认流程添加事务保护 + 乐观锁
- 提现失败时正确解冻 amount + fee
- 充值流程添加事务保护 + 乐观锁
- Kafka consumer 添加错误重抛,触发重试机制

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-15 19:47:20 -08:00
parent bbd6f2ee38
commit 5c931ccdd8
11 changed files with 1133 additions and 638 deletions

View File

@ -0,0 +1,59 @@
package memory
import (
"sync"
"time"
"github.com/google/uuid"
)
const (
// SignatureCacheTTL is the time-to-live for cached signatures (24 hours)
SignatureCacheTTL = 24 * time.Hour
)
// signatureCacheEntry represents a cached signature with expiration
type signatureCacheEntry struct {
Signature []byte
ExpiresAt time.Time
}
// SignatureCache provides in-memory caching for signatures
type SignatureCache struct {
cache sync.Map
}
// Global signature cache instance
var globalSignatureCache = &SignatureCache{}
// GetSignatureCache returns the global signature cache instance
func GetSignatureCache() *SignatureCache {
return globalSignatureCache
}
// Set stores a signature in the cache with 24h TTL
func (c *SignatureCache) Set(sessionID uuid.UUID, signature []byte) {
entry := signatureCacheEntry{
Signature: signature,
ExpiresAt: time.Now().Add(SignatureCacheTTL),
}
c.cache.Store(sessionID.String(), entry)
}
// Get retrieves a signature from the cache
func (c *SignatureCache) Get(sessionID uuid.UUID) ([]byte, bool) {
value, ok := c.cache.Load(sessionID.String())
if !ok {
return nil, false
}
entry := value.(signatureCacheEntry)
// Check if expired
if time.Now().After(entry.ExpiresAt) {
c.cache.Delete(sessionID.String())
return nil, false
}
return entry.Signature, true
}

View File

@ -4,7 +4,9 @@ import (
"context"
"github.com/google/uuid"
"github.com/rwadurian/mpc-system/services/session-coordinator/adapters/output/memory"
"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/input"
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/entities"
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/repositories"
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/value_objects"
)
@ -45,7 +47,13 @@ func (uc *GetSessionStatusUseCase) Execute(
}
}
// 3. Build response
// 3. For completed sign sessions, get signature from cache
var signature []byte
if session.SessionType == entities.SessionTypeSign && session.Status.String() == "completed" {
signature, _ = memory.GetSignatureCache().Get(sessionID)
}
// 4. Build response
// has_delegate is only meaningful for keygen sessions
hasDelegate := session.DelegatePartyID != "" && string(session.SessionType) == "keygen"
return &input.SessionStatusOutput{
@ -56,6 +64,7 @@ func (uc *GetSessionStatusUseCase) Execute(
ThresholdN: session.Threshold.N(),
Participants: participants,
PublicKey: session.PublicKey,
Signature: signature,
HasDelegate: hasDelegate,
DelegatePartyID: session.DelegatePartyID,
}, nil

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/rwadurian/mpc-system/pkg/logger"
"github.com/rwadurian/mpc-system/services/session-coordinator/adapters/output/memory"
"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/input"
"github.com/rwadurian/mpc-system/services/session-coordinator/application/ports/output"
"github.com/rwadurian/mpc-system/services/session-coordinator/domain/entities"
@ -169,6 +170,15 @@ func (uc *ReportCompletionUseCase) executeWithRetry(
return nil, err
}
// For sign sessions, cache the signature for HTTP API retrieval
// Note: server-party sends signature in PublicKey field (same field used for keygen public key)
if session.SessionType == entities.SessionTypeSign && len(inputData.PublicKey) > 0 {
memory.GetSignatureCache().Set(session.ID.UUID(), inputData.PublicKey)
logger.Info("cached signature for sign session",
zap.String("session_id", session.ID.String()),
zap.Int("signature_len", len(inputData.PublicKey)))
}
// Publish session completed event
completedEvent := output.SessionCompletedEvent{
SessionID: session.ID.String(),

View File

@ -7,6 +7,7 @@ import {
formatUnits,
Transaction,
Signature,
recoverAddress,
} from 'ethers';
import { ChainConfigService } from './chain-config.service';
import { ChainType } from '@/domain/value-objects';
@ -191,16 +192,42 @@ export class Erc20TransferService {
data: transferData,
});
const tx = Transaction.from({
type: 2, // EIP-1559
chainId: config.chainId,
nonce,
to: config.usdtContract,
data: transferData,
gasLimit: gasEstimate * BigInt(120) / BigInt(100), // 增加 20% buffer
maxFeePerGas: feeData.maxFeePerGas,
maxPriorityFeePerGas: feeData.maxPriorityFeePerGas,
});
const gasLimit = gasEstimate * BigInt(120) / BigInt(100); // 增加 20% buffer
// 检测链是否支持 EIP-1559
// 如果 maxFeePerGas 为 null 或 0则使用 legacy 交易
const supportsEip1559 = feeData.maxFeePerGas && feeData.maxFeePerGas > BigInt(0);
this.logger.log(`[TRANSFER] Chain supports EIP-1559: ${supportsEip1559}`);
this.logger.log(`[TRANSFER] Fee data: gasPrice=${feeData.gasPrice}, maxFeePerGas=${feeData.maxFeePerGas}`);
let tx: Transaction;
if (supportsEip1559) {
// EIP-1559 交易 (type 2)
tx = Transaction.from({
type: 2,
chainId: config.chainId,
nonce,
to: config.usdtContract,
data: transferData,
gasLimit,
maxFeePerGas: feeData.maxFeePerGas,
maxPriorityFeePerGas: feeData.maxPriorityFeePerGas,
});
this.logger.log(`[TRANSFER] Built EIP-1559 transaction`);
} else {
// Legacy 交易 (type 0)
const gasPrice = feeData.gasPrice || BigInt(1000000000); // 默认 1 gwei
tx = Transaction.from({
type: 0,
chainId: config.chainId,
nonce,
to: config.usdtContract,
data: transferData,
gasLimit,
gasPrice,
});
this.logger.log(`[TRANSFER] Built legacy transaction with gasPrice=${gasPrice}`);
}
this.logger.log(`[TRANSFER] Transaction built: nonce=${nonce}, gasLimit=${tx.gasLimit}`);
@ -213,8 +240,43 @@ export class Erc20TransferService {
const signatureHex = await this.mpcSigningClient.signMessage(unsignedTxHash);
this.logger.log(`[TRANSFER] MPC signature obtained: ${signatureHex.slice(0, 20)}...`);
// 解析签名
const signature = Signature.from(signatureHex);
// 解析签名 - MPC 返回 64 字节 (r+s),需要转换为 ethers.js 格式
// 确保有 0x 前缀
const normalizedSig = signatureHex.startsWith('0x') ? signatureHex : `0x${signatureHex}`;
this.logger.log(`[TRANSFER] Normalized signature: ${normalizedSig.slice(0, 22)}...`);
// MPC 签名是 64 字节 (r: 32 bytes + s: 32 bytes),需要添加 v (recovery id)
// 对于 EIP-1559 交易v = 0 或 1 (yParity)
// 我们需要尝试两个值来恢复正确的地址
const sigBytes = normalizedSig.slice(2); // 去掉 0x
const r = `0x${sigBytes.slice(0, 64)}`;
const s = `0x${sigBytes.slice(64, 128)}`;
this.logger.log(`[TRANSFER] Signature r: ${r.slice(0, 20)}...`);
this.logger.log(`[TRANSFER] Signature s: ${s.slice(0, 20)}...`);
// 尝试 yParity 0 和 1 来找到正确的 recovery id
let signature: Signature | null = null;
for (const yParity of [0, 1] as const) {
try {
const testSig = Signature.from({ r, s, yParity });
// 使用 recoverAddress 验证签名
const recoveredAddress = recoverAddress(unsignedTxHash, testSig);
this.logger.log(`[TRANSFER] Recovered address with yParity=${yParity}: ${recoveredAddress}`);
if (recoveredAddress.toLowerCase() === this.hotWalletAddress.toLowerCase()) {
this.logger.log(`[TRANSFER] Found correct yParity: ${yParity}`);
signature = testSig;
break;
}
} catch (e) {
this.logger.debug(`[TRANSFER] yParity=${yParity} failed: ${e}`);
}
}
if (!signature) {
throw new Error('Failed to recover correct signature - address mismatch');
}
// 创建已签名交易
const signedTx = tx.clone();

View File

@ -58,6 +58,9 @@ model WalletAccount {
// 状态
status String @default("ACTIVE") @map("status") @db.VarChar(20)
// 乐观锁版本号
version Int @default(0) @map("version")
createdAt DateTime @default(now()) @map("created_at")
updatedAt DateTime @updatedAt @map("updated_at")

View File

@ -10,23 +10,36 @@ import {
IWalletAccountRepository,
WALLET_ACCOUNT_REPOSITORY,
} from '@/domain/repositories';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { WithdrawalOrder, WalletAccount } from '@/domain/aggregates';
import { WithdrawalStatus, Money, UserId } from '@/domain/value-objects';
import { OptimisticLockError } from '@/shared/exceptions/domain.exception';
import Decimal from 'decimal.js';
/**
* Withdrawal Status Handler
*
* Handles withdrawal status events from blockchain-service.
* Updates withdrawal order status and handles fund refunds on failure.
*
* IMPORTANT:
* - All operations use database transactions for atomicity.
* - Wallet balance updates use optimistic locking to prevent concurrent modification issues.
*/
@Injectable()
export class WithdrawalStatusHandler implements OnModuleInit {
private readonly logger = new Logger(WithdrawalStatusHandler.name);
// Max retry count for optimistic lock conflicts
private readonly MAX_RETRIES = 3;
constructor(
private readonly withdrawalEventConsumer: WithdrawalEventConsumerService,
@Inject(WITHDRAWAL_ORDER_REPOSITORY)
private readonly withdrawalRepo: IWithdrawalOrderRepository,
@Inject(WALLET_ACCOUNT_REPOSITORY)
private readonly walletRepo: IWalletAccountRepository,
private readonly prisma: PrismaService,
) {}
onModuleInit() {
@ -41,7 +54,9 @@ export class WithdrawalStatusHandler implements OnModuleInit {
/**
* Handle withdrawal confirmed event
* Update order status to CONFIRMED and store txHash
* Update order status to CONFIRMED, store txHash, and deduct frozen balance
*
* Uses database transaction + optimistic locking to ensure atomicity and prevent race conditions.
*/
private async handleWithdrawalConfirmed(
payload: WithdrawalConfirmedPayload,
@ -50,26 +65,129 @@ export class WithdrawalStatusHandler implements OnModuleInit {
this.logger.log(`[CONFIRMED] orderNo: ${payload.orderNo}`);
this.logger.log(`[CONFIRMED] txHash: ${payload.txHash}`);
let retries = 0;
while (retries < this.MAX_RETRIES) {
try {
await this.executeWithdrawalConfirmed(payload);
return; // Success, exit
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[CONFIRMED] Optimistic lock conflict for ${payload.orderNo}, retry ${retries}/${this.MAX_RETRIES}`);
if (retries >= this.MAX_RETRIES) {
this.logger.error(`[CONFIRMED] Max retries exceeded for ${payload.orderNo}`);
throw error;
}
// Brief delay before retry
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
}
/**
* Execute the withdrawal confirmed logic within a transaction
*/
private async executeWithdrawalConfirmed(
payload: WithdrawalConfirmedPayload,
): Promise<void> {
try {
// Find the withdrawal order
const order = await this.withdrawalRepo.findByOrderNo(payload.orderNo);
if (!order) {
this.logger.error(`[CONFIRMED] Order not found: ${payload.orderNo}`);
return;
}
// Use transaction to ensure atomicity
await this.prisma.$transaction(async (tx) => {
// Find the withdrawal order
const orderRecord = await tx.withdrawalOrder.findUnique({
where: { orderNo: payload.orderNo },
});
// Update order status: FROZEN -> BROADCASTED -> CONFIRMED
// If still FROZEN, first mark as broadcasted with txHash
if (order.isFrozen) {
order.markAsBroadcasted(payload.txHash);
}
if (!orderRecord) {
this.logger.error(`[CONFIRMED] Order not found: ${payload.orderNo}`);
return;
}
// Then mark as confirmed
if (order.isBroadcasted) {
order.markAsConfirmed();
}
// Check if already confirmed (idempotency)
if (orderRecord.status === WithdrawalStatus.CONFIRMED) {
this.logger.log(`[CONFIRMED] Order ${payload.orderNo} already confirmed, skipping`);
return;
}
await this.withdrawalRepo.save(order);
// Determine new status based on current status
let newStatus = orderRecord.status;
let txHash = orderRecord.txHash;
let broadcastedAt = orderRecord.broadcastedAt;
let confirmedAt = orderRecord.confirmedAt;
// FROZEN -> BROADCASTED -> CONFIRMED
if (orderRecord.status === WithdrawalStatus.FROZEN) {
newStatus = WithdrawalStatus.BROADCASTED;
txHash = payload.txHash;
broadcastedAt = new Date();
}
if (newStatus === WithdrawalStatus.BROADCASTED || orderRecord.status === WithdrawalStatus.BROADCASTED) {
newStatus = WithdrawalStatus.CONFIRMED;
confirmedAt = new Date();
}
// Update order status
await tx.withdrawalOrder.update({
where: { id: orderRecord.id },
data: {
status: newStatus,
txHash,
broadcastedAt,
confirmedAt,
},
});
// Find wallet and deduct frozen balance with optimistic lock
let walletRecord = await tx.walletAccount.findUnique({
where: { accountSequence: orderRecord.accountSequence },
});
if (!walletRecord) {
walletRecord = await tx.walletAccount.findUnique({
where: { userId: orderRecord.userId },
});
}
if (walletRecord) {
// Deduct the total frozen amount (amount + fee)
const totalAmount = new Decimal(orderRecord.amount.toString()).add(new Decimal(orderRecord.fee.toString()));
const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString());
if (currentFrozen.lessThan(totalAmount)) {
this.logger.error(`[CONFIRMED] Insufficient frozen balance: have ${currentFrozen}, need ${totalAmount}`);
throw new Error(`Insufficient frozen balance for withdrawal ${payload.orderNo}`);
}
const newFrozen = currentFrozen.minus(totalAmount);
const currentVersion = walletRecord.version;
// Optimistic lock: update only if version matches
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion, // Optimistic lock condition
},
data: {
usdtFrozen: newFrozen,
version: currentVersion + 1, // Increment version
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
// Version mismatch - another transaction modified the record
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
this.logger.log(`[CONFIRMED] Deducted ${totalAmount.toString()} USDT from frozen balance for ${orderRecord.accountSequence} (version: ${currentVersion} -> ${currentVersion + 1})`);
} else {
this.logger.error(`[CONFIRMED] Wallet not found for accountSequence: ${orderRecord.accountSequence}, userId: ${orderRecord.userId}`);
}
});
this.logger.log(`[CONFIRMED] Order ${payload.orderNo} confirmed successfully`);
} catch (error) {
@ -80,7 +198,9 @@ export class WithdrawalStatusHandler implements OnModuleInit {
/**
* Handle withdrawal failed event
* Update order status to FAILED and refund frozen funds
* Update order status to FAILED and refund frozen funds (amount + fee)
*
* Uses database transaction + optimistic locking to ensure atomicity and prevent race conditions.
*/
private async handleWithdrawalFailed(
payload: WithdrawalFailedPayload,
@ -89,35 +209,126 @@ export class WithdrawalStatusHandler implements OnModuleInit {
this.logger.log(`[FAILED] orderNo: ${payload.orderNo}`);
this.logger.log(`[FAILED] error: ${payload.error}`);
try {
// Find the withdrawal order
const order = await this.withdrawalRepo.findByOrderNo(payload.orderNo);
if (!order) {
this.logger.error(`[FAILED] Order not found: ${payload.orderNo}`);
return;
}
// Mark order as failed
order.markAsFailed(payload.error);
await this.withdrawalRepo.save(order);
// Refund frozen funds back to available balance if needed
if (order.needsUnfreeze()) {
// 优先使用 accountSequence 查找钱包(更可靠,避免 userId 变化导致扣错人)
let wallet = await this.walletRepo.findByAccountSequence(order.accountSequence);
if (!wallet) {
// 兜底:使用 userId 查找
wallet = await this.walletRepo.findByUserId(order.userId.value);
}
if (wallet) {
// Unfreeze the amount (add back to available balance)
wallet.unfreeze(order.amount);
await this.walletRepo.save(wallet);
this.logger.log(`[FAILED] Refunded ${order.amount.value} USDT to account ${order.accountSequence}`);
let retries = 0;
while (retries < this.MAX_RETRIES) {
try {
await this.executeWithdrawalFailed(payload);
return; // Success, exit
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[FAILED] Optimistic lock conflict for ${payload.orderNo}, retry ${retries}/${this.MAX_RETRIES}`);
if (retries >= this.MAX_RETRIES) {
this.logger.error(`[FAILED] Max retries exceeded for ${payload.orderNo}`);
throw error;
}
// Brief delay before retry
await this.sleep(50 * retries);
} else {
this.logger.error(`[FAILED] Wallet not found for accountSequence: ${order.accountSequence}, userId: ${order.userId}`);
throw error;
}
}
}
}
/**
* Execute the withdrawal failed logic within a transaction
*/
private async executeWithdrawalFailed(
payload: WithdrawalFailedPayload,
): Promise<void> {
try {
// Use transaction to ensure atomicity
await this.prisma.$transaction(async (tx) => {
// Find the withdrawal order
const orderRecord = await tx.withdrawalOrder.findUnique({
where: { orderNo: payload.orderNo },
});
if (!orderRecord) {
this.logger.error(`[FAILED] Order not found: ${payload.orderNo}`);
return;
}
// Check if already in terminal state (idempotency)
if (orderRecord.status === WithdrawalStatus.CONFIRMED ||
orderRecord.status === WithdrawalStatus.FAILED ||
orderRecord.status === WithdrawalStatus.CANCELLED) {
this.logger.log(`[FAILED] Order ${payload.orderNo} already in terminal state: ${orderRecord.status}, skipping`);
return;
}
// Check if needs unfreeze (was frozen)
const needsUnfreeze = orderRecord.frozenAt !== null;
// Update order status to FAILED
await tx.withdrawalOrder.update({
where: { id: orderRecord.id },
data: {
status: WithdrawalStatus.FAILED,
errorMessage: payload.error,
},
});
// Refund frozen funds back to available balance if needed
if (needsUnfreeze) {
let walletRecord = await tx.walletAccount.findUnique({
where: { accountSequence: orderRecord.accountSequence },
});
if (!walletRecord) {
walletRecord = await tx.walletAccount.findUnique({
where: { userId: orderRecord.userId },
});
}
if (walletRecord) {
// Unfreeze the total amount (amount + fee)
const totalAmount = new Decimal(orderRecord.amount.toString()).add(new Decimal(orderRecord.fee.toString()));
const currentFrozen = new Decimal(walletRecord.usdtFrozen.toString());
const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString());
const currentVersion = walletRecord.version;
// Validate frozen balance
let newFrozen: Decimal;
let newAvailable: Decimal;
if (currentFrozen.lessThan(totalAmount)) {
this.logger.warn(`[FAILED] Frozen balance (${currentFrozen}) less than refund amount (${totalAmount}), refunding what's available`);
// Refund whatever is frozen (shouldn't happen in normal flow)
const refundAmount = Decimal.min(currentFrozen, totalAmount);
newFrozen = currentFrozen.minus(refundAmount);
newAvailable = currentAvailable.add(refundAmount);
} else {
newFrozen = currentFrozen.minus(totalAmount);
newAvailable = currentAvailable.add(totalAmount);
}
// Optimistic lock: update only if version matches
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion, // Optimistic lock condition
},
data: {
usdtFrozen: newFrozen,
usdtAvailable: newAvailable,
version: currentVersion + 1, // Increment version
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
// Version mismatch - another transaction modified the record
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
this.logger.log(`[FAILED] Refunded ${totalAmount.toString()} USDT (amount + fee) to account ${orderRecord.accountSequence} (version: ${currentVersion} -> ${currentVersion + 1})`);
} else {
this.logger.error(`[FAILED] Wallet not found for accountSequence: ${orderRecord.accountSequence}, userId: ${orderRecord.userId}`);
}
}
});
this.logger.log(`[FAILED] Order ${payload.orderNo} marked as failed`);
} catch (error) {
@ -125,4 +336,18 @@ export class WithdrawalStatusHandler implements OnModuleInit {
throw error;
}
}
/**
* Check if error is an optimistic lock error
*/
private isOptimisticLockError(error: unknown): boolean {
return error instanceof OptimisticLockError;
}
/**
* Sleep for specified milliseconds
*/
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}

View File

@ -19,7 +19,7 @@ import {
FreezeForPlantingCommand, ConfirmPlantingDeductionCommand, UnfreezeForPlantingCommand,
} from '@/application/commands';
import { GetMyWalletQuery, GetMyLedgerQuery } from '@/application/queries';
import { DuplicateTransactionError, WalletNotFoundError } from '@/shared/exceptions/domain.exception';
import { DuplicateTransactionError, WalletNotFoundError, OptimisticLockError } from '@/shared/exceptions/domain.exception';
import { WalletCacheService } from '@/infrastructure/redis';
import { EventPublisherService } from '@/infrastructure/kafka';
import { WithdrawalRequestedEvent } from '@/domain/events';
@ -93,53 +93,165 @@ export class WalletApplicationService {
// =============== Commands ===============
/**
* Handle deposit with transaction protection and optimistic locking
*
* Uses database transaction to ensure atomicity of:
* 1. Deposit order creation
* 2. Wallet balance update (with optimistic lock)
* 3. Ledger entry creation
*/
async handleDeposit(command: HandleDepositCommand): Promise<void> {
// Check for duplicate transaction
const exists = await this.depositRepo.existsByTxHash(command.txHash);
if (exists) {
throw new DuplicateTransactionError(command.txHash);
const MAX_RETRIES = 3;
let retries = 0;
while (retries < MAX_RETRIES) {
try {
await this.executeHandleDeposit(command);
return; // Success, exit
} catch (error) {
if (this.isOptimisticLockError(error)) {
retries++;
this.logger.warn(`[DEPOSIT] Optimistic lock conflict for ${command.txHash}, retry ${retries}/${MAX_RETRIES}`);
if (retries >= MAX_RETRIES) {
this.logger.error(`[DEPOSIT] Max retries exceeded for ${command.txHash}`);
throw error;
}
// Brief delay before retry
await this.sleep(50 * retries);
} else {
throw error;
}
}
}
}
/**
* Execute deposit logic within a transaction
*/
private async executeHandleDeposit(command: HandleDepositCommand): Promise<void> {
const accountSequence = command.accountSequence;
const userId = BigInt(command.userId);
const amount = Money.USDT(command.amount);
const userIdBigint = BigInt(command.userId);
const amountDecimal = new (await import('decimal.js')).default(command.amount);
// Get or create wallet by accountSequence (跨服务关联标识)
const wallet = await this.walletRepo.getOrCreate(accountSequence, userId);
await this.prisma.$transaction(async (tx) => {
// Check for duplicate transaction within transaction
const existingDeposit = await tx.depositOrder.findUnique({
where: { txHash: command.txHash },
});
if (existingDeposit) {
throw new DuplicateTransactionError(command.txHash);
}
// Create deposit order
const depositOrder = DepositOrder.create({
accountSequence,
userId: UserId.create(userId),
chainType: command.chainType,
amount,
txHash: command.txHash,
// Get or create wallet
let walletRecord = await tx.walletAccount.findUnique({
where: { accountSequence },
});
if (!walletRecord) {
walletRecord = await tx.walletAccount.create({
data: {
accountSequence,
userId: userIdBigint,
usdtAvailable: 0,
usdtFrozen: 0,
dstAvailable: 0,
dstFrozen: 0,
bnbAvailable: 0,
bnbFrozen: 0,
ogAvailable: 0,
ogFrozen: 0,
rwadAvailable: 0,
rwadFrozen: 0,
hashpower: 0,
pendingUsdt: 0,
pendingHashpower: 0,
settleableUsdt: 0,
settleableHashpower: 0,
settledTotalUsdt: 0,
settledTotalHashpower: 0,
expiredTotalUsdt: 0,
expiredTotalHashpower: 0,
status: 'ACTIVE',
version: 0,
},
});
}
// Create deposit order
await tx.depositOrder.create({
data: {
accountSequence,
userId: userIdBigint,
chainType: command.chainType,
amount: amountDecimal,
txHash: command.txHash,
status: 'CONFIRMED',
confirmedAt: new Date(),
},
});
// Update wallet balance with optimistic lock
const Decimal = (await import('decimal.js')).default;
const currentAvailable = new Decimal(walletRecord.usdtAvailable.toString());
const newAvailable = currentAvailable.add(amountDecimal);
const currentVersion = walletRecord.version;
const updateResult = await tx.walletAccount.updateMany({
where: {
id: walletRecord.id,
version: currentVersion, // Optimistic lock condition
},
data: {
usdtAvailable: newAvailable,
version: currentVersion + 1, // Increment version
updatedAt: new Date(),
},
});
if (updateResult.count === 0) {
// Version mismatch - another transaction modified the record
throw new OptimisticLockError(`Optimistic lock conflict for wallet ${walletRecord.id}`);
}
// Record ledger entry
const entryType = command.chainType === ChainType.KAVA
? LedgerEntryType.DEPOSIT_KAVA
: LedgerEntryType.DEPOSIT_BSC;
await tx.ledgerEntry.create({
data: {
accountSequence,
userId: userIdBigint,
entryType,
amount: amountDecimal,
assetType: 'USDT',
balanceAfter: newAvailable,
refTxHash: command.txHash,
memo: `Deposit from ${command.chainType}`,
},
});
this.logger.log(`[DEPOSIT] Credited ${amountDecimal.toString()} USDT to ${accountSequence} (version: ${currentVersion} -> ${currentVersion + 1})`);
});
depositOrder.confirm();
await this.depositRepo.save(depositOrder);
// Credit wallet
wallet.deposit(amount, command.chainType, command.txHash);
await this.walletRepo.save(wallet);
// Invalidate wallet cache after deposit (outside transaction)
await this.walletCacheService.invalidateWallet(userIdBigint);
}
// Record ledger entry (append-only, 可审计)
const entryType = command.chainType === ChainType.KAVA
? LedgerEntryType.DEPOSIT_KAVA
: LedgerEntryType.DEPOSIT_BSC;
/**
* Check if error is an optimistic lock error
*/
private isOptimisticLockError(error: unknown): boolean {
return error instanceof OptimisticLockError ||
(error instanceof Error && error.message.includes('Optimistic lock conflict'));
}
const ledgerEntry = LedgerEntry.create({
accountSequence,
userId: UserId.create(userId),
entryType,
amount,
balanceAfter: wallet.balances.usdt.available,
refTxHash: command.txHash,
memo: `Deposit from ${command.chainType}`,
});
await this.ledgerRepo.save(ledgerEntry);
// Invalidate wallet cache after deposit
await this.walletCacheService.invalidateWallet(userId);
/**
* Sleep for specified milliseconds
*/
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
async deductForPlanting(command: DeductForPlantingCommand): Promise<boolean> {

View File

@ -143,6 +143,9 @@ export class DepositEventConsumerService implements OnModuleInit, OnModuleDestro
}
} catch (error) {
this.logger.error(`[ERROR] Error processing deposit event from ${topic}`, error);
// Re-throw to trigger Kafka retry mechanism
// This ensures messages are not marked as consumed until successfully processed
throw error;
}
},
});

View File

@ -171,6 +171,9 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
}
} catch (error) {
this.logger.error(`[ERROR] Error processing withdrawal event from ${topic}`, error);
// Re-throw to trigger Kafka retry mechanism
// This ensures messages are not marked as consumed until successfully processed
throw error;
}
},
});

View File

@ -39,3 +39,10 @@ export class InvalidOperationError extends DomainError {
this.name = 'InvalidOperationError';
}
}
export class OptimisticLockError extends DomainError {
constructor(message: string) {
super(message);
this.name = 'OptimisticLockError';
}
}