diff --git a/backend/services/wallet-service/src/api/api.module.ts b/backend/services/wallet-service/src/api/api.module.ts index e63d3903..5a1103cb 100644 --- a/backend/services/wallet-service/src/api/api.module.ts +++ b/backend/services/wallet-service/src/api/api.module.ts @@ -12,7 +12,7 @@ import { import { InternalWalletController } from './controllers/internal-wallet.controller'; import { FiatWithdrawalController } from './controllers/fiat-withdrawal.controller'; import { WalletApplicationService, FiatWithdrawalApplicationService, SystemWithdrawalApplicationService } from '@/application/services'; -import { DepositConfirmedHandler, PlantingCreatedHandler } from '@/application/event-handlers'; +import { DepositConfirmedHandler, PlantingCreatedHandler, UserAccountCreatedHandler } from '@/application/event-handlers'; import { WithdrawalStatusHandler } from '@/application/event-handlers/withdrawal-status.handler'; import { SystemWithdrawalStatusHandler } from '@/application/event-handlers/system-withdrawal-status.handler'; import { ExpiredRewardsScheduler } from '@/application/schedulers'; @@ -44,6 +44,8 @@ import { JwtStrategy } from '@/shared/strategies/jwt.strategy'; SystemWithdrawalApplicationService, DepositConfirmedHandler, PlantingCreatedHandler, + // [2026-01-08] 新增:用户注册时创建钱包的事件处理器 + UserAccountCreatedHandler, WithdrawalStatusHandler, SystemWithdrawalStatusHandler, ExpiredRewardsScheduler, diff --git a/backend/services/wallet-service/src/api/controllers/internal-wallet.controller.ts b/backend/services/wallet-service/src/api/controllers/internal-wallet.controller.ts index cf99d8c4..f4327056 100644 --- a/backend/services/wallet-service/src/api/controllers/internal-wallet.controller.ts +++ b/backend/services/wallet-service/src/api/controllers/internal-wallet.controller.ts @@ -494,6 +494,45 @@ export class InternalWalletController { return result; } + // =============== 钱包预创建 API =============== + // [2026-01-08] 新增:确保用户钱包存在(用于用户注册时或内部转账前) + // 回滚方式:删除以下 API 方法即可 + + @Post('ensure-wallet') + @Public() + @ApiOperation({ summary: '确保用户钱包存在(内部API) - 幂等操作' }) + @ApiResponse({ status: 200, description: '钱包信息' }) + async ensureWalletExists( + @Body() dto: { accountSequence: string; userId: string }, + ) { + this.logger.log(`========== ensure-wallet 请求 ==========`); + this.logger.log(`请求参数: ${JSON.stringify(dto)}`); + + const result = await this.walletService.ensureWalletExists({ + accountSequence: dto.accountSequence, + userId: dto.userId, + }); + + this.logger.log(`确保钱包结果: ${JSON.stringify(result)}`); + return result; + } + + @Post('ensure-wallets') + @Public() + @ApiOperation({ summary: '批量确保用户钱包存在(内部API) - 幂等操作' }) + @ApiResponse({ status: 200, description: '钱包信息列表' }) + async ensureWalletsExist( + @Body() dto: { wallets: Array<{ accountSequence: string; userId: string }> }, + ) { + this.logger.log(`========== ensure-wallets 请求 ==========`); + this.logger.log(`请求参数: ${dto.wallets.length} 个钱包`); + + const results = await this.walletService.ensureWalletsExist(dto.wallets); + + this.logger.log(`批量确保钱包结果: ${results.length} 个已处理`); + return { results }; + } + // [2026-01-06] 新增:手续费归集账户统计 API // 用于系统账户报表中的"手续费账户汇总" Tab // 回滚方式:删除以下 API 方法即可 diff --git a/backend/services/wallet-service/src/application/event-handlers/index.ts b/backend/services/wallet-service/src/application/event-handlers/index.ts index 1bafeb97..250bc722 100644 --- a/backend/services/wallet-service/src/application/event-handlers/index.ts +++ b/backend/services/wallet-service/src/application/event-handlers/index.ts @@ -1,2 +1,3 @@ export * from './deposit-confirmed.handler'; export * from './planting-created.handler'; +export * from './user-account-created.handler'; diff --git a/backend/services/wallet-service/src/application/event-handlers/user-account-created.handler.ts b/backend/services/wallet-service/src/application/event-handlers/user-account-created.handler.ts new file mode 100644 index 00000000..e7af2a14 --- /dev/null +++ b/backend/services/wallet-service/src/application/event-handlers/user-account-created.handler.ts @@ -0,0 +1,72 @@ +/** + * User Account Created Event Handler + * + * [2026-01-08] 新增:处理用户注册事件,自动创建钱包 + * 确保内部转账时接收方钱包一定存在 + * + * 回滚方式: 删除此文件 + */ + +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { WalletApplicationService } from '@/application/services'; +import { + IdentityEventConsumerService, + UserAccountCreatedPayload, +} from '@/infrastructure/kafka/identity-event-consumer.service'; + +@Injectable() +export class UserAccountCreatedHandler implements OnModuleInit { + private readonly logger = new Logger(UserAccountCreatedHandler.name); + + constructor( + private readonly identityEventConsumer: IdentityEventConsumerService, + private readonly walletService: WalletApplicationService, + ) {} + + onModuleInit() { + // 注册事件处理器 + this.identityEventConsumer.onUserAccountCreated( + this.handleUserAccountCreated.bind(this), + ); + this.logger.log(`[INIT] UserAccountCreatedHandler registered`); + } + + /** + * 处理用户账户创建事件 + * 在用户注册时自动创建钱包 + */ + private async handleUserAccountCreated( + payload: UserAccountCreatedPayload, + ): Promise { + const { userId, accountSequence } = payload; + + this.logger.log( + `[HANDLE] Processing user account created: accountSequence=${accountSequence}, userId=${userId}`, + ); + + try { + // 调用 walletService 确保钱包存在 + const result = await this.walletService.ensureWalletExists({ + accountSequence, + userId, + }); + + if (result.existed) { + this.logger.log( + `[HANDLE] Wallet already exists for ${accountSequence}, walletId=${result.walletId}`, + ); + } else { + this.logger.log( + `[HANDLE] Created new wallet for ${accountSequence}, walletId=${result.walletId}`, + ); + } + } catch (error) { + this.logger.error( + `[ERROR] Failed to create wallet for ${accountSequence}: ${error.message}`, + error.stack, + ); + // 重新抛出异常以触发 Kafka 重试 + throw error; + } + } +} diff --git a/backend/services/wallet-service/src/application/services/wallet-application.service.ts b/backend/services/wallet-service/src/application/services/wallet-application.service.ts index b8ddc693..2fd50f81 100644 --- a/backend/services/wallet-service/src/application/services/wallet-application.service.ts +++ b/backend/services/wallet-service/src/application/services/wallet-application.service.ts @@ -1575,6 +1575,23 @@ export class WalletApplicationService { this.logger.log( `Internal transfer detected: ${wallet.accountSequence} -> ${toAccountSequence}`, ); + + // [2026-01-08] 兜底保障:确保接收方钱包存在 + // 即使用户注册时已创建钱包,这里再次确认以防万一 + try { + const ensureResult = await this.ensureWalletExists({ + accountSequence: toAccountSequence, + userId: targetUser.userId, + }); + this.logger.log( + `[WITHDRAWAL] Receiver wallet ensured: ${toAccountSequence}, existed=${ensureResult.existed}, walletId=${ensureResult.walletId}`, + ); + } catch (error) { + this.logger.error( + `[WITHDRAWAL] Failed to ensure receiver wallet: ${toAccountSequence}, error: ${error.message}`, + ); + // 不抛出异常,让后续流程继续(withdrawal-status.handler 也会兜底创建) + } } // 创建提现订单 @@ -3815,4 +3832,117 @@ export class WalletApplicationService { totalPages: Math.ceil(total / pageSize), }; } + + // =============== 钱包预创建 API =============== + // [2026-01-08] 新增:用户注册时预创建钱包,确保内部转账时接收方钱包一定存在 + // 回滚方式:删除此方法即可 + + /** + * 确保用户钱包存在(幂等操作) + * + * 此方法用于: + * 1. 用户注册时预创建钱包(主要保障) + * 2. 内部转账前预校验并创建接收方钱包(兜底保障) + * + * 使用 upsert 确保幂等性,避免并发创建冲突 + * + * @param accountSequence 账户序列号(跨服务唯一标识) + * @param userId 用户ID + * @returns 钱包是否已存在(existed)和当前钱包ID + */ + async ensureWalletExists(params: { + accountSequence: string; + userId: string; + }): Promise<{ + existed: boolean; + walletId: string; + accountSequence: string; + }> { + const { accountSequence, userId } = params; + const userIdBigint = BigInt(userId); + + this.logger.log(`[ensureWalletExists] 确保钱包存在: accountSequence=${accountSequence}, userId=${userId}`); + + // 先检查是否已存在 + const existingWallet = await this.prisma.walletAccount.findUnique({ + where: { accountSequence }, + }); + + if (existingWallet) { + this.logger.log(`[ensureWalletExists] 钱包已存在: id=${existingWallet.id}, accountSequence=${accountSequence}`); + return { + existed: true, + walletId: existingWallet.id.toString(), + accountSequence, + }; + } + + // 使用 upsert 创建钱包(避免并发冲突) + const wallet = await this.prisma.walletAccount.upsert({ + where: { accountSequence }, + create: { + accountSequence, + userId: userIdBigint, + usdtAvailable: 0, + usdtFrozen: 0, + dstAvailable: 0, + dstFrozen: 0, + bnbAvailable: 0, + bnbFrozen: 0, + ogAvailable: 0, + ogFrozen: 0, + rwadAvailable: 0, + rwadFrozen: 0, + hashpower: 0, + pendingUsdt: 0, + pendingHashpower: 0, + settleableUsdt: 0, + settleableHashpower: 0, + settledTotalUsdt: 0, + settledTotalHashpower: 0, + expiredTotalUsdt: 0, + expiredTotalHashpower: 0, + status: 'ACTIVE', + hasPlanted: false, + version: 0, + }, + update: {}, // 如果已存在,不做任何更新 + }); + + // 判断是新创建还是已存在 + const wasCreated = wallet.createdAt.getTime() > Date.now() - 1000; // 1秒内创建的认为是新创建 + + this.logger.log(`[ensureWalletExists] ${wasCreated ? '新创建' : '已存在'}钱包: id=${wallet.id}, accountSequence=${accountSequence}`); + + return { + existed: !wasCreated, + walletId: wallet.id.toString(), + accountSequence, + }; + } + + /** + * 批量确保钱包存在(用于批量操作场景) + */ + async ensureWalletsExist(params: Array<{ + accountSequence: string; + userId: string; + }>): Promise> { + const results: Array<{ + existed: boolean; + walletId: string; + accountSequence: string; + }> = []; + + for (const param of params) { + const result = await this.ensureWalletExists(param); + results.push(result); + } + + return results; + } } diff --git a/backend/services/wallet-service/src/infrastructure/kafka/identity-event-consumer.service.ts b/backend/services/wallet-service/src/infrastructure/kafka/identity-event-consumer.service.ts new file mode 100644 index 00000000..d0063371 --- /dev/null +++ b/backend/services/wallet-service/src/infrastructure/kafka/identity-event-consumer.service.ts @@ -0,0 +1,155 @@ +/** + * Identity Event Consumer Service for Wallet Service + * + * [2026-01-08] 新增:监听 identity-service 的用户注册事件 + * 在用户注册时自动创建钱包,确保内部转账时接收方钱包一定存在 + * + * 消费的事件: + * - identity.UserAccountCreated: 用户账户创建事件 + * - identity.UserAccountAutoCreated: 自动创建账户事件 + * + * 回滚方式: 删除此文件,并从 kafka.module.ts 中移除引用 + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// identity-service 发布的用户事件 Topic +export const IDENTITY_TOPICS = { + USER_ACCOUNT_CREATED: 'identity.UserAccountCreated', + USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated', +} as const; + +// 用户账户创建事件的 payload 结构 +export interface UserAccountCreatedPayload { + userId: string; + accountSequence: string; + phoneNumber?: string; + nickname?: string; + referralCode?: string; + inviterUserId?: string; + createdAt: string; +} + +export type UserAccountCreatedHandler = (payload: UserAccountCreatedPayload) => Promise; + +@Injectable() +export class IdentityEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(IdentityEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private userAccountCreatedHandler?: UserAccountCreatedHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'wallet-service'; + const groupId = 'wallet-service-identity-events'; + + this.logger.log(`[INIT] Identity Event Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics: ${Object.values(IDENTITY_TOPICS).join(', ')}`); + + // 企业级重试配置 + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 1000, // 1 秒 + maxRetryTime: 300000, // 最大 5 分钟 + retries: 15, // 最多 15 次 + multiplier: 2, // 指数退避因子 + restartOnFailure: async () => true, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting Identity Event consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] Identity Event consumer connected successfully`); + + await this.consumer.subscribe({ + topics: Object.values(IDENTITY_TOPICS), + fromBeginning: false, + }); + this.logger.log(`[SUBSCRIBE] Subscribed to identity topics`); + + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect Identity Event consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Identity Event consumer disconnected'); + } + } + + /** + * 注册用户账户创建事件处理器 + */ + onUserAccountCreated(handler: UserAccountCreatedHandler): void { + this.userAccountCreatedHandler = handler; + this.logger.log('[HANDLER] User account created handler registered'); + } + + private async startConsuming(): Promise { + this.logger.log(`[CONSUME] Starting to consume identity events...`); + + await this.consumer.run({ + eachMessage: async (messagePayload: EachMessagePayload) => { + const { topic, partition, message } = messagePayload; + + try { + if (!message.value) { + this.logger.warn(`[SKIP] Empty message from ${topic}`); + return; + } + + const rawValue = message.value.toString(); + this.logger.debug(`[RAW] Topic: ${topic}, Partition: ${partition}, Value: ${rawValue}`); + + const eventMessage = JSON.parse(rawValue); + const payload = eventMessage.payload || eventMessage; + + this.logger.log(`[RECEIVED] Event from ${topic}: userId=${payload.userId}, accountSequence=${payload.accountSequence}`); + + // 处理用户账户创建事件 + if ( + topic === IDENTITY_TOPICS.USER_ACCOUNT_CREATED || + topic === IDENTITY_TOPICS.USER_ACCOUNT_AUTO_CREATED + ) { + if (this.userAccountCreatedHandler) { + await this.userAccountCreatedHandler(payload as UserAccountCreatedPayload); + this.logger.log(`[SUCCESS] Processed user account created event: ${payload.accountSequence}`); + } else { + this.logger.warn(`[SKIP] No handler registered for user account created event`); + } + } else { + this.logger.warn(`[SKIP] Unknown topic: ${topic}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing identity event from ${topic}`, error); + // 重新抛出异常以触发 Kafka 重试机制 + throw error; + } + }, + }); + } +} diff --git a/backend/services/wallet-service/src/infrastructure/kafka/index.ts b/backend/services/wallet-service/src/infrastructure/kafka/index.ts index d09e5652..7190d8d3 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/index.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/index.ts @@ -2,3 +2,4 @@ export * from './kafka.module'; export * from './event-publisher.service'; export * from './deposit-event-consumer.service'; export * from './planting-event-consumer.service'; +export * from './identity-event-consumer.service'; diff --git a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts index 964e163e..3c2d3a1e 100644 --- a/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/wallet-service/src/infrastructure/kafka/kafka.module.ts @@ -5,6 +5,8 @@ import { EventPublisherService } from './event-publisher.service'; import { DepositEventConsumerService } from './deposit-event-consumer.service'; import { PlantingEventConsumerService } from './planting-event-consumer.service'; import { WithdrawalEventConsumerService } from './withdrawal-event-consumer.service'; +// [2026-01-08] 新增:Identity 事件消费者 - 用户注册时创建钱包 +import { IdentityEventConsumerService } from './identity-event-consumer.service'; // [2026-01-07] 新增:Outbox Pattern 实现 import { OutboxPublisherService } from './outbox-publisher.service'; import { OutboxRepository } from '../persistence/repositories/outbox.repository'; @@ -45,6 +47,8 @@ import { PrismaService } from '../persistence/prisma/prisma.service'; DepositEventConsumerService, PlantingEventConsumerService, WithdrawalEventConsumerService, + // [2026-01-08] 新增:Identity 事件消费者 + IdentityEventConsumerService, // [2026-01-07] 新增:Outbox Pattern OutboxRepository, OutboxPublisherService, @@ -54,6 +58,8 @@ import { PrismaService } from '../persistence/prisma/prisma.service'; DepositEventConsumerService, PlantingEventConsumerService, WithdrawalEventConsumerService, + // [2026-01-08] 新增:导出 Identity 事件消费者 + IdentityEventConsumerService, ClientsModule, // [2026-01-07] 新增:导出 Outbox 相关服务 OutboxRepository,