feat(wallet/blockchain/identity): implement system account withdrawal feature

- Add SystemWithdrawalApplicationService to handle system account transfers
- Add SystemWithdrawalController with endpoints for request, query, and account listing
- Add SystemWithdrawalStatusHandler to process blockchain confirmation/failure events
- Add SystemWithdrawalRequestedHandler in blockchain-service to execute ERC20 transfers
- Add getUserByAccountSequence endpoint in identity-service for user lookup
- Support dynamic memo generation based on actual source account name
- Dual-sided ledger entries for system account transfers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-06 10:22:15 -08:00
parent fa1931b3b6
commit 64bd82b77b
13 changed files with 1217 additions and 1 deletions

View File

@ -11,6 +11,7 @@ import {
MpcTransferInitializerService,
} from './services';
import { MpcKeygenCompletedHandler, WithdrawalRequestedHandler } from './event-handlers';
import { SystemWithdrawalRequestedHandler } from './event-handlers/system-withdrawal-requested.handler';
import { DepositAckConsumerService } from '@/infrastructure/kafka/deposit-ack-consumer.service';
import { HotWalletBalanceScheduler } from './schedulers';
@ -32,6 +33,7 @@ import { HotWalletBalanceScheduler } from './schedulers';
// 事件处理器
MpcKeygenCompletedHandler,
WithdrawalRequestedHandler,
SystemWithdrawalRequestedHandler,
// 定时任务
HotWalletBalanceScheduler,
@ -46,6 +48,7 @@ import { HotWalletBalanceScheduler } from './schedulers';
DepositAckConsumerService,
MpcKeygenCompletedHandler,
WithdrawalRequestedHandler,
SystemWithdrawalRequestedHandler,
],
})
export class ApplicationModule {}

View File

@ -0,0 +1,140 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import {
WithdrawalEventConsumerService,
SystemWithdrawalRequestedPayload,
} from '@/infrastructure/kafka/withdrawal-event-consumer.service';
import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service';
import { Erc20TransferService } from '@/domain/services/erc20-transfer.service';
import { ChainTypeEnum } from '@/domain/enums';
/**
* System Withdrawal Requested Event Handler
*
* Handles system account withdrawal requests from wallet-service.
* Executes ERC20 USDT transfers from hot wallet to user's address.
*/
@Injectable()
export class SystemWithdrawalRequestedHandler implements OnModuleInit {
private readonly logger = new Logger(SystemWithdrawalRequestedHandler.name);
constructor(
private readonly withdrawalEventConsumer: WithdrawalEventConsumerService,
private readonly eventPublisher: EventPublisherService,
private readonly transferService: Erc20TransferService,
) {}
onModuleInit() {
this.withdrawalEventConsumer.onSystemWithdrawalRequested(
this.handleSystemWithdrawalRequested.bind(this),
);
this.logger.log(`[INIT] SystemWithdrawalRequestedHandler registered`);
}
/**
* Handle system withdrawal requested event from wallet-service
*
* Flow:
* 1. Receive system withdrawal request
* 2. Execute ERC20 transfer from hot wallet
* 3. Publish final status (CONFIRMED or FAILED)
*/
private async handleSystemWithdrawalRequested(
payload: SystemWithdrawalRequestedPayload,
): Promise<void> {
this.logger.log(`[HANDLE] ========== System Withdrawal Request ==========`);
this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
this.logger.log(`[HANDLE] fromAccountSequence: ${payload.fromAccountSequence}`);
this.logger.log(`[HANDLE] fromAccountName: ${payload.fromAccountName}`);
this.logger.log(`[HANDLE] toAccountSequence: ${payload.toAccountSequence}`);
this.logger.log(`[HANDLE] toAddress: ${payload.toAddress}`);
this.logger.log(`[HANDLE] amount: ${payload.amount}`);
this.logger.log(`[HANDLE] chainType: ${payload.chainType}`);
try {
// Step 1: 验证链类型
const chainType = this.parseChainType(payload.chainType);
if (!chainType) {
throw new Error(`Unsupported chain type: ${payload.chainType}`);
}
// Step 2: 检查转账服务是否配置
if (!this.transferService.isConfigured(chainType)) {
throw new Error(`Hot wallet not configured for chain: ${chainType}`);
}
// Step 3: 执行 ERC20 转账
this.logger.log(`[PROCESS] Executing ERC20 transfer for system withdrawal...`);
const result = await this.transferService.transferUsdt(
chainType,
payload.toAddress,
payload.amount,
);
if (result.success && result.txHash) {
// Step 4a: 转账成功,发布确认状态
this.logger.log(`[SUCCESS] System withdrawal ${payload.orderNo} confirmed!`);
this.logger.log(`[SUCCESS] TxHash: ${result.txHash}`);
this.logger.log(`[SUCCESS] Block: ${result.blockNumber}`);
await this.eventPublisher.publish({
eventType: 'blockchain.system-withdrawal.confirmed',
toPayload: () => ({
orderNo: payload.orderNo,
fromAccountSequence: payload.fromAccountSequence,
fromAccountName: payload.fromAccountName,
toAccountSequence: payload.toAccountSequence,
status: 'CONFIRMED',
txHash: result.txHash,
blockNumber: result.blockNumber,
chainType: payload.chainType,
toAddress: payload.toAddress,
amount: payload.amount,
}),
eventId: `sys-wd-confirmed-${payload.orderNo}-${Date.now()}`,
occurredAt: new Date(),
});
this.logger.log(`[COMPLETE] System withdrawal ${payload.orderNo} completed successfully`);
} else {
// Step 4b: 转账失败
throw new Error(result.error || 'Transfer failed');
}
} catch (error) {
this.logger.error(
`[ERROR] Failed to process system withdrawal ${payload.orderNo}`,
error,
);
// 发布失败事件
await this.eventPublisher.publish({
eventType: 'blockchain.system-withdrawal.failed',
toPayload: () => ({
orderNo: payload.orderNo,
fromAccountSequence: payload.fromAccountSequence,
fromAccountName: payload.fromAccountName,
toAccountSequence: payload.toAccountSequence,
status: 'FAILED',
error: error instanceof Error ? error.message : 'Unknown error',
chainType: payload.chainType,
toAddress: payload.toAddress,
amount: payload.amount,
}),
eventId: `sys-wd-failed-${payload.orderNo}-${Date.now()}`,
occurredAt: new Date(),
});
throw error;
}
}
/**
*
*/
private parseChainType(chainType: string): ChainTypeEnum | null {
const normalized = chainType.toUpperCase();
if (normalized === 'KAVA') return ChainTypeEnum.KAVA;
if (normalized === 'BSC') return ChainTypeEnum.BSC;
return null;
}
}

