diff --git a/backend/services/blockchain-service/src/application/application.module.ts b/backend/services/blockchain-service/src/application/application.module.ts index e46655cd..55c4d518 100644 --- a/backend/services/blockchain-service/src/application/application.module.ts +++ b/backend/services/blockchain-service/src/application/application.module.ts @@ -12,6 +12,7 @@ import { } from './services'; import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-handlers'; import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service'; +import { HotWalletBalanceScheduler } from './schedulers'; @Module({ imports: [InfrastructureModule, DomainModule], @@ -31,6 +32,9 @@ import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-co // 事件处理器 MpcKeygenCompletedHandler, WithdrawalRequestedHandler, + + // 定时任务 + HotWalletBalanceScheduler, ], exports: [ AddressDerivationService, diff --git a/backend/services/blockchain-service/src/application/schedulers/hot-wallet-balance.scheduler.ts b/backend/services/blockchain-service/src/application/schedulers/hot-wallet-balance.scheduler.ts new file mode 100644 index 00000000..2a222144 --- /dev/null +++ b/backend/services/blockchain-service/src/application/schedulers/hot-wallet-balance.scheduler.ts @@ -0,0 +1,94 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Cron } from '@nestjs/schedule'; +import { Erc20TransferService } from '@/domain/services/erc20-transfer.service'; +import { ChainTypeEnum } from '@/domain/enums'; +import Redis from 'ioredis'; + +/** + * 热钱包 dUSDT (绿积分) 余额定时更新调度器 + * + * 每 5 秒查询热钱包在各链上的 dUSDT 余额,并更新到 Redis 缓存。 + * wallet-service 在用户发起转账时读取此缓存,预检查热钱包余额是否足够。 + * + * 注意:使用 Redis DB 0(公共数据库),以便所有服务都能读取。 + * + * Redis Key 格式: hot_wallet:dusdt_balance:{chainType} + * Redis Value: 余额字符串(如 "10000.00") + * TTL: 30 秒(防止服务故障时缓存过期) + */ +@Injectable() +export class HotWalletBalanceScheduler implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(HotWalletBalanceScheduler.name); + + // Redis key 前缀 + private readonly REDIS_KEY_PREFIX = 'hot_wallet:dusdt_balance:'; + + // 缓存过期时间(秒) + private readonly CACHE_TTL_SECONDS = 30; + + // 支持的链类型 + private readonly SUPPORTED_CHAINS = [ChainTypeEnum.KAVA, ChainTypeEnum.BSC]; + + // 使用独立的 Redis 连接,连接到 DB 0(公共数据库) + private readonly sharedRedis: Redis; + + constructor( + private readonly configService: ConfigService, + private readonly transferService: Erc20TransferService, + ) { + // 创建连接到 DB 0 的 Redis 客户端(公共数据库,所有服务可读取) + this.sharedRedis = new Redis({ + host: this.configService.get('redis.host') || 'localhost', + port: this.configService.get('redis.port') || 6379, + password: this.configService.get('redis.password') || undefined, + db: 0, // 使用 DB 0 作为公共数据库 + }); + + this.sharedRedis.on('connect', () => { + this.logger.log('[REDIS] Connected to shared Redis DB 0 for hot wallet balance'); + }); + + this.sharedRedis.on('error', (err) => { + this.logger.error('[REDIS] Shared Redis connection error', err); + }); + } + + onModuleDestroy() { + this.sharedRedis.disconnect(); + } + + async onModuleInit() { + this.logger.log('[INIT] HotWalletBalanceScheduler initialized'); + // 启动时立即执行一次 + await this.updateHotWalletBalances(); + } + + /** + * 每 5 秒更新热钱包余额到 Redis + */ + @Cron('*/5 * * * * *') // 每 5 秒执行 + async updateHotWalletBalances(): Promise { + for (const chainType of this.SUPPORTED_CHAINS) { + try { + // 检查该链是否已配置 + if (!this.transferService.isConfigured(chainType)) { + this.logger.debug(`[SKIP] Chain ${chainType} not configured, skipping balance update`); + continue; + } + + // 查询链上余额 + const balance = await this.transferService.getHotWalletBalance(chainType); + + // 更新到 Redis DB 0 + const redisKey = `${this.REDIS_KEY_PREFIX}${chainType}`; + await this.sharedRedis.setex(redisKey, this.CACHE_TTL_SECONDS, balance); + + this.logger.debug(`[UPDATE] ${chainType} hot wallet dUSDT balance: ${balance}`); + } catch (error) { + this.logger.error(`[ERROR] Failed to update ${chainType} hot wallet balance`, error); + // 单链失败不影响其他链的更新 + } + } + } +} diff --git a/backend/services/blockchain-service/src/application/schedulers/index.ts b/backend/services/blockchain-service/src/application/schedulers/index.ts new file mode 100644 index 00000000..f6299b40 --- /dev/null +++ b/backend/services/blockchain-service/src/application/schedulers/index.ts @@ -0,0 +1 @@ +export * from './hot-wallet-balance.scheduler'; diff --git a/backend/services/wallet-service/src/application/event-handlers/withdrawal-status.handler.ts b/backend/services/wallet-service/src/application/event-handlers/withdrawal-status.handler.ts index 161906a7..ab5f529e 100644 --- a/backend/services/wallet-service/src/application/event-handlers/withdrawal-status.handler.ts +++ b/backend/services/wallet-service/src/application/event-handlers/withdrawal-status.handler.ts @@ -224,52 +224,84 @@ export class WithdrawalStatusHandler implements OnModuleInit { }); } - if (toWalletRecord) { - const transferAmount = new Decimal(orderRecord.amount.toString()); - const toCurrentAvailable = new Decimal(toWalletRecord.usdtAvailable.toString()); - const toNewAvailable = toCurrentAvailable.add(transferAmount); - const toCurrentVersion = toWalletRecord.version; - - // 更新接收方余额 - const toUpdateResult = await tx.walletAccount.updateMany({ - where: { - id: toWalletRecord.id, - version: toCurrentVersion, - }, - data: { - usdtAvailable: toNewAvailable, - version: toCurrentVersion + 1, - updatedAt: new Date(), - }, - }); - - if (toUpdateResult.count === 0) { - throw new OptimisticLockError(`Optimistic lock conflict for receiver wallet ${toWalletRecord.id}`); - } - - // 给接收方记录 TRANSFER_IN 流水 - await tx.ledgerEntry.create({ - data: { + // 如果接收方钱包不存在,自动创建(使用 upsert 避免并发问题) + if (!toWalletRecord) { + this.logger.log(`[CONFIRMED] Receiver wallet not found, auto-creating for: ${orderRecord.toAccountSequence}`); + toWalletRecord = await tx.walletAccount.upsert({ + where: { accountSequence: orderRecord.toAccountSequence }, + create: { accountSequence: orderRecord.toAccountSequence, userId: orderRecord.toUserId, - entryType: LedgerEntryType.TRANSFER_IN, - amount: transferAmount, - assetType: 'USDT', - balanceAfter: toNewAvailable, - refOrderId: orderRecord.orderNo, - refTxHash: payload.txHash, - memo: `来自 ${orderRecord.accountSequence} 的转账`, - payloadJson: { - fromAccountSequence: orderRecord.accountSequence, - fromUserId: orderRecord.userId.toString(), - }, + usdtAvailable: new Decimal(0), + usdtFrozen: new Decimal(0), + dstAvailable: new Decimal(0), + dstFrozen: new Decimal(0), + bnbAvailable: new Decimal(0), + bnbFrozen: new Decimal(0), + ogAvailable: new Decimal(0), + ogFrozen: new Decimal(0), + rwadAvailable: new Decimal(0), + rwadFrozen: new Decimal(0), + hashpower: new Decimal(0), + pendingUsdt: new Decimal(0), + pendingHashpower: new Decimal(0), + settleableUsdt: new Decimal(0), + settleableHashpower: new Decimal(0), + settledTotalUsdt: new Decimal(0), + settledTotalHashpower: new Decimal(0), + expiredTotalUsdt: new Decimal(0), + expiredTotalHashpower: new Decimal(0), + status: 'ACTIVE', + hasPlanted: false, + version: 0, }, + update: {}, // 如果已存在,不做任何更新 }); - - this.logger.log(`[CONFIRMED] Internal transfer: ${orderRecord.accountSequence} -> ${orderRecord.toAccountSequence}, amount: ${transferAmount.toString()}`); - } else { - this.logger.error(`[CONFIRMED] Receiver wallet not found: ${orderRecord.toAccountSequence}`); + this.logger.log(`[CONFIRMED] Auto-created/found wallet for receiver: ${orderRecord.toAccountSequence} (id=${toWalletRecord.id})`); } + + const transferAmount = new Decimal(orderRecord.amount.toString()); + const toCurrentAvailable = new Decimal(toWalletRecord.usdtAvailable.toString()); + const toNewAvailable = toCurrentAvailable.add(transferAmount); + const toCurrentVersion = toWalletRecord.version; + + // 更新接收方余额 + const toUpdateResult = await tx.walletAccount.updateMany({ + where: { + id: toWalletRecord.id, + version: toCurrentVersion, + }, + data: { + usdtAvailable: toNewAvailable, + version: toCurrentVersion + 1, + updatedAt: new Date(), + }, + }); + + if (toUpdateResult.count === 0) { + throw new OptimisticLockError(`Optimistic lock conflict for receiver wallet ${toWalletRecord.id}`); + } + + // 给接收方记录 TRANSFER_IN 流水 + await tx.ledgerEntry.create({ + data: { + accountSequence: orderRecord.toAccountSequence, + userId: orderRecord.toUserId, + entryType: LedgerEntryType.TRANSFER_IN, + amount: transferAmount, + assetType: 'USDT', + balanceAfter: toNewAvailable, + refOrderId: orderRecord.orderNo, + refTxHash: payload.txHash, + memo: `来自 ${orderRecord.accountSequence} 的转账`, + payloadJson: { + fromAccountSequence: orderRecord.accountSequence, + fromUserId: orderRecord.userId.toString(), + }, + }, + }); + + this.logger.log(`[CONFIRMED] Internal transfer: ${orderRecord.accountSequence} -> ${orderRecord.toAccountSequence}, amount: ${transferAmount.toString()}`); } } else { // 普通提现:记录 WITHDRAWAL diff --git a/backend/services/wallet-service/src/application/services/wallet-application.service.ts b/backend/services/wallet-service/src/application/services/wallet-application.service.ts index aa36df36..795f0423 100644 --- a/backend/services/wallet-service/src/application/services/wallet-application.service.ts +++ b/backend/services/wallet-service/src/application/services/wallet-application.service.ts @@ -20,7 +20,7 @@ import { } from '@/application/commands'; import { GetMyWalletQuery, GetMyLedgerQuery } from '@/application/queries'; import { DuplicateTransactionError, WalletNotFoundError, OptimisticLockError } from '@/shared/exceptions/domain.exception'; -import { WalletCacheService } from '@/infrastructure/redis'; +import { WalletCacheService, HotWalletCacheService } from '@/infrastructure/redis'; import { EventPublisherService } from '@/infrastructure/kafka'; import { WithdrawalRequestedEvent } from '@/domain/events'; import { FeeConfigRepositoryImpl } from '@/infrastructure/persistence/repositories'; @@ -91,6 +91,7 @@ export class WalletApplicationService { @Inject(PENDING_REWARD_REPOSITORY) private readonly pendingRewardRepo: IPendingRewardRepository, private readonly walletCacheService: WalletCacheService, + private readonly hotWalletCacheService: HotWalletCacheService, private readonly eventPublisher: EventPublisherService, private readonly prisma: PrismaService, private readonly feeConfigRepo: FeeConfigRepositoryImpl, @@ -1518,6 +1519,16 @@ export class WalletApplicationService { throw new Error(`最小提现金额为 ${this.MIN_WITHDRAWAL_AMOUNT} USDT`); } + // 检查热钱包余额是否足够(预检查,防止用户资金被冻结后链上执行失败) + const hotWalletCheck = await this.hotWalletCacheService.checkSufficientBalance( + command.chainType, + amount.toDecimal(), + ); + if (!hotWalletCheck.sufficient) { + this.logger.warn(`[WITHDRAWAL] Hot wallet balance check failed for ${command.chainType}: ${hotWalletCheck.error}`); + throw new BadRequestException(hotWalletCheck.error || '财务系统审计中,请稍后再试'); + } + // 优先按 accountSequence 查找,如果未找到则按 userId 查找 let wallet = await this.walletRepo.findByAccountSequence(command.userId); if (!wallet) { diff --git a/backend/services/wallet-service/src/infrastructure/redis/hot-wallet-cache.service.ts b/backend/services/wallet-service/src/infrastructure/redis/hot-wallet-cache.service.ts new file mode 100644 index 00000000..eb87d1ff --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/redis/hot-wallet-cache.service.ts @@ -0,0 +1,108 @@ +import { Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import Decimal from 'decimal.js'; + +/** + * 热钱包 dUSDT (绿积分) 余额缓存服务 + * + * 从 Redis DB 0 读取 blockchain-service 写入的热钱包 dUSDT 余额缓存。 + * 用于在用户发起转账前预检查热钱包余额是否足够。 + * + * Redis Key 格式: hot_wallet:dusdt_balance:{chainType} + * + * 策略:如果无法获取余额(Redis 故障、缓存过期等),返回 null, + * 调用方应拒绝转账并提示用户稍后重试。 + */ +@Injectable() +export class HotWalletCacheService implements OnModuleDestroy { + private readonly logger = new Logger(HotWalletCacheService.name); + + // Redis key 前缀(与 blockchain-service 保持一致) + private readonly REDIS_KEY_PREFIX = 'hot_wallet:dusdt_balance:'; + + // 使用独立的 Redis 连接,连接到 DB 0(公共数据库) + private readonly sharedRedis: Redis; + + constructor(private readonly configService: ConfigService) { + // 创建连接到 DB 0 的 Redis 客户端(公共数据库) + this.sharedRedis = new Redis({ + host: this.configService.get('REDIS_HOST') || 'localhost', + port: this.configService.get('REDIS_PORT') || 6379, + password: this.configService.get('REDIS_PASSWORD') || undefined, + db: 0, // 使用 DB 0 作为公共数据库 + }); + + this.sharedRedis.on('connect', () => { + this.logger.log('[REDIS] Connected to shared Redis DB 0 for hot wallet balance cache'); + }); + + this.sharedRedis.on('error', (err) => { + this.logger.error('[REDIS] Shared Redis connection error', err); + }); + } + + onModuleDestroy() { + this.sharedRedis.disconnect(); + } + + /** + * 获取指定链的热钱包余额 + * + * @param chainType 链类型 (KAVA, BSC) + * @returns 余额(Decimal),如果无法获取则返回 null + */ + async getHotWalletBalance(chainType: string): Promise { + try { + const redisKey = `${this.REDIS_KEY_PREFIX}${chainType.toUpperCase()}`; + const balance = await this.sharedRedis.get(redisKey); + + if (balance === null) { + this.logger.warn(`[CACHE] Hot wallet balance not found for ${chainType}`); + return null; + } + + const balanceDecimal = new Decimal(balance); + this.logger.debug(`[CACHE] Hot wallet dUSDT balance for ${chainType}: ${balanceDecimal.toString()}`); + return balanceDecimal; + } catch (error) { + this.logger.error(`[CACHE] Failed to get hot wallet balance for ${chainType}`, error); + return null; + } + } + + /** + * 检查热钱包余额是否足够支付转账 + * + * @param chainType 链类型 + * @param requiredAmount 所需金额 + * @returns { sufficient: boolean, balance: Decimal | null, error?: string } + */ + async checkSufficientBalance( + chainType: string, + requiredAmount: Decimal, + ): Promise<{ sufficient: boolean; balance: Decimal | null; error?: string }> { + const balance = await this.getHotWalletBalance(chainType); + + if (balance === null) { + return { + sufficient: false, + balance: null, + error: '财务系统审计中,请稍后再试', + }; + } + + if (balance.lessThan(requiredAmount)) { + this.logger.warn( + `[CHECK] Insufficient hot wallet balance for ${chainType}: need ${requiredAmount.toString()}, have ${balance.toString()}`, + ); + return { + sufficient: false, + balance, + error: '财务系统审计中,请稍后再试', + }; + } + + return { sufficient: true, balance }; + } +} diff --git a/backend/services/wallet-service/src/infrastructure/redis/index.ts b/backend/services/wallet-service/src/infrastructure/redis/index.ts index 918d9d7e..8a748b2a 100644 --- a/backend/services/wallet-service/src/infrastructure/redis/index.ts +++ b/backend/services/wallet-service/src/infrastructure/redis/index.ts @@ -1,3 +1,4 @@ export * from './redis.service'; export * from './redis.module'; export * from './wallet-cache.service'; +export * from './hot-wallet-cache.service'; diff --git a/backend/services/wallet-service/src/infrastructure/redis/redis.module.ts b/backend/services/wallet-service/src/infrastructure/redis/redis.module.ts index 7c102adf..ea2320a4 100644 --- a/backend/services/wallet-service/src/infrastructure/redis/redis.module.ts +++ b/backend/services/wallet-service/src/infrastructure/redis/redis.module.ts @@ -2,11 +2,12 @@ import { Module, Global } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { RedisService } from './redis.service'; import { WalletCacheService } from './wallet-cache.service'; +import { HotWalletCacheService } from './hot-wallet-cache.service'; @Global() @Module({ imports: [ConfigModule], - providers: [RedisService, WalletCacheService], - exports: [RedisService, WalletCacheService], + providers: [RedisService, WalletCacheService, HotWalletCacheService], + exports: [RedisService, WalletCacheService, HotWalletCacheService], }) export class RedisModule {}