feat(wallet-service): 三层保护机制确保内部转账接收方钱包存在

新增三层保护机制:
1. 用户注册时:监听 identity.UserAccountCreated 事件自动创建钱包
2. 发起转账时:检测内部转账后调用 ensureWalletExists() 预创建钱包
3. 链上确认时:原有 upsert 逻辑兜底(保持不变)

新增文件:
- identity-event-consumer.service.ts: 消费 identity 用户注册事件
- user-account-created.handler.ts: 处理用户注册事件创建钱包

新增 API:
- POST /wallets/ensure-wallet: 确保单个钱包存在
- POST /wallets/ensure-wallets: 批量确保钱包存在

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-08 07:33:47 -08:00
parent 68841abbf4
commit bc38ec6ec0
8 changed files with 407 additions and 1 deletions

View File

@ -12,7 +12,7 @@ import {
import { InternalWalletController } from './controllers/internal-wallet.controller'; import { InternalWalletController } from './controllers/internal-wallet.controller';
import { FiatWithdrawalController } from './controllers/fiat-withdrawal.controller'; import { FiatWithdrawalController } from './controllers/fiat-withdrawal.controller';
import { WalletApplicationService, FiatWithdrawalApplicationService, SystemWithdrawalApplicationService } from '@/application/services'; import { WalletApplicationService, FiatWithdrawalApplicationService, SystemWithdrawalApplicationService } from '@/application/services';
import { DepositConfirmedHandler, PlantingCreatedHandler } from '@/application/event-handlers'; import { DepositConfirmedHandler, PlantingCreatedHandler, UserAccountCreatedHandler } from '@/application/event-handlers';
import { WithdrawalStatusHandler } from '@/application/event-handlers/withdrawal-status.handler'; import { WithdrawalStatusHandler } from '@/application/event-handlers/withdrawal-status.handler';
import { SystemWithdrawalStatusHandler } from '@/application/event-handlers/system-withdrawal-status.handler'; import { SystemWithdrawalStatusHandler } from '@/application/event-handlers/system-withdrawal-status.handler';
import { ExpiredRewardsScheduler } from '@/application/schedulers'; import { ExpiredRewardsScheduler } from '@/application/schedulers';
@ -44,6 +44,8 @@ import { JwtStrategy } from '@/shared/strategies/jwt.strategy';
SystemWithdrawalApplicationService, SystemWithdrawalApplicationService,
DepositConfirmedHandler, DepositConfirmedHandler,
PlantingCreatedHandler, PlantingCreatedHandler,
// [2026-01-08] 新增:用户注册时创建钱包的事件处理器
UserAccountCreatedHandler,
WithdrawalStatusHandler, WithdrawalStatusHandler,
SystemWithdrawalStatusHandler, SystemWithdrawalStatusHandler,
ExpiredRewardsScheduler, ExpiredRewardsScheduler,

View File

@ -494,6 +494,45 @@ export class InternalWalletController {
return result; return result;
} }
// =============== 钱包预创建 API ===============
// [2026-01-08] 新增:确保用户钱包存在(用于用户注册时或内部转账前)
// 回滚方式:删除以下 API 方法即可
@Post('ensure-wallet')
@Public()
@ApiOperation({ summary: '确保用户钱包存在(内部API) - 幂等操作' })
@ApiResponse({ status: 200, description: '钱包信息' })
async ensureWalletExists(
@Body() dto: { accountSequence: string; userId: string },
) {
this.logger.log(`========== ensure-wallet 请求 ==========`);
this.logger.log(`请求参数: ${JSON.stringify(dto)}`);
const result = await this.walletService.ensureWalletExists({
accountSequence: dto.accountSequence,
userId: dto.userId,
});
this.logger.log(`确保钱包结果: ${JSON.stringify(result)}`);
return result;
}
@Post('ensure-wallets')
@Public()
@ApiOperation({ summary: '批量确保用户钱包存在(内部API) - 幂等操作' })
@ApiResponse({ status: 200, description: '钱包信息列表' })
async ensureWalletsExist(
@Body() dto: { wallets: Array<{ accountSequence: string; userId: string }> },
) {
this.logger.log(`========== ensure-wallets 请求 ==========`);
this.logger.log(`请求参数: ${dto.wallets.length} 个钱包`);
const results = await this.walletService.ensureWalletsExist(dto.wallets);
this.logger.log(`批量确保钱包结果: ${results.length} 个已处理`);
return { results };
}
// [2026-01-06] 新增:手续费归集账户统计 API // [2026-01-06] 新增:手续费归集账户统计 API
// 用于系统账户报表中的"手续费账户汇总" Tab // 用于系统账户报表中的"手续费账户汇总" Tab
// 回滚方式:删除以下 API 方法即可 // 回滚方式:删除以下 API 方法即可