View File

@ -26,7 +26,18 @@ export interface WithdrawalRequestedPayload {
toAddress: string;
}
export interface SystemWithdrawalRequestedPayload {
orderNo: string;
fromAccountSequence: string;
fromAccountName: string;
toAccountSequence: string;
toAddress: string;
amount: string;
chainType: string;
}
export type WithdrawalEventHandler = (payload: WithdrawalRequestedPayload) => Promise<void>;
export type SystemWithdrawalEventHandler = (payload: SystemWithdrawalRequestedPayload) => Promise<void>;
@Injectable()
export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDestroy {
@ -36,6 +47,7 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
private isConnected = false;
private withdrawalRequestedHandler?: WithdrawalEventHandler;
private systemWithdrawalRequestedHandler?: SystemWithdrawalEventHandler;
constructor(private readonly configService: ConfigService) {}
@ -103,6 +115,14 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
this.logger.log(`[REGISTER] WithdrawalRequested handler registered`);
}
/**
* Register handler for system withdrawal requested events
*/
onSystemWithdrawalRequested(handler: SystemWithdrawalEventHandler): void {
this.systemWithdrawalRequestedHandler = handler;
this.logger.log(`[REGISTER] SystemWithdrawalRequested handler registered`);
}
private async startConsuming(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
@ -137,6 +157,20 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
} else {
this.logger.warn(`[HANDLE] No handler registered for WithdrawalRequested`);
}
} else if (eventType === 'wallet.system-withdrawal.requested') {
this.logger.log(`[HANDLE] Processing SystemWithdrawalRequested event`);
this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
this.logger.log(`[HANDLE] fromAccountSequence: ${payload.fromAccountSequence}`);
this.logger.log(`[HANDLE] toAccountSequence: ${payload.toAccountSequence}`);
this.logger.log(`[HANDLE] toAddress: ${payload.toAddress}`);
this.logger.log(`[HANDLE] amount: ${payload.amount}`);
if (this.systemWithdrawalRequestedHandler) {
await this.systemWithdrawalRequestedHandler(payload as SystemWithdrawalRequestedPayload);
this.logger.log(`[HANDLE] SystemWithdrawalRequested handler completed`);
} else {
this.logger.warn(`[HANDLE] No handler registered for SystemWithdrawalRequested`);
}
} else {
this.logger.warn(`[RECEIVE] Unknown event type: ${eventType}`);
}

View File

@ -21,6 +21,7 @@ import {
ApiConsumes,
ApiBody,
ApiQuery,
ApiParam,
} from '@nestjs/swagger';
import { UserApplicationService } from '@/application/services/user-application.service';
import { StorageService } from '@/infrastructure/external/storage/storage.service';
@ -799,6 +800,31 @@ export class UserAccountController {
};
}
@Get('internal/users/by-account-sequence/:accountSequence')
@Public()
@ApiOperation({
summary: '通过 accountSequence 查询用户信息(内部调用)',
description: '通过用户的 accountSequence 查询详细信息,包括钱包地址',
})
@ApiParam({ name: 'accountSequence', description: '账户序列号 (如 D25121400005)' })
@ApiResponse({ status: 200, description: '返回用户信息' })
@ApiResponse({ status: 404, description: '找不到用户' })
async getUserByAccountSequence(
@Param('accountSequence') accountSequence: string,
) {
const result = await this.userService.findUserByAccountSequence(accountSequence);
if (!result) {
return { found: false, accountSequence: null, userId: null, realName: null, walletAddress: null };
}
return {
found: true,
accountSequence: result.accountSequence,
userId: result.userId.toString(),
realName: result.realName,
walletAddress: result.walletAddress,
};
}
@Post('upload-avatar')
@ApiBearerAuth()
@ApiOperation({ summary: '上传用户头像' })

View File

@ -2760,6 +2760,58 @@ export class UserApplicationService {
};
}
/**
* accountSequence
*
*/
async findUserByAccountSequence(
accountSequence: string,
): Promise<{
accountSequence: string;
userId: bigint;
realName: string | null;
walletAddress: string | null;
} | null> {
this.logger.log(`Finding user by accountSequence: ${accountSequence}`);
// 查询用户
const user = await this.prisma.userAccount.findUnique({
where: { accountSequence },
select: {
userId: true,
accountSequence: true,
realName: true,
},
});
if (!user) {
this.logger.debug(`No user found for accountSequence: ${accountSequence}`);
return null;
}
// 查询钱包地址(默认 KAVA 链)
const walletAddress = await this.prisma.walletAddress.findFirst({
where: {
userId: user.userId,
chainType: 'KAVA',
},
select: {
address: true,
},
});
this.logger.log(
`Found user ${user.accountSequence}: realName=${user.realName}, walletAddress=${walletAddress?.address}`,
);
return {
accountSequence: user.accountSequence,
userId: user.userId,
realName: user.realName,
walletAddress: walletAddress?.address || null,
};
}
/**
*
*

View File

@ -7,12 +7,14 @@ import {
LedgerController,
DepositController,
HealthController,
SystemWithdrawalController,
} from './controllers';
import { InternalWalletController } from './controllers/internal-wallet.controller';
import { FiatWithdrawalController } from './controllers/fiat-withdrawal.controller';
import { WalletApplicationService, FiatWithdrawalApplicationService } from '@/application/services';
import { WalletApplicationService, FiatWithdrawalApplicationService, SystemWithdrawalApplicationService } from '@/application/services';
import { DepositConfirmedHandler, PlantingCreatedHandler } from '@/application/event-handlers';
import { WithdrawalStatusHandler } from '@/application/event-handlers/withdrawal-status.handler';
import { SystemWithdrawalStatusHandler } from '@/application/event-handlers/system-withdrawal-status.handler';
import { ExpiredRewardsScheduler } from '@/application/schedulers';
import { JwtStrategy } from '@/shared/strategies/jwt.strategy';
@ -34,13 +36,16 @@ import { JwtStrategy } from '@/shared/strategies/jwt.strategy';
HealthController,
InternalWalletController,
FiatWithdrawalController,
SystemWithdrawalController,
],
providers: [
WalletApplicationService,
FiatWithdrawalApplicationService,
SystemWithdrawalApplicationService,
DepositConfirmedHandler,
PlantingCreatedHandler,
WithdrawalStatusHandler,
SystemWithdrawalStatusHandler,
ExpiredRewardsScheduler,
JwtStrategy,
],

View File

@ -2,3 +2,4 @@ export * from './wallet.controller';
export * from './ledger.controller';
export * from './deposit.controller';
export * from './health.controller';
export * from './system-withdrawal.controller';

View File

@ -0,0 +1,220 @@
/**
* System Withdrawal Controller
*
* API
*
*/
import {
Controller,
Get,
Post,
Body,
Query,
Logger,
BadRequestException,
} from '@nestjs/common';
import {
ApiTags,
ApiOperation,
ApiResponse,
ApiQuery,
ApiBody,
} from '@nestjs/swagger';
import { Public } from '@/shared/decorators';
import { SystemWithdrawalApplicationService } from '@/application/services';
// DTO 定义
class SystemWithdrawalRequestDTO {
fromAccountSequence: string;
toAccountSequence: string;
amount: number;
memo?: string;
operatorId: string;
operatorName?: string;
}
/**
*
*
*/
@ApiTags('System Withdrawal (Internal)')
@Controller('system-withdrawal')
export class SystemWithdrawalController {
private readonly logger = new Logger(SystemWithdrawalController.name);
constructor(
private readonly systemWithdrawalService: SystemWithdrawalApplicationService,
) {}
/**
*
*/
@Post('request')
@Public()
@ApiOperation({
summary: '发起系统账户转出内部API',
description: '从系统账户(总部、运营、区域等)转出资金到用户账户',
})
@ApiBody({
schema: {
type: 'object',
required: ['fromAccountSequence', 'toAccountSequence', 'amount', 'operatorId'],
properties: {
fromAccountSequence: {
type: 'string',
description: '转出方系统账户序列号',
example: 'S0000000003',
},
toAccountSequence: {
type: 'string',
description: '接收方用户充值ID',
example: 'D25122800032',
},
amount: {
type: 'number',
description: '转出金额(绿积分)',
example: 1000,
},
memo: {
type: 'string',
description: '备注',
example: '补发奖励',
},
operatorId: {
type: 'string',
description: '操作管理员ID',
example: 'admin_001',
},
operatorName: {
type: 'string',
description: '操作管理员姓名',
example: '管理员张三',
},
},
},
})
@ApiResponse({ status: 200, description: '转出订单创建成功' })
@ApiResponse({ status: 400, description: '参数错误或余额不足' })
async requestSystemWithdrawal(@Body() dto: SystemWithdrawalRequestDTO) {
this.logger.log(`[REQUEST] 系统账户转出请求: ${JSON.stringify(dto)}`);
// 验证必填参数
if (!dto.fromAccountSequence) {
throw new BadRequestException('转出账户不能为空');
}
if (!dto.toAccountSequence) {
throw new BadRequestException('接收账户不能为空');
}
if (!dto.amount || dto.amount <= 0) {
throw new BadRequestException('转出金额必须大于0');
}
if (!dto.operatorId) {
throw new BadRequestException('操作员ID不能为空');
}
const result = await this.systemWithdrawalService.requestSystemWithdrawal({
fromAccountSequence: dto.fromAccountSequence,
toAccountSequence: dto.toAccountSequence,
amount: dto.amount,
memo: dto.memo,
operatorId: dto.operatorId,
operatorName: dto.operatorName,
});
this.logger.log(`[REQUEST] 转出订单创建成功: ${result.orderNo}`);
return {
success: true,
data: result,
};
}
/**
*
*/
@Get('accounts')
@Public()
@ApiOperation({
summary: '获取可转出的系统账户列表内部API',
description: '获取所有允许转出的系统账户及其余额',
})
@ApiResponse({ status: 200, description: '系统账户列表' })
async getWithdrawableAccounts() {
this.logger.log('[ACCOUNTS] 查询可转出系统账户');
const accounts = await this.systemWithdrawalService.getWithdrawableSystemAccounts();
return {
success: true,
data: accounts,
};
}
/**
*
*/
@Get('orders')
@Public()
@ApiOperation({
summary: '查询系统账户转出订单列表内部API',
description: '分页查询系统账户转出订单',
})
@ApiQuery({ name: 'fromAccountSequence', required: false, description: '转出账户筛选' })
@ApiQuery({ name: 'toAccountSequence', required: false, description: '接收账户筛选' })
@ApiQuery({ name: 'status', required: false, description: '状态筛选 (PENDING/FROZEN/CONFIRMED/FAILED)' })
@ApiQuery({ name: 'page', required: false, description: '页码默认1' })
@ApiQuery({ name: 'pageSize', required: false, description: '每页数量默认20' })
@ApiResponse({ status: 200, description: '订单列表' })
async getOrders(
@Query('fromAccountSequence') fromAccountSequence?: string,
@Query('toAccountSequence') toAccountSequence?: string,
@Query('status') status?: string,
@Query('page') page?: string,
@Query('pageSize') pageSize?: string,
) {
this.logger.log(`[ORDERS] 查询转出订单: from=${fromAccountSequence}, to=${toAccountSequence}, status=${status}`);
const result = await this.systemWithdrawalService.getSystemWithdrawalOrders({
fromAccountSequence,
toAccountSequence,
status,
page: page ? parseInt(page, 10) : 1,
pageSize: pageSize ? parseInt(pageSize, 10) : 20,
});
return {
success: true,
data: result,
};
}
/**
*
*/
@Get('account-name')
@Public()
@ApiOperation({
summary: '获取系统账户名称内部API',
description: '根据账户序列号获取系统账户的显示名称',
})
@ApiQuery({ name: 'accountSequence', required: true, description: '账户序列号' })
@ApiResponse({ status: 200, description: '账户名称' })
async getAccountName(@Query('accountSequence') accountSequence: string) {
if (!accountSequence) {
throw new BadRequestException('账户序列号不能为空');
}
const name = this.systemWithdrawalService.getSystemAccountName(accountSequence);
const isAllowed = this.systemWithdrawalService.isWithdrawalAllowed(accountSequence);
return {
success: true,
data: {
accountSequence,
name,
isWithdrawalAllowed: isAllowed,
},
};
}
}