View File

@ -1,2 +1,3 @@
export * from './deposit-confirmed.handler'; export * from './deposit-confirmed.handler';
export * from './planting-created.handler'; export * from './planting-created.handler';
export * from './user-account-created.handler';

View File

@ -0,0 +1,72 @@
/**
* User Account Created Event Handler
*
* [2026-01-08]
*
*
* 回滚方式: 删除此文件
*/
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { WalletApplicationService } from '@/application/services';
import {
IdentityEventConsumerService,
UserAccountCreatedPayload,
} from '@/infrastructure/kafka/identity-event-consumer.service';
@Injectable()
export class UserAccountCreatedHandler implements OnModuleInit {
private readonly logger = new Logger(UserAccountCreatedHandler.name);
constructor(
private readonly identityEventConsumer: IdentityEventConsumerService,
private readonly walletService: WalletApplicationService,
) {}
onModuleInit() {
// 注册事件处理器
this.identityEventConsumer.onUserAccountCreated(
this.handleUserAccountCreated.bind(this),
);
this.logger.log(`[INIT] UserAccountCreatedHandler registered`);
}
/**
*
*
*/
private async handleUserAccountCreated(
payload: UserAccountCreatedPayload,
): Promise<void> {
const { userId, accountSequence } = payload;
this.logger.log(
`[HANDLE] Processing user account created: accountSequence=${accountSequence}, userId=${userId}`,
);
try {
// 调用 walletService 确保钱包存在
const result = await this.walletService.ensureWalletExists({
accountSequence,
userId,
});
if (result.existed) {
this.logger.log(
`[HANDLE] Wallet already exists for ${accountSequence}, walletId=${result.walletId}`,
);
} else {
this.logger.log(
`[HANDLE] Created new wallet for ${accountSequence}, walletId=${result.walletId}`,
);
}
} catch (error) {
this.logger.error(
`[ERROR] Failed to create wallet for ${accountSequence}: ${error.message}`,
error.stack,
);
// 重新抛出异常以触发 Kafka 重试
throw error;
}
}
}

View File

@ -1575,6 +1575,23 @@ export class WalletApplicationService {
this.logger.log( this.logger.log(
`Internal transfer detected: ${wallet.accountSequence} -> ${toAccountSequence}`, `Internal transfer detected: ${wallet.accountSequence} -> ${toAccountSequence}`,
); );
// [2026-01-08] 兜底保障:确保接收方钱包存在
// 即使用户注册时已创建钱包,这里再次确认以防万一
try {
const ensureResult = await this.ensureWalletExists({
accountSequence: toAccountSequence,
userId: targetUser.userId,
});
this.logger.log(
`[WITHDRAWAL] Receiver wallet ensured: ${toAccountSequence}, existed=${ensureResult.existed}, walletId=${ensureResult.walletId}`,
);
} catch (error) {
this.logger.error(
`[WITHDRAWAL] Failed to ensure receiver wallet: ${toAccountSequence}, error: ${error.message}`,
);
// 不抛出异常让后续流程继续withdrawal-status.handler 也会兜底创建)
}
} }
// 创建提现订单 // 创建提现订单
@ -3815,4 +3832,117 @@ export class WalletApplicationService {
totalPages: Math.ceil(total / pageSize), totalPages: Math.ceil(total / pageSize),
}; };
} }
// =============== 钱包预创建 API ===============
// [2026-01-08] 新增:用户注册时预创建钱包,确保内部转账时接收方钱包一定存在
// 回滚方式:删除此方法即可
/**
*
*
*
* 1.
* 2.
*
* 使 upsert
*
* @param accountSequence
* @param userId ID
* @returns existedID
*/
async ensureWalletExists(params: {
accountSequence: string;
userId: string;
}): Promise<{
existed: boolean;
walletId: string;
accountSequence: string;
}> {
const { accountSequence, userId } = params;
const userIdBigint = BigInt(userId);
this.logger.log(`[ensureWalletExists] 确保钱包存在: accountSequence=${accountSequence}, userId=${userId}`);
// 先检查是否已存在
const existingWallet = await this.prisma.walletAccount.findUnique({
where: { accountSequence },
});
if (existingWallet) {
this.logger.log(`[ensureWalletExists] 钱包已存在: id=${existingWallet.id}, accountSequence=${accountSequence}`);
return {
existed: true,
walletId: existingWallet.id.toString(),
accountSequence,
};
}
// 使用 upsert 创建钱包(避免并发冲突)
const wallet = await this.prisma.walletAccount.upsert({
where: { accountSequence },
create: {
accountSequence,
userId: userIdBigint,
usdtAvailable: 0,
usdtFrozen: 0,
dstAvailable: 0,
dstFrozen: 0,
bnbAvailable: 0,
bnbFrozen: 0,
ogAvailable: 0,
ogFrozen: 0,
rwadAvailable: 0,
rwadFrozen: 0,
hashpower: 0,
pendingUsdt: 0,
pendingHashpower: 0,
settleableUsdt: 0,
settleableHashpower: 0,
settledTotalUsdt: 0,
settledTotalHashpower: 0,
expiredTotalUsdt: 0,
expiredTotalHashpower: 0,
status: 'ACTIVE',
hasPlanted: false,
version: 0,
},
update: {}, // 如果已存在,不做任何更新
});
// 判断是新创建还是已存在
const wasCreated = wallet.createdAt.getTime() > Date.now() - 1000; // 1秒内创建的认为是新创建
this.logger.log(`[ensureWalletExists] ${wasCreated ? '新创建' : '已存在'}钱包: id=${wallet.id}, accountSequence=${accountSequence}`);
return {
existed: !wasCreated,
walletId: wallet.id.toString(),
accountSequence,
};
}
/**
*
*/
async ensureWalletsExist(params: Array<{
accountSequence: string;
userId: string;
}>): Promise<Array<{
existed: boolean;
walletId: string;
accountSequence: string;
}>> {
const results: Array<{
existed: boolean;
walletId: string;
accountSequence: string;
}> = [];
for (const param of params) {
const result = await this.ensureWalletExists(param);
results.push(result);
}
return results;
}
} }

View File

@ -0,0 +1,155 @@
/**
* Identity Event Consumer Service for Wallet Service
*
* [2026-01-08] identity-service
*
*
* :
* - identity.UserAccountCreated: 用户账户创建事件
* - identity.UserAccountAutoCreated: 自动创建账户事件
*
* 回滚方式: 删除此文件 kafka.module.ts
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs';
// identity-service 发布的用户事件 Topic
export const IDENTITY_TOPICS = {
USER_ACCOUNT_CREATED: 'identity.UserAccountCreated',
USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated',
} as const;
// 用户账户创建事件的 payload 结构
export interface UserAccountCreatedPayload {
userId: string;
accountSequence: string;
phoneNumber?: string;
nickname?: string;
referralCode?: string;
inviterUserId?: string;
createdAt: string;
}
export type UserAccountCreatedHandler = (payload: UserAccountCreatedPayload) => Promise<void>;
@Injectable()
export class IdentityEventConsumerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(IdentityEventConsumerService.name);
private kafka: Kafka;
private consumer: Consumer;
private isConnected = false;
private userAccountCreatedHandler?: UserAccountCreatedHandler;
constructor(private readonly configService: ConfigService) {}
async onModuleInit() {
const brokers = this.configService.get<string>('KAFKA_BROKERS')?.split(',') || ['localhost:9092'];
const clientId = this.configService.get<string>('KAFKA_CLIENT_ID') || 'wallet-service';
const groupId = 'wallet-service-identity-events';
this.logger.log(`[INIT] Identity Event Consumer initializing...`);
this.logger.log(`[INIT] ClientId: ${clientId}`);
this.logger.log(`[INIT] GroupId: ${groupId}`);
this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`);
this.logger.log(`[INIT] Topics: ${Object.values(IDENTITY_TOPICS).join(', ')}`);
// 企业级重试配置
this.kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.WARN,
retry: {
initialRetryTime: 1000, // 1 秒
maxRetryTime: 300000, // 最大 5 分钟
retries: 15, // 最多 15 次
multiplier: 2, // 指数退避因子
restartOnFailure: async () => true,
},
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
});
try {
this.logger.log(`[CONNECT] Connecting Identity Event consumer...`);
await this.consumer.connect();
this.isConnected = true;
this.logger.log(`[CONNECT] Identity Event consumer connected successfully`);
await this.consumer.subscribe({
topics: Object.values(IDENTITY_TOPICS),
fromBeginning: false,
});
this.logger.log(`[SUBSCRIBE] Subscribed to identity topics`);
await this.startConsuming();
} catch (error) {
this.logger.error(`[ERROR] Failed to connect Identity Event consumer`, error);
}
}
async onModuleDestroy() {
if (this.isConnected) {
await this.consumer.disconnect();
this.logger.log('Identity Event consumer disconnected');
}
}
/**
*
*/
onUserAccountCreated(handler: UserAccountCreatedHandler): void {
this.userAccountCreatedHandler = handler;
this.logger.log('[HANDLER] User account created handler registered');
}
private async startConsuming(): Promise<void> {
this.logger.log(`[CONSUME] Starting to consume identity events...`);
await this.consumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
const { topic, partition, message } = messagePayload;
try {
if (!message.value) {
this.logger.warn(`[SKIP] Empty message from ${topic}`);
return;
}
const rawValue = message.value.toString();
this.logger.debug(`[RAW] Topic: ${topic}, Partition: ${partition}, Value: ${rawValue}`);
const eventMessage = JSON.parse(rawValue);
const payload = eventMessage.payload || eventMessage;
this.logger.log(`[RECEIVED] Event from ${topic}: userId=${payload.userId}, accountSequence=${payload.accountSequence}`);
// 处理用户账户创建事件
if (
topic === IDENTITY_TOPICS.USER_ACCOUNT_CREATED ||
topic === IDENTITY_TOPICS.USER_ACCOUNT_AUTO_CREATED
) {
if (this.userAccountCreatedHandler) {
await this.userAccountCreatedHandler(payload as UserAccountCreatedPayload);
this.logger.log(`[SUCCESS] Processed user account created event: ${payload.accountSequence}`);
} else {
this.logger.warn(`[SKIP] No handler registered for user account created event`);
}
} else {
this.logger.warn(`[SKIP] Unknown topic: ${topic}`);
}
} catch (error) {
this.logger.error(`[ERROR] Error processing identity event from ${topic}`, error);
// 重新抛出异常以触发 Kafka 重试机制
throw error;
}
},
});
}
}