View File

@ -0,0 +1,87 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import {
WithdrawalEventConsumerService,
SystemWithdrawalConfirmedPayload,
SystemWithdrawalFailedPayload,
} from '@/infrastructure/kafka/withdrawal-event-consumer.service';
import { SystemWithdrawalApplicationService } from '@/application/services';
/**
* System Withdrawal Status Handler
*
* Handles system withdrawal status events from blockchain-service.
* - On CONFIRMED: Updates order status and credits receiver's wallet
* - On FAILED: Updates order status and refunds sender's wallet
*/
@Injectable()
export class SystemWithdrawalStatusHandler implements OnModuleInit {
private readonly logger = new Logger(SystemWithdrawalStatusHandler.name);
constructor(
private readonly withdrawalEventConsumer: WithdrawalEventConsumerService,
private readonly systemWithdrawalService: SystemWithdrawalApplicationService,
) {}
onModuleInit() {
this.withdrawalEventConsumer.onSystemWithdrawalConfirmed(
this.handleSystemWithdrawalConfirmed.bind(this),
);
this.withdrawalEventConsumer.onSystemWithdrawalFailed(
this.handleSystemWithdrawalFailed.bind(this),
);
this.logger.log(`[INIT] SystemWithdrawalStatusHandler registered`);
}
/**
* Handle system withdrawal confirmed event
* Update order status to CONFIRMED and credit receiver's wallet
*/
private async handleSystemWithdrawalConfirmed(
payload: SystemWithdrawalConfirmedPayload,
): Promise<void> {
this.logger.log(`[CONFIRMED] Processing system withdrawal confirmation`);
this.logger.log(`[CONFIRMED] orderNo: ${payload.orderNo}`);
this.logger.log(`[CONFIRMED] txHash: ${payload.txHash}`);
this.logger.log(`[CONFIRMED] toAccountSequence: ${payload.toAccountSequence}`);
try {
await this.systemWithdrawalService.handleWithdrawalConfirmed(
payload.orderNo,
payload.txHash,
);
this.logger.log(`[CONFIRMED] System withdrawal ${payload.orderNo} confirmed successfully`);
} catch (error) {
this.logger.error(
`[CONFIRMED] Failed to process system withdrawal confirmation: ${payload.orderNo}`,
error,
);
throw error;
}
}
/**
* Handle system withdrawal failed event
* Update order status to FAILED and refund sender's wallet
*/
private async handleSystemWithdrawalFailed(
payload: SystemWithdrawalFailedPayload,
): Promise<void> {
this.logger.log(`[FAILED] Processing system withdrawal failure`);
this.logger.log(`[FAILED] orderNo: ${payload.orderNo}`);
this.logger.log(`[FAILED] error: ${payload.error}`);
try {
await this.systemWithdrawalService.handleWithdrawalFailed(
payload.orderNo,
payload.error,
);
this.logger.log(`[FAILED] System withdrawal ${payload.orderNo} failure processed`);
} catch (error) {
this.logger.error(
`[FAILED] Failed to process system withdrawal failure: ${payload.orderNo}`,
error,
);
throw error;
}
}
}

View File

@ -1,2 +1,3 @@
export * from './wallet-application.service';
export * from './fiat-withdrawal-application.service';
export * from './system-withdrawal-application.service';

View File

@ -0,0 +1,530 @@
/**
* System Withdrawal Application Service
*
*
* -
* -
* - +
*/
import { Injectable, Logger, BadRequestException, Inject } from '@nestjs/common';
import { PrismaService } from '@/infrastructure/persistence/prisma/prisma.service';
import { EventPublisherService } from '@/infrastructure/kafka';
import { IdentityClientService } from '@/infrastructure/external/identity/identity-client.service';
import { HotWalletCacheService } from '@/infrastructure/redis';
import { LedgerEntryType } from '@/domain/value-objects/ledger-entry-type.enum';
import Decimal from 'decimal.js';
// 系统账户名称映射
const SYSTEM_ACCOUNT_NAMES: Record<string, string> = {
'S0000000001': '总部账户',
'S0000000002': '成本账户',
'S0000000003': '运营账户',
'S0000000004': 'RWAD底池',
'S0000000005': '分享权益池',
'S0000000006': '手续费归集',
};
// 允许转出的系统账户白名单
const ALLOWED_WITHDRAWAL_ACCOUNTS = new Set([
'S0000000001', // 总部账户
'S0000000003', // 运营账户
'S0000000005', // 分享权益池
'S0000000006', // 手续费归集
]);
export interface SystemWithdrawalCommand {
fromAccountSequence: string; // 系统账户序列号
toAccountSequence: string; // 接收方充值ID
amount: number; // 转出金额
memo?: string; // 备注
operatorId: string; // 操作管理员ID
operatorName?: string; // 操作管理员姓名
}
export interface SystemWithdrawalResult {
orderNo: string;
fromAccountSequence: string;
fromAccountName: string;
toAccountSequence: string;
toUserName: string | null;
toAddress: string;
amount: number;
status: string;
}
@Injectable()
export class SystemWithdrawalApplicationService {
private readonly logger = new Logger(SystemWithdrawalApplicationService.name);
constructor(
private readonly prisma: PrismaService,
private readonly eventPublisher: EventPublisherService,
private readonly identityClient: IdentityClientService,
private readonly hotWalletCacheService: HotWalletCacheService,
) {}
/**
*
*/
getSystemAccountName(accountSequence: string): string {
// 固定系统账户
if (SYSTEM_ACCOUNT_NAMES[accountSequence]) {
return SYSTEM_ACCOUNT_NAMES[accountSequence];
}
// 省区域账户: 9 + 省代码
if (accountSequence.startsWith('9') && accountSequence.length === 7) {
return `省区域(${accountSequence.substring(1)})`;
}
// 市区域账户: 8 + 市代码
if (accountSequence.startsWith('8') && accountSequence.length === 7) {
return `市区域(${accountSequence.substring(1)})`;
}
return `系统账户(${accountSequence})`;
}
/**
*
*/
isWithdrawalAllowed(accountSequence: string): boolean {
// 固定系统账户白名单
if (ALLOWED_WITHDRAWAL_ACCOUNTS.has(accountSequence)) {
return true;
}
// 省区域账户: 9 + 省代码
if (accountSequence.startsWith('9') && accountSequence.length === 7) {
return true;
}
// 市区域账户: 8 + 市代码
if (accountSequence.startsWith('8') && accountSequence.length === 7) {
return true;
}
return false;
}
/**
*
*/
async requestSystemWithdrawal(command: SystemWithdrawalCommand): Promise<SystemWithdrawalResult> {
this.logger.log(`[SYSTEM_WITHDRAWAL] 发起转出: ${command.fromAccountSequence} -> ${command.toAccountSequence}, 金额: ${command.amount}`);
// 1. 验证转出账户是否在白名单中
if (!this.isWithdrawalAllowed(command.fromAccountSequence)) {
throw new BadRequestException(`账户 ${command.fromAccountSequence} 不允许转出`);
}
// 2. 获取系统账户名称
const fromAccountName = this.getSystemAccountName(command.fromAccountSequence);
// 3. 验证接收方账户(必须是 D 开头的用户账户)
if (!command.toAccountSequence.startsWith('D')) {
throw new BadRequestException('接收方必须是用户账户D开头');
}
// 4. 获取接收方用户信息和区块链地址
const toUserInfo = await this.identityClient.getUserInfoByAccountSequence(command.toAccountSequence);
if (!toUserInfo) {
throw new BadRequestException(`未找到接收方用户: ${command.toAccountSequence}`);
}
// 5. 检查热钱包余额
const hotWalletCheck = await this.hotWalletCacheService.checkSufficientBalance(
'KAVA',
new Decimal(command.amount),
);
if (!hotWalletCheck.sufficient) {
this.logger.warn(`[SYSTEM_WITHDRAWAL] 热钱包余额不足: ${hotWalletCheck.error}`);
throw new BadRequestException(hotWalletCheck.error || '财务系统审计中,请稍后再试');
}
// 6. 在事务中执行
const result = await this.prisma.$transaction(async (tx) => {
// 6.1 查找系统账户并验证余额
const systemWallet = await tx.walletAccount.findUnique({
where: { accountSequence: command.fromAccountSequence },
});
if (!systemWallet) {
throw new BadRequestException(`系统账户不存在: ${command.fromAccountSequence}`);
}
const currentBalance = new Decimal(systemWallet.usdtAvailable.toString());
const withdrawAmount = new Decimal(command.amount);
if (currentBalance.lessThan(withdrawAmount)) {
throw new BadRequestException(
`余额不足: 当前 ${currentBalance.toFixed(2)} 绿积分, 需要 ${withdrawAmount.toFixed(2)} 绿积分`
);
}
// 6.2 生成订单号
const orderNo = this.generateOrderNo();
// 6.3 扣减系统账户余额
const newBalance = currentBalance.minus(withdrawAmount);
await tx.walletAccount.update({
where: { id: systemWallet.id },
data: {
usdtAvailable: newBalance,
updatedAt: new Date(),
},
});
// 6.4 记录系统账户转出流水
await tx.ledgerEntry.create({
data: {
accountSequence: command.fromAccountSequence,
userId: systemWallet.userId,
entryType: LedgerEntryType.SYSTEM_TRANSFER_OUT,
amount: withdrawAmount.negated(),
assetType: 'USDT',
balanceAfter: newBalance,
refOrderId: orderNo,
memo: `转账至 ${command.toAccountSequence}${toUserInfo.realName ? ` (${toUserInfo.realName})` : ''}${command.memo ? ` - ${command.memo}` : ''}`,
payloadJson: {
toAccountSequence: command.toAccountSequence,
toUserId: toUserInfo.userId,
toUserName: toUserInfo.realName,
operatorId: command.operatorId,
operatorName: command.operatorName,
},
},
});
// 6.5 创建转出订单
const order = await tx.systemWithdrawalOrder.create({
data: {
orderNo,
fromAccountSequence: command.fromAccountSequence,
fromAccountName,
toAccountSequence: command.toAccountSequence,
toUserId: BigInt(toUserInfo.userId),
toUserName: toUserInfo.realName,
toAddress: toUserInfo.walletAddress,
amount: withdrawAmount,
chainType: 'KAVA',
status: 'FROZEN',
operatorId: command.operatorId,
operatorName: command.operatorName,
memo: command.memo,
frozenAt: new Date(),
},
});
return {
orderNo: order.orderNo,
fromAccountSequence: command.fromAccountSequence,
fromAccountName,
toAccountSequence: command.toAccountSequence,
toUserName: toUserInfo.realName,
toAddress: toUserInfo.walletAddress,
amount: command.amount,
status: order.status,
};
});
// 7. 发布事件通知 blockchain-service 执行链上转账
await this.eventPublisher.publish({
eventType: 'wallet.system-withdrawal.requested',
payload: {
orderNo: result.orderNo,
fromAccountSequence: result.fromAccountSequence,
fromAccountName: result.fromAccountName,
toAccountSequence: result.toAccountSequence,
toAddress: result.toAddress,
amount: command.amount.toString(),
chainType: 'KAVA',
},
});
this.logger.log(`[SYSTEM_WITHDRAWAL] 转出订单创建成功: ${result.orderNo}`);
return result;
}
/**
* blockchain-service
*/
async handleWithdrawalConfirmed(orderNo: string, txHash: string): Promise<void> {
this.logger.log(`[SYSTEM_WITHDRAWAL] 处理转账确认: orderNo=${orderNo}, txHash=${txHash}`);
await this.prisma.$transaction(async (tx) => {
// 1. 查找订单
const order = await tx.systemWithdrawalOrder.findUnique({
where: { orderNo },
});
if (!order) {
throw new Error(`订单不存在: ${orderNo}`);
}
if (order.status === 'CONFIRMED') {
this.logger.warn(`[SYSTEM_WITHDRAWAL] 订单已确认,跳过: ${orderNo}`);
return;
}
// 2. 更新订单状态
await tx.systemWithdrawalOrder.update({
where: { orderNo },
data: {
status: 'CONFIRMED',
txHash,
confirmedAt: new Date(),
},
});
// 3. 查找接收方钱包(如果不存在则创建)
let toWallet = await tx.walletAccount.findUnique({
where: { accountSequence: order.toAccountSequence },
});
if (!toWallet) {
this.logger.log(`[SYSTEM_WITHDRAWAL] 接收方钱包不存在,自动创建: ${order.toAccountSequence}`);
toWallet = await tx.walletAccount.upsert({
where: { accountSequence: order.toAccountSequence },
create: {
accountSequence: order.toAccountSequence,
userId: order.toUserId,
usdtAvailable: new Decimal(0),
usdtFrozen: new Decimal(0),
dstAvailable: new Decimal(0),
dstFrozen: new Decimal(0),
bnbAvailable: new Decimal(0),
bnbFrozen: new Decimal(0),
ogAvailable: new Decimal(0),
ogFrozen: new Decimal(0),
rwadAvailable: new Decimal(0),
rwadFrozen: new Decimal(0),
hashpower: new Decimal(0),
pendingUsdt: new Decimal(0),
pendingHashpower: new Decimal(0),
settleableUsdt: new Decimal(0),
settleableHashpower: new Decimal(0),
settledTotalUsdt: new Decimal(0),
settledTotalHashpower: new Decimal(0),
expiredTotalUsdt: new Decimal(0),
expiredTotalHashpower: new Decimal(0),
status: 'ACTIVE',
hasPlanted: false,
version: 0,
},
update: {},
});
}
// 4. 增加接收方余额
const transferAmount = new Decimal(order.amount.toString());
const toCurrentBalance = new Decimal(toWallet.usdtAvailable.toString());
const toNewBalance = toCurrentBalance.plus(transferAmount);
await tx.walletAccount.update({
where: { id: toWallet.id },
data: {
usdtAvailable: toNewBalance,
updatedAt: new Date(),
},
});
// 5. 记录接收方转入流水
await tx.ledgerEntry.create({
data: {
accountSequence: order.toAccountSequence,
userId: order.toUserId,
entryType: LedgerEntryType.SYSTEM_TRANSFER_IN,
amount: transferAmount,
assetType: 'USDT',
balanceAfter: toNewBalance,
refOrderId: orderNo,
refTxHash: txHash,
memo: `来自${order.fromAccountName}的转入${order.memo ? ` - ${order.memo}` : ''}`,
payloadJson: {
fromAccountSequence: order.fromAccountSequence,
fromAccountName: order.fromAccountName,
},
},
});
this.logger.log(`[SYSTEM_WITHDRAWAL] 转账确认完成: ${orderNo}, 接收方余额: ${toNewBalance}`);
});
}
/**
* blockchain-service
*/
async handleWithdrawalFailed(orderNo: string, error: string): Promise<void> {
this.logger.log(`[SYSTEM_WITHDRAWAL] 处理转账失败: orderNo=${orderNo}, error=${error}`);
await this.prisma.$transaction(async (tx) => {
// 1. 查找订单
const order = await tx.systemWithdrawalOrder.findUnique({
where: { orderNo },
});
if (!order) {
throw new Error(`订单不存在: ${orderNo}`);
}
if (order.status === 'FAILED' || order.status === 'CONFIRMED') {
this.logger.warn(`[SYSTEM_WITHDRAWAL] 订单状态已终结,跳过: ${orderNo}, status=${order.status}`);
return;
}
// 2. 更新订单状态
await tx.systemWithdrawalOrder.update({
where: { orderNo },
data: {
status: 'FAILED',
errorMessage: error,
},
});
// 3. 退还系统账户余额
const systemWallet = await tx.walletAccount.findUnique({
where: { accountSequence: order.fromAccountSequence },
});
if (systemWallet) {
const refundAmount = new Decimal(order.amount.toString());
const currentBalance = new Decimal(systemWallet.usdtAvailable.toString());
const newBalance = currentBalance.plus(refundAmount);
await tx.walletAccount.update({
where: { id: systemWallet.id },
data: {
usdtAvailable: newBalance,
updatedAt: new Date(),
},
});
// 记录退款流水
await tx.ledgerEntry.create({
data: {
accountSequence: order.fromAccountSequence,
userId: systemWallet.userId,
entryType: LedgerEntryType.UNFREEZE,
amount: refundAmount,
assetType: 'USDT',
balanceAfter: newBalance,
refOrderId: orderNo,
memo: `转账失败退款: ${error}`,
payloadJson: {
toAccountSequence: order.toAccountSequence,
error,
},
},
});
}
this.logger.log(`[SYSTEM_WITHDRAWAL] 转账失败处理完成: ${orderNo}`);
});
}
/**
*
*/
async getSystemWithdrawalOrders(params: {
fromAccountSequence?: string;
toAccountSequence?: string;
status?: string;
page?: number;
pageSize?: number;
}): Promise<{
orders: any[];
total: number;
page: number;
pageSize: number;
}> {
const page = params.page || 1;
const pageSize = params.pageSize || 20;
const skip = (page - 1) * pageSize;
const where: any = {};
if (params.fromAccountSequence) {
where.fromAccountSequence = params.fromAccountSequence;
}
if (params.toAccountSequence) {
where.toAccountSequence = params.toAccountSequence;
}
if (params.status) {
where.status = params.status;
}
const [orders, total] = await Promise.all([
this.prisma.systemWithdrawalOrder.findMany({
where,
orderBy: { createdAt: 'desc' },
skip,
take: pageSize,
}),
this.prisma.systemWithdrawalOrder.count({ where }),
]);
return {
orders: orders.map((o) => ({
...o,
id: o.id.toString(),
toUserId: o.toUserId.toString(),
amount: o.amount.toString(),
})),
total,
page,
pageSize,
};
}
/**
*
*/
async getWithdrawableSystemAccounts(): Promise<{
accountSequence: string;
accountName: string;
balance: string;
}[]> {
const accounts: string[] = [
'S0000000001', // 总部账户
'S0000000003', // 运营账户
'S0000000005', // 分享权益池
'S0000000006', // 手续费归集
];
// 查询固定系统账户
const wallets = await this.prisma.walletAccount.findMany({
where: {
accountSequence: { in: accounts },
},
});
// 查询区域账户(省区域 9 开头,市区域 8 开头)
const regionWallets = await this.prisma.walletAccount.findMany({
where: {
OR: [
{ accountSequence: { startsWith: '9' } },
{ accountSequence: { startsWith: '8' } },
],
},
});
const allWallets = [...wallets, ...regionWallets];
return allWallets.map((w) => ({
accountSequence: w.accountSequence,
accountName: this.getSystemAccountName(w.accountSequence),
balance: w.usdtAvailable.toString(),
}));
}
/**
*
*/
private generateOrderNo(): string {
const timestamp = Date.now().toString(36).toUpperCase();
const random = Math.random().toString(36).substring(2, 8).toUpperCase();
return `SWD${timestamp}${random}`;
}
}

View File

@ -261,6 +261,55 @@ export class IdentityClientService {
}
}
/**
* accountSequence
*
*
* @param accountSequence ( D25121400005)
* @returns null
*/
async getUserInfoByAccountSequence(
accountSequence: string,
): Promise<{
accountSequence: string;
userId: string;
realName: string | null;
walletAddress: string;
} | null> {
try {
this.logger.log(`查询用户信息: accountSequence=${accountSequence}`);
const response = await this.httpClient.get(
`/user/internal/users/by-account-sequence/${accountSequence}`,
);
// identity-service 响应格式: { success: true, data: { found: true, ... } }
const data = response.data?.data;
if (!data?.found) {
this.logger.debug(`未找到用户: ${accountSequence}`);
return null;
}
this.logger.log(`用户信息: ${accountSequence} -> userId=${data.userId}, realName=${data.realName}`);
return {
accountSequence: data.accountSequence,
userId: data.userId,
realName: data.realName || null,
walletAddress: data.walletAddress,
};
} catch (error: any) {
this.logger.error(
`查询用户信息失败: ${accountSequence}, error=${error.message}`,
);
if (error.response?.status === 404) {
return null;
}
throw new HttpException('无法查询用户信息', HttpStatus.SERVICE_UNAVAILABLE);
}
}
/**
*
*

View File

@ -36,8 +36,35 @@ export interface WithdrawalFailedPayload {
netAmount: number;
}
export interface SystemWithdrawalConfirmedPayload {
orderNo: string;
fromAccountSequence: string;
fromAccountName: string;
toAccountSequence: string;
status: 'CONFIRMED';
txHash: string;
blockNumber?: number;
chainType: string;
toAddress: string;
amount: string;
}
export interface SystemWithdrawalFailedPayload {
orderNo: string;
fromAccountSequence: string;
fromAccountName: string;
toAccountSequence: string;
status: 'FAILED';
error: string;
chainType: string;
toAddress: string;
amount: string;
}
export type WithdrawalConfirmedHandler = (payload: WithdrawalConfirmedPayload) => Promise<void>;
export type WithdrawalFailedHandler = (payload: WithdrawalFailedPayload) => Promise<void>;
export type SystemWithdrawalConfirmedHandler = (payload: SystemWithdrawalConfirmedPayload) => Promise<void>;
export type SystemWithdrawalFailedHandler = (payload: SystemWithdrawalFailedPayload) => Promise<void>;
@Injectable()
export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDestroy {
@ -48,6 +75,8 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
private withdrawalConfirmedHandler?: WithdrawalConfirmedHandler;
private withdrawalFailedHandler?: WithdrawalFailedHandler;
private systemWithdrawalConfirmedHandler?: SystemWithdrawalConfirmedHandler;
private systemWithdrawalFailedHandler?: SystemWithdrawalFailedHandler;
constructor(private readonly configService: ConfigService) {}
@ -122,6 +151,22 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
this.logger.log(`[REGISTER] WithdrawalFailed handler registered`);
}
/**
* Register handler for system withdrawal confirmed events
*/
onSystemWithdrawalConfirmed(handler: SystemWithdrawalConfirmedHandler): void {
this.systemWithdrawalConfirmedHandler = handler;
this.logger.log(`[REGISTER] SystemWithdrawalConfirmed handler registered`);
}
/**
* Register handler for system withdrawal failed events
*/
onSystemWithdrawalFailed(handler: SystemWithdrawalFailedHandler): void {
this.systemWithdrawalFailedHandler = handler;
this.logger.log(`[REGISTER] SystemWithdrawalFailed handler registered`);
}
private async startConsuming(): Promise<void> {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
@ -166,6 +211,29 @@ export class WithdrawalEventConsumerService implements OnModuleInit, OnModuleDes
} else {
this.logger.warn(`[HANDLE] No handler registered for WithdrawalFailed`);
}
} else if (eventType === 'blockchain.system-withdrawal.confirmed') {
this.logger.log(`[HANDLE] Processing SystemWithdrawalConfirmed event`);
this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
this.logger.log(`[HANDLE] txHash: ${payload.txHash}`);
this.logger.log(`[HANDLE] toAccountSequence: ${payload.toAccountSequence}`);
if (this.systemWithdrawalConfirmedHandler) {
await this.systemWithdrawalConfirmedHandler(payload as SystemWithdrawalConfirmedPayload);
this.logger.log(`[HANDLE] SystemWithdrawalConfirmed handler completed`);
} else {
this.logger.warn(`[HANDLE] No handler registered for SystemWithdrawalConfirmed`);
}
} else if (eventType === 'blockchain.system-withdrawal.failed') {
this.logger.log(`[HANDLE] Processing SystemWithdrawalFailed event`);
this.logger.log(`[HANDLE] orderNo: ${payload.orderNo}`);
this.logger.log(`[HANDLE] error: ${payload.error}`);
if (this.systemWithdrawalFailedHandler) {
await this.systemWithdrawalFailedHandler(payload as SystemWithdrawalFailedPayload);
this.logger.log(`[HANDLE] SystemWithdrawalFailed handler completed`);
} else {
this.logger.warn(`[HANDLE] No handler registered for SystemWithdrawalFailed`);
}
} else if (eventType === 'blockchain.withdrawal.status' || eventType === 'blockchain.withdrawal.received') {
// Log status updates but don't process them (informational only)
this.logger.log(`[INFO] Withdrawal status update: ${payload.status} for ${payload.orderNo}`);