View File

@ -2,3 +2,4 @@ export * from './kafka.module';
export * from './event-publisher.service'; export * from './event-publisher.service';
export * from './deposit-event-consumer.service'; export * from './deposit-event-consumer.service';
export * from './planting-event-consumer.service'; export * from './planting-event-consumer.service';
export * from './identity-event-consumer.service';

View File

@ -5,6 +5,8 @@ import { EventPublisherService } from './event-publisher.service';
import { DepositEventConsumerService } from './deposit-event-consumer.service'; import { DepositEventConsumerService } from './deposit-event-consumer.service';
import { PlantingEventConsumerService } from './planting-event-consumer.service'; import { PlantingEventConsumerService } from './planting-event-consumer.service';
import { WithdrawalEventConsumerService } from './withdrawal-event-consumer.service'; import { WithdrawalEventConsumerService } from './withdrawal-event-consumer.service';
// [2026-01-08] 新增Identity 事件消费者 - 用户注册时创建钱包
import { IdentityEventConsumerService } from './identity-event-consumer.service';
// [2026-01-07] 新增Outbox Pattern 实现 // [2026-01-07] 新增Outbox Pattern 实现
import { OutboxPublisherService } from './outbox-publisher.service'; import { OutboxPublisherService } from './outbox-publisher.service';
import { OutboxRepository } from '../persistence/repositories/outbox.repository'; import { OutboxRepository } from '../persistence/repositories/outbox.repository';
@ -45,6 +47,8 @@ import { PrismaService } from '../persistence/prisma/prisma.service';
DepositEventConsumerService, DepositEventConsumerService,
PlantingEventConsumerService, PlantingEventConsumerService,
WithdrawalEventConsumerService, WithdrawalEventConsumerService,
// [2026-01-08] 新增Identity 事件消费者
IdentityEventConsumerService,
// [2026-01-07] 新增Outbox Pattern // [2026-01-07] 新增Outbox Pattern
OutboxRepository, OutboxRepository,
OutboxPublisherService, OutboxPublisherService,
@ -54,6 +58,8 @@ import { PrismaService } from '../persistence/prisma/prisma.service';
DepositEventConsumerService, DepositEventConsumerService,
PlantingEventConsumerService, PlantingEventConsumerService,
WithdrawalEventConsumerService, WithdrawalEventConsumerService,
// [2026-01-08] 新增:导出 Identity 事件消费者
IdentityEventConsumerService,
ClientsModule, ClientsModule,
// [2026-01-07] 新增:导出 Outbox 相关服务 // [2026-01-07] 新增:导出 Outbox 相关服务
OutboxRepository, OutboxRepository,