From 2e815cec6eee73508ed4025071e79ca78794b774 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 6 Dec 2025 21:08:21 -0800 Subject: [PATCH] feat: move address derivation from identity-service to blockchain-service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Cosmos address derivation (bech32) to blockchain-service - KAVA: kava1... format - DST: dst1... format - BSC: 0x... EVM format - Create MpcEventConsumerService in blockchain-service to consume mpc.KeygenCompleted events - Create BlockchainEventConsumerService in identity-service to consume blockchain.WalletAddressCreated events - Simplify identity-service MpcKeygenCompletedHandler to only manage status updates - Add CosmosAddress value object for Cosmos chain addresses Event flow: 1. identity-service -> mpc.KeygenRequested 2. mpc-service -> mpc.KeygenCompleted (with publicKey) 3. blockchain-service consumes mpc.KeygenCompleted, derives addresses 4. blockchain-service -> blockchain.WalletAddressCreated (with all chain addresses) 5. identity-service consumes blockchain.WalletAddressCreated, saves to user account 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../mpc-keygen-completed.handler.ts | 60 ++++-- .../services/address-derivation.service.ts | 77 +++++--- .../src/domain/enums/chain-type.enum.ts | 1 + .../domain/value-objects/cosmos-address.vo.ts | 50 +++++ .../src/domain/value-objects/index.ts | 1 + .../blockchain/address-derivation.adapter.ts | 60 +++++- .../infrastructure/infrastructure.module.ts | 4 +- .../src/infrastructure/kafka/index.ts | 1 + .../kafka/mpc-event-consumer.service.ts | 186 ++++++++++++++++++ .../controllers/user-account.controller.ts | 15 +- .../identity-service/src/api/dto/index.ts | 57 ++++-- .../dto/request/auto-create-account.dto.ts | 41 ++-- .../src/application/application.module.ts | 3 + .../auto-create-account.command.ts | 6 +- .../auto-create-account.handler.ts | 84 ++++---- .../src/application/commands/index.ts | 40 +++- .../blockchain-wallet.handler.ts | 144 ++++++++++++++ .../src/application/event-handlers/index.ts | 1 + .../mpc-keygen-completed.handler.ts | 178 ++++------------- .../services/user-application.service.ts | 162 +++++++++------ .../user-account/user-account.aggregate.ts | 25 ++- .../src/domain/value-objects/index.ts | 39 ++++ .../blockchain-event-consumer.service.ts | 144 ++++++++++++++ .../src/infrastructure/kafka/index.ts | 1 + .../src/infrastructure/kafka/kafka.module.ts | 3 + .../entities/user-account.entity.ts | 10 + .../mappers/user-account.mapper.ts | 24 ++- .../user-account.repository.impl.ts | 32 ++- 28 files changed, 1096 insertions(+), 353 deletions(-) create mode 100644 backend/services/blockchain-service/src/domain/value-objects/cosmos-address.vo.ts create mode 100644 backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts create mode 100644 backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts create mode 100644 backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts diff --git a/backend/services/blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts index 1b893f7e..7eb20ac4 100644 --- a/backend/services/blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts +++ b/backend/services/blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts @@ -1,37 +1,65 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { AddressDerivationService } from '../services/address-derivation.service'; - -export interface MpcKeygenCompletedPayload { - userId: string; - deviceId: string; - publicKey: string; - keyType: string; -} +import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service'; /** * MPC 密钥生成完成事件处理器 + * + * 监听 mpc.KeygenCompleted 事件,从公钥派生多链钱包地址, + * 并发布 blockchain.WalletAddressCreated 事件通知 identity-service */ @Injectable() -export class MpcKeygenCompletedHandler { +export class MpcKeygenCompletedHandler implements OnModuleInit { private readonly logger = new Logger(MpcKeygenCompletedHandler.name); - constructor(private readonly addressDerivationService: AddressDerivationService) {} + constructor( + private readonly addressDerivationService: AddressDerivationService, + private readonly mpcEventConsumer: MpcEventConsumerService, + ) {} + + onModuleInit() { + // Register handler for keygen completed events + this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); + this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`); + } /** * 处理 MPC 密钥生成完成事件 + * 从 mpc-service 的 KeygenCompleted 事件中提取 publicKey 和 userId */ - async handle(payload: MpcKeygenCompletedPayload): Promise { - this.logger.log(`Handling MPC keygen completed for user: ${payload.userId}`); + private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { + this.logger.log(`[HANDLE] Received KeygenCompleted event`); + this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`); + this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`); + this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`); + + // Extract userId from extraPayload + const userId = payload.extraPayload?.userId; + if (!userId) { + this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`); + return; + } + + const publicKey = payload.publicKey; + if (!publicKey) { + this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`); + return; + } try { + this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}`); + const result = await this.addressDerivationService.deriveAndRegister( - BigInt(payload.userId), - payload.publicKey, + BigInt(userId), + publicKey, ); - this.logger.log(`Derived ${result.addresses.length} addresses for user ${payload.userId}`); + this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for user ${userId}`); + result.addresses.forEach((addr) => { + this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`); + }); } catch (error) { - this.logger.error(`Failed to derive addresses for user ${payload.userId}:`, error); + this.logger.error(`[ERROR] Failed to derive addresses for user ${userId}:`, error); throw error; } } diff --git a/backend/services/blockchain-service/src/application/services/address-derivation.service.ts b/backend/services/blockchain-service/src/application/services/address-derivation.service.ts index 3e476791..4d59e9db 100644 --- a/backend/services/blockchain-service/src/application/services/address-derivation.service.ts +++ b/backend/services/blockchain-service/src/application/services/address-derivation.service.ts @@ -12,6 +12,7 @@ import { import { MonitoredAddress } from '@/domain/aggregates/monitored-address'; import { WalletAddressCreatedEvent } from '@/domain/events'; import { ChainType, EvmAddress } from '@/domain/value-objects'; +import { ChainTypeEnum } from '@/domain/enums'; export interface DeriveAddressResult { userId: bigint; @@ -22,11 +23,23 @@ export interface DeriveAddressResult { /** * 地址派生服务 * 处理从 MPC 公钥派生钱包地址的业务逻辑 + * + * 派生策略: + * - KAVA: Cosmos bech32 格式 (kava1...) + * - DST: Cosmos bech32 格式 (dst1...) + * - BSC: EVM 格式 (0x...) + * + * 监控策略: + * - 只有 EVM 链 (BSC) 的地址会被注册到监控列表用于充值检测 + * - Cosmos 链 (KAVA, DST) 需要不同的监控机制 */ @Injectable() export class AddressDerivationService { private readonly logger = new Logger(AddressDerivationService.name); + // EVM 链类型列表,用于判断是否需要注册监控 + private readonly evmChains = new Set([ChainTypeEnum.BSC]); + constructor( private readonly addressDerivation: AddressDerivationAdapter, private readonly addressCache: AddressCacheService, @@ -39,38 +52,23 @@ export class AddressDerivationService { * 从公钥派生地址并注册监控 */ async deriveAndRegister(userId: bigint, publicKey: string): Promise { - this.logger.log(`Deriving addresses for user ${userId} from public key`); + this.logger.log(`[DERIVE] Starting address derivation for user ${userId}`); + this.logger.log(`[DERIVE] Public key: ${publicKey.substring(0, 30)}...`); - // 1. 派生所有链的地址 + // 1. 派生所有链的地址 (包括 Cosmos 和 EVM) const derivedAddresses = this.addressDerivation.deriveAllAddresses(publicKey); + this.logger.log(`[DERIVE] Derived ${derivedAddresses.length} addresses`); - // 2. 为每个链注册监控地址 + // 2. 只为 EVM 链注册监控地址 (用于充值检测) for (const derived of derivedAddresses) { - const chainType = ChainType.fromEnum(derived.chainType); - const address = EvmAddress.create(derived.address); - - // 检查是否已存在 - const exists = await this.monitoredAddressRepo.existsByChainAndAddress(chainType, address); - if (!exists) { - // 创建监控地址 - const monitored = MonitoredAddress.create({ - chainType, - address, - userId, - }); - - await this.monitoredAddressRepo.save(monitored); - - // 添加到缓存 - await this.addressCache.addAddress(chainType, address.lowercase); - - this.logger.log(`Registered address: ${derived.chainType} - ${derived.address}`); + if (this.evmChains.has(derived.chainType)) { + await this.registerEvmAddressForMonitoring(userId, derived); } else { - this.logger.debug(`Address already registered: ${derived.chainType} - ${derived.address}`); + this.logger.log(`[DERIVE] Skipping monitoring registration for Cosmos chain: ${derived.chainType} - ${derived.address}`); } } - // 3. 发布钱包地址创建事件 + // 3. 发布钱包地址创建事件 (包含所有链的地址) const event = new WalletAddressCreatedEvent({ userId: userId.toString(), publicKey, @@ -80,7 +78,10 @@ export class AddressDerivationService { })), }); + this.logger.log(`[PUBLISH] Publishing WalletAddressCreated event for user ${userId}`); + this.logger.log(`[PUBLISH] Addresses: ${JSON.stringify(derivedAddresses)}`); await this.eventPublisher.publish(event); + this.logger.log(`[PUBLISH] WalletAddressCreated event published successfully`); return { userId, @@ -89,6 +90,34 @@ export class AddressDerivationService { }; } + /** + * 注册 EVM 地址用于充值监控 + */ + private async registerEvmAddressForMonitoring(userId: bigint, derived: DerivedAddress): Promise { + const chainType = ChainType.fromEnum(derived.chainType); + const address = EvmAddress.create(derived.address); + + // 检查是否已存在 + const exists = await this.monitoredAddressRepo.existsByChainAndAddress(chainType, address); + if (!exists) { + // 创建监控地址 + const monitored = MonitoredAddress.create({ + chainType, + address, + userId, + }); + + await this.monitoredAddressRepo.save(monitored); + + // 添加到缓存 + await this.addressCache.addAddress(chainType, address.lowercase); + + this.logger.log(`[MONITOR] Registered EVM address for monitoring: ${derived.chainType} - ${derived.address}`); + } else { + this.logger.debug(`[MONITOR] Address already registered: ${derived.chainType} - ${derived.address}`); + } + } + /** * 获取用户的所有地址 */ diff --git a/backend/services/blockchain-service/src/domain/enums/chain-type.enum.ts b/backend/services/blockchain-service/src/domain/enums/chain-type.enum.ts index d24ff8cc..13a9c4ef 100644 --- a/backend/services/blockchain-service/src/domain/enums/chain-type.enum.ts +++ b/backend/services/blockchain-service/src/domain/enums/chain-type.enum.ts @@ -3,5 +3,6 @@ */ export enum ChainTypeEnum { KAVA = 'KAVA', + DST = 'DST', BSC = 'BSC', } diff --git a/backend/services/blockchain-service/src/domain/value-objects/cosmos-address.vo.ts b/backend/services/blockchain-service/src/domain/value-objects/cosmos-address.vo.ts new file mode 100644 index 00000000..80ffb06f --- /dev/null +++ b/backend/services/blockchain-service/src/domain/value-objects/cosmos-address.vo.ts @@ -0,0 +1,50 @@ +import { bech32 } from 'bech32'; + +/** + * Cosmos 地址值对象 (bech32 格式) + * 支持 kava1..., dst1... 等地址格式 + */ +export class CosmosAddress { + private readonly _value: string; + private readonly _prefix: string; + + private constructor(value: string, prefix: string) { + this._value = value; + this._prefix = prefix; + } + + static create(value: string): CosmosAddress { + try { + const decoded = bech32.decode(value); + return new CosmosAddress(value, decoded.prefix); + } catch { + throw new Error(`Invalid Cosmos address: ${value}`); + } + } + + static fromPrefixAndHash(prefix: string, hash20Bytes: Uint8Array): CosmosAddress { + const words = bech32.toWords(Buffer.from(hash20Bytes)); + const address = bech32.encode(prefix, words); + return new CosmosAddress(address, prefix); + } + + get value(): string { + return this._value; + } + + get prefix(): string { + return this._prefix; + } + + get lowercase(): string { + return this._value.toLowerCase(); + } + + equals(other: CosmosAddress): boolean { + return this._value.toLowerCase() === other._value.toLowerCase(); + } + + toString(): string { + return this._value; + } +} diff --git a/backend/services/blockchain-service/src/domain/value-objects/index.ts b/backend/services/blockchain-service/src/domain/value-objects/index.ts index 26f2d72e..ac31c374 100644 --- a/backend/services/blockchain-service/src/domain/value-objects/index.ts +++ b/backend/services/blockchain-service/src/domain/value-objects/index.ts @@ -1,5 +1,6 @@ export * from './chain-type.vo'; export * from './evm-address.vo'; +export * from './cosmos-address.vo'; export * from './tx-hash.vo'; export * from './token-amount.vo'; export * from './block-number.vo'; diff --git a/backend/services/blockchain-service/src/infrastructure/blockchain/address-derivation.adapter.ts b/backend/services/blockchain-service/src/infrastructure/blockchain/address-derivation.adapter.ts index c48748ad..8eb01c2a 100644 --- a/backend/services/blockchain-service/src/infrastructure/blockchain/address-derivation.adapter.ts +++ b/backend/services/blockchain-service/src/infrastructure/blockchain/address-derivation.adapter.ts @@ -1,5 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; -import { keccak256, getBytes } from 'ethers'; +import { keccak256, getBytes, sha256, ripemd160 } from 'ethers'; +import { bech32 } from 'bech32'; import { EvmAddress } from '@/domain/value-objects'; import { ChainTypeEnum } from '@/domain/enums'; @@ -99,28 +100,77 @@ export class AddressDerivationAdapter { return result; } + /** + * 从压缩公钥派生 Cosmos 地址 (bech32 格式) + * + * @param compressedPublicKey 压缩格式的公钥 (33 bytes, 0x02/0x03 开头) + * @param prefix bech32 地址前缀 (如 'kava', 'dst') + * @returns bech32 格式的地址 + */ + deriveCosmosAddress(compressedPublicKey: string, prefix: string): string { + // 移除 0x 前缀 + const pubKeyHex = compressedPublicKey.replace('0x', ''); + + // 验证压缩公钥格式 + if (pubKeyHex.length !== 66) { + throw new Error(`Invalid compressed public key length: ${pubKeyHex.length}, expected 66`); + } + + // SHA256 哈希 + const pubKeyBytes = getBytes('0x' + pubKeyHex); + const sha256Hash = sha256(pubKeyBytes); + + // RIPEMD160 哈希 (得到 20 bytes) + const ripemd160Hash = ripemd160(sha256Hash); + + // 转换为 5-bit words 用于 bech32 编码 + const hashBytes = getBytes(ripemd160Hash); + const words = bech32.toWords(Buffer.from(hashBytes)); + + // bech32 编码 + const address = bech32.encode(prefix, words); + + this.logger.debug(`Derived Cosmos address with prefix '${prefix}': ${address}`); + + return address; + } + /** * 从公钥派生所有支持链的地址 */ deriveAllAddresses(compressedPublicKey: string): DerivedAddress[] { const addresses: DerivedAddress[] = []; + this.logger.log(`[DERIVE] Starting address derivation for public key: ${compressedPublicKey.slice(0, 20)}...`); + // EVM 链共用同一个地址 const evmAddress = this.deriveEvmAddress(compressedPublicKey); + this.logger.log(`[DERIVE] EVM address derived: ${evmAddress}`); - // KAVA (EVM) + // KAVA (Cosmos bech32 格式 - kava1...) + const kavaAddress = this.deriveCosmosAddress(compressedPublicKey, 'kava'); addresses.push({ chainType: ChainTypeEnum.KAVA, - address: evmAddress, + address: kavaAddress, }); + this.logger.log(`[DERIVE] KAVA address (Cosmos): ${kavaAddress}`); - // BSC (EVM) + // DST (Cosmos bech32 格式 - dst1...) + const dstAddress = this.deriveCosmosAddress(compressedPublicKey, 'dst'); + addresses.push({ + chainType: ChainTypeEnum.DST, + address: dstAddress, + }); + this.logger.log(`[DERIVE] DST address (Cosmos): ${dstAddress}`); + + // BSC (EVM 格式 - 0x...) addresses.push({ chainType: ChainTypeEnum.BSC, address: evmAddress, }); + this.logger.log(`[DERIVE] BSC address (EVM): ${evmAddress}`); - this.logger.log(`Derived addresses from public key: ${addresses.length} chains`); + this.logger.log(`[DERIVE] Successfully derived ${addresses.length} addresses from public key`); return addresses; } diff --git a/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts index c99eb5ea..04c632d3 100644 --- a/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/blockchain-service/src/infrastructure/infrastructure.module.ts @@ -1,7 +1,7 @@ import { Global, Module } from '@nestjs/common'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService, AddressCacheService } from './redis'; -import { EventPublisherService } from './kafka'; +import { EventPublisherService, MpcEventConsumerService } from './kafka'; import { EvmProviderAdapter, AddressDerivationAdapter, BlockScannerService } from './blockchain'; import { DomainModule } from '@/domain/domain.module'; import { @@ -25,6 +25,7 @@ import { PrismaService, RedisService, EventPublisherService, + MpcEventConsumerService, // 区块链适配器 EvmProviderAdapter, @@ -56,6 +57,7 @@ import { PrismaService, RedisService, EventPublisherService, + MpcEventConsumerService, EvmProviderAdapter, AddressDerivationAdapter, BlockScannerService, diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/blockchain-service/src/infrastructure/kafka/index.ts index a19216ed..07da6b8d 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/index.ts @@ -1,2 +1,3 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; +export * from './mpc-event-consumer.service'; diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts new file mode 100644 index 00000000..1d0ec019 --- /dev/null +++ b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -0,0 +1,186 @@ +/** + * MPC Event Consumer Service for Blockchain Service + * + * Consumes MPC keygen completion events from mpc-service via Kafka. + * Derives wallet addresses from public keys and publishes WalletAddressCreated events. + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// MPC Event Topics (events from mpc-service) +export const MPC_TOPICS = { + KEYGEN_COMPLETED: 'mpc.KeygenCompleted', + SESSION_FAILED: 'mpc.SessionFailed', +} as const; + +export interface KeygenCompletedPayload { + sessionId: string; + partyId: string; + publicKey: string; + shareId: string; + threshold: string; + extraPayload?: { + userId: string; + username: string; + delegateShare?: { + partyId: string; + partyIndex: number; + encryptedShare: string; + }; + serverParties?: string[]; + }; +} + +export interface SessionFailedPayload { + sessionId: string; + partyId: string; + sessionType: string; + errorMessage: string; + errorCode?: string; + extraPayload?: { + userId: string; + username: string; + }; +} + +export type MpcEventHandler = (payload: T) => Promise; + +@Injectable() +export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(MpcEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private keygenCompletedHandler?: MpcEventHandler; + private sessionFailedHandler?: MpcEventHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'blockchain-service'; + const groupId = 'blockchain-service-mpc-events'; + + this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`); + + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); + + // Subscribe to MPC topics + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('MPC Event Kafka consumer disconnected'); + } + } + + /** + * Register handler for keygen completed events + */ + onKeygenCompleted(handler: MpcEventHandler): void { + this.keygenCompletedHandler = handler; + this.logger.log(`[REGISTER] KeygenCompleted handler registered`); + } + + /** + * Register handler for session failed events + */ + onSessionFailed(handler: MpcEventHandler): void { + this.sessionFailedHandler = handler; + this.logger.log(`[REGISTER] SessionFailed handler registered`); + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); + return; + } + + this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); + + const parsed = JSON.parse(value); + const payload = parsed.payload || parsed; + + this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); + this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); + + switch (topic) { + case MPC_TOPICS.KEYGEN_COMPLETED: + this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`); + this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`); + this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`); + if (this.keygenCompletedHandler) { + await this.keygenCompletedHandler(payload as KeygenCompletedPayload); + this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`); + } else { + this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`); + } + break; + + case MPC_TOPICS.SESSION_FAILED: + this.logger.log(`[HANDLE] Processing SessionFailed event`); + this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); + this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); + if (this.sessionFailedHandler) { + await this.sessionFailedHandler(payload as SessionFailedPayload); + this.logger.log(`[HANDLE] SessionFailed handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for SessionFailed`); + } + break; + + default: + this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming MPC events for address derivation`); + } +} diff --git a/backend/services/identity-service/src/api/controllers/user-account.controller.ts b/backend/services/identity-service/src/api/controllers/user-account.controller.ts index b746b02c..cc470584 100644 --- a/backend/services/identity-service/src/api/controllers/user-account.controller.ts +++ b/backend/services/identity-service/src/api/controllers/user-account.controller.ts @@ -6,7 +6,7 @@ import { AutoCreateAccountCommand, RecoverByMnemonicCommand, RecoverByPhoneCommand, AutoLoginCommand, RegisterCommand, LoginCommand, BindPhoneNumberCommand, UpdateProfileCommand, SubmitKYCCommand, RemoveDeviceCommand, SendSmsCodeCommand, - GetMyProfileQuery, GetMyDevicesQuery, GetUserByReferralCodeQuery, + GetMyProfileQuery, GetMyDevicesQuery, GetUserByReferralCodeQuery, GetWalletStatusQuery, } from '@/application/commands'; import { AutoCreateAccountDto, RecoverByMnemonicDto, RecoverByPhoneDto, AutoLoginDto, @@ -14,6 +14,7 @@ import { BindWalletDto, SubmitKYCDto, RemoveDeviceDto, AutoCreateAccountResponseDto, RecoverAccountResponseDto, LoginResponseDto, UserProfileResponseDto, DeviceResponseDto, + WalletStatusReadyResponseDto, WalletStatusGeneratingResponseDto, } from '@/api/dto'; @ApiTags('User') @@ -30,7 +31,6 @@ export class UserAccountController { return this.userService.autoCreateAccount( new AutoCreateAccountCommand( dto.deviceId, dto.deviceName, dto.inviterReferralCode, - dto.provinceCode, dto.cityCode, ), ); } @@ -166,4 +166,15 @@ export class UserAccountController { async getByReferralCode(@Param('code') code: string) { return this.userService.getUserByReferralCode(new GetUserByReferralCodeQuery(code)); } + + @Get('wallet') + @ApiBearerAuth() + @ApiOperation({ summary: '获取我的钱包状态和地址' }) + @ApiResponse({ status: 200, description: '钱包已就绪', type: WalletStatusReadyResponseDto }) + @ApiResponse({ status: 202, description: '钱包生成中', type: WalletStatusGeneratingResponseDto }) + async getWalletStatus(@CurrentUser() user: CurrentUserData) { + return this.userService.getWalletStatus( + new GetWalletStatusQuery(user.accountSequence), + ); + } } diff --git a/backend/services/identity-service/src/api/dto/index.ts b/backend/services/identity-service/src/api/dto/index.ts index cdbe9628..99b534ba 100644 --- a/backend/services/identity-service/src/api/dto/index.ts +++ b/backend/services/identity-service/src/api/dto/index.ts @@ -122,31 +122,22 @@ export class RemoveDeviceDto { // Response DTOs export class AutoCreateAccountResponseDto { - @ApiProperty() - userId: string; + @ApiProperty({ example: 100001, description: '用户序列号 (唯一标识)' }) + userSerialNum: number; - @ApiProperty({ description: '账户序列号 (唯一标识,用于推荐和分享)' }) - accountSequence: number; - - @ApiProperty({ description: '推荐码' }) + @ApiProperty({ example: 'ABC123', description: '推荐码' }) referralCode: string; - @ApiPropertyOptional({ description: '助记词 (MPC模式下为空)' }) - mnemonic?: string; + @ApiProperty({ example: '榴莲勇士_38472', description: '随机用户名' }) + username: string; - @ApiPropertyOptional({ description: 'MPC客户端分片数据 (需安全存储,用于签名)' }) - clientShareData?: string; + @ApiProperty({ example: '...', description: '随机SVG头像' }) + avatarSvg: string; - @ApiPropertyOptional({ description: 'MPC公钥' }) - publicKey?: string; - - @ApiProperty({ description: '三链钱包地址 (BSC/KAVA/DST)' }) - walletAddresses: { kava: string; dst: string; bsc: string }; - - @ApiProperty() + @ApiProperty({ description: '访问令牌' }) accessToken: string; - @ApiProperty() + @ApiProperty({ description: '刷新令牌' }) refreshToken: string; } @@ -173,6 +164,36 @@ export class RecoverAccountResponseDto { refreshToken: string; } +// 钱包地址响应 +export class WalletAddressesDto { + @ApiProperty({ example: '0x1234...', description: 'KAVA链地址' }) + kava: string; + + @ApiProperty({ example: 'dst1...', description: 'DST链地址' }) + dst: string; + + @ApiProperty({ example: '0x5678...', description: 'BSC链地址' }) + bsc: string; +} + +// 钱包状态响应 (就绪) +export class WalletStatusReadyResponseDto { + @ApiProperty({ example: 'ready', description: '钱包状态' }) + status: 'ready'; + + @ApiProperty({ type: WalletAddressesDto, description: '三链钱包地址' }) + walletAddresses: WalletAddressesDto; + + @ApiProperty({ example: 'word1 word2 ... word12', description: '助记词 (12词)' }) + mnemonic: string; +} + +// 钱包状态响应 (生成中) +export class WalletStatusGeneratingResponseDto { + @ApiProperty({ example: 'generating', description: '钱包状态' }) + status: 'generating'; +} + export class LoginResponseDto { @ApiProperty() userId: string; diff --git a/backend/services/identity-service/src/api/dto/request/auto-create-account.dto.ts b/backend/services/identity-service/src/api/dto/request/auto-create-account.dto.ts index 94286207..98333e4a 100644 --- a/backend/services/identity-service/src/api/dto/request/auto-create-account.dto.ts +++ b/backend/services/identity-service/src/api/dto/request/auto-create-account.dto.ts @@ -1,30 +1,39 @@ -import { IsString, IsOptional, IsNotEmpty, Matches } from 'class-validator'; +import { IsString, IsOptional, IsNotEmpty, Matches, ValidateNested } from 'class-validator'; import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; +import { Type } from 'class-transformer'; + +export class DeviceNameDto { + @ApiPropertyOptional({ example: 'iPhone 15 Pro', description: '设备型号' }) + @IsOptional() + @IsString() + model?: string; + + @ApiPropertyOptional({ example: 'ios', description: '平台: ios, android, web' }) + @IsOptional() + @IsString() + platform?: string; + + @ApiPropertyOptional({ example: 'iOS 17.2', description: '系统版本' }) + @IsOptional() + @IsString() + osVersion?: string; +} export class AutoCreateAccountDto { - @ApiProperty({ example: '550e8400-e29b-41d4-a716-446655440000' }) + @ApiProperty({ example: '550e8400-e29b-41d4-a716-446655440000', description: '设备唯一标识' }) @IsString() @IsNotEmpty() deviceId: string; - @ApiPropertyOptional({ example: 'iPhone 15 Pro' }) + @ApiPropertyOptional({ type: DeviceNameDto, description: '设备信息' }) @IsOptional() - @IsString() - deviceName?: string; + @ValidateNested() + @Type(() => DeviceNameDto) + deviceName?: DeviceNameDto; - @ApiPropertyOptional({ example: 'ABC123' }) + @ApiPropertyOptional({ example: 'ABC123', description: '邀请人推荐码' }) @IsOptional() @IsString() @Matches(/^[A-Z0-9]{6}$/, { message: '推荐码格式错误' }) inviterReferralCode?: string; - - @ApiPropertyOptional() - @IsOptional() - @IsString() - provinceCode?: string; - - @ApiPropertyOptional() - @IsOptional() - @IsString() - cityCode?: string; } diff --git a/backend/services/identity-service/src/application/application.module.ts b/backend/services/identity-service/src/application/application.module.ts index e2ef1da3..a42644dd 100644 --- a/backend/services/identity-service/src/application/application.module.ts +++ b/backend/services/identity-service/src/application/application.module.ts @@ -9,6 +9,7 @@ import { BindPhoneHandler } from './commands/bind-phone/bind-phone.handler'; import { GetMyProfileHandler } from './queries/get-my-profile/get-my-profile.handler'; import { GetMyDevicesHandler } from './queries/get-my-devices/get-my-devices.handler'; import { MpcKeygenCompletedHandler } from './event-handlers/mpc-keygen-completed.handler'; +import { BlockchainWalletHandler } from './event-handlers/blockchain-wallet.handler'; import { DomainModule } from '@/domain/domain.module'; import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; @@ -26,6 +27,8 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; GetMyDevicesHandler, // MPC Event Handlers MpcKeygenCompletedHandler, + // Blockchain Event Handlers + BlockchainWalletHandler, ], exports: [ UserApplicationService, diff --git a/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.command.ts b/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.command.ts index 26a8a1d1..b887248b 100644 --- a/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.command.ts +++ b/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.command.ts @@ -1,9 +1,9 @@ +import { DeviceNameInput } from '../index'; + export class AutoCreateAccountCommand { constructor( public readonly deviceId: string, - public readonly deviceName?: string, + public readonly deviceName?: DeviceNameInput, public readonly inviterReferralCode?: string, - public readonly provinceCode?: string, - public readonly cityCode?: string, ) {} } diff --git a/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.handler.ts b/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.handler.ts index c7cfef06..3328173d 100644 --- a/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.handler.ts +++ b/backend/services/identity-service/src/application/commands/auto-create-account/auto-create-account.handler.ts @@ -2,13 +2,13 @@ import { Injectable, Inject, Logger } from '@nestjs/common'; import { AutoCreateAccountCommand } from './auto-create-account.command'; import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface'; import { UserAccount } from '@/domain/aggregates/user-account/user-account.aggregate'; -import { AccountSequenceGeneratorService, UserValidatorService, WalletGeneratorService } from '@/domain/services'; -import { ReferralCode, AccountSequence, ProvinceCode, CityCode, ChainType } from '@/domain/value-objects'; +import { AccountSequenceGeneratorService, UserValidatorService } from '@/domain/services'; +import { ReferralCode, AccountSequence, ProvinceCode, CityCode, HardwareInfo } from '@/domain/value-objects'; import { TokenService } from '@/application/services/token.service'; import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; import { ApplicationError } from '@/shared/exceptions/domain.exception'; import { AutoCreateAccountResult } from '../index'; -import { MpcShareStorageService } from '@/infrastructure/external/backup/mpc-share-storage.service'; +import { generateRandomIdentity } from '@/shared/utils'; @Injectable() export class AutoCreateAccountHandler { @@ -19,16 +19,18 @@ export class AutoCreateAccountHandler { private readonly userRepository: UserAccountRepository, private readonly sequenceGenerator: AccountSequenceGeneratorService, private readonly validatorService: UserValidatorService, - private readonly walletGenerator: WalletGeneratorService, private readonly tokenService: TokenService, private readonly eventPublisher: EventPublisherService, - private readonly mpcShareStorage: MpcShareStorageService, ) {} async execute(command: AutoCreateAccountCommand): Promise { + this.logger.log(`Creating account for device: ${command.deviceId}`); + + // 1. 验证设备ID const deviceCheck = await this.validatorService.checkDeviceNotRegistered(command.deviceId); if (!deviceCheck.isValid) throw new ApplicationError(deviceCheck.errorMessage!); + // 2. 验证邀请码 let inviterSequence: AccountSequence | null = null; if (command.inviterReferralCode) { const referralCode = ReferralCode.create(command.inviterReferralCode); @@ -38,66 +40,62 @@ export class AutoCreateAccountHandler { inviterSequence = inviter!.accountSequence; } + // 3. 生成用户序列号 const accountSequence = await this.sequenceGenerator.generateNextUserSequence(); + // 4. 生成随机用户名和头像 + const identity = generateRandomIdentity(); + + // 5. 构建设备名称和硬件信息 + let deviceNameStr = '未命名设备'; + let hardwareInfo: HardwareInfo | undefined; + if (command.deviceName) { + const parts: string[] = []; + if (command.deviceName.model) parts.push(command.deviceName.model); + if (command.deviceName.platform) parts.push(command.deviceName.platform); + if (command.deviceName.osVersion) parts.push(command.deviceName.osVersion); + if (parts.length > 0) deviceNameStr = parts.join(' '); + hardwareInfo = { + platform: command.deviceName.platform, + deviceModel: command.deviceName.model, + osVersion: command.deviceName.osVersion, + }; + } + + // 6. 创建账户 const account = UserAccount.createAutomatic({ accountSequence, initialDeviceId: command.deviceId, - deviceName: command.deviceName, + deviceName: deviceNameStr, + hardwareInfo, inviterSequence, - province: ProvinceCode.create(command.provinceCode || 'DEFAULT'), - city: CityCode.create(command.cityCode || 'DEFAULT'), + province: ProvinceCode.create('DEFAULT'), + city: CityCode.create('DEFAULT'), + nickname: identity.username, + avatarSvg: identity.avatarSvg, }); - // 使用 MPC 2-of-3 生成三链钱包 - this.logger.log(`Generating MPC wallet for user=${account.userId.toString()}`); - const mpcResult = await this.walletGenerator.generateMpcWalletSystem({ - userId: account.userId.toString(), - username: accountSequence.value.toString(), // 使用账户序列号作为用户名 - deviceId: command.deviceId, - }); - - // 将 MPC 钱包信息转换为领域实体 - const wallets = this.walletGenerator.convertToWalletEntities( - account.userId, - mpcResult.wallets, - ); - - // 保存 delegate share 到备份服务 (用于恢复) - this.logger.log(`Storing delegate share for user=${account.userId.toString()}`); - await this.mpcShareStorage.storeBackupShare({ - userId: account.userId.toString(), - shareData: mpcResult.delegateShare, - publicKey: mpcResult.publicKey, - }); - - account.bindMultipleWalletAddresses(wallets); + // 7. 保存账户 await this.userRepository.save(account); - await this.userRepository.saveWallets(account.userId, Array.from(wallets.values())); + // 8. 生成 Token const tokens = await this.tokenService.generateTokenPair({ userId: account.userId.toString(), accountSequence: account.accountSequence.value, deviceId: command.deviceId, }); + // 9. 发布领域事件 await this.eventPublisher.publishAll(account.domainEvents); account.clearDomainEvents(); - this.logger.log(`Account created successfully: userId=${account.userId.toString()}, seq=${account.accountSequence.value}`); + this.logger.log(`Account created: sequence=${accountSequence.value}, username=${identity.username}`); return { - userId: account.userId.toString(), - accountSequence: account.accountSequence.value, + userSerialNum: account.accountSequence.value, referralCode: account.referralCode.value, - mnemonic: '', // MPC 模式下不再使用助记词 - delegateShare: mpcResult.delegateShare, // delegate share (客户端需安全存储) - publicKey: mpcResult.publicKey, - walletAddresses: { - kava: wallets.get(ChainType.KAVA)!.address, - dst: wallets.get(ChainType.DST)!.address, - bsc: wallets.get(ChainType.BSC)!.address, - }, + username: account.nickname, + avatarSvg: account.avatarUrl || identity.avatarSvg, accessToken: tokens.accessToken, refreshToken: tokens.refreshToken, }; diff --git a/backend/services/identity-service/src/application/commands/index.ts b/backend/services/identity-service/src/application/commands/index.ts index fcdf75f5..09c85c28 100644 --- a/backend/services/identity-service/src/application/commands/index.ts +++ b/backend/services/identity-service/src/application/commands/index.ts @@ -1,11 +1,16 @@ +// ============ Types ============ +export interface DeviceNameInput { + model?: string; // iPhone 15 Pro, Pixel 8 + platform?: string; // ios, android, web + osVersion?: string; // iOS 17.2, Android 14 +} + // ============ Commands ============ export class AutoCreateAccountCommand { constructor( public readonly deviceId: string, - public readonly deviceName?: string, + public readonly deviceName?: DeviceNameInput, public readonly inviterReferralCode?: string, - public readonly provinceCode?: string, - public readonly cityCode?: string, ) {} } @@ -145,15 +150,30 @@ export class GenerateReferralLinkCommand { ) {} } +export class GetWalletStatusQuery { + constructor(public readonly userSerialNum: number) {} +} + // ============ Results ============ + +// 钱包状态 +export type WalletStatus = 'generating' | 'ready' | 'failed'; + +export interface WalletStatusResult { + status: WalletStatus; + walletAddresses?: { + kava: string; + dst: string; + bsc: string; + }; + mnemonic?: string; // 助记词 (ready 状态时返回) + errorMessage?: string; // 失败原因 (failed 状态时返回) +} export interface AutoCreateAccountResult { - userId: string; - accountSequence: number; - referralCode: string; - mnemonic: string; // 兼容字段,MPC模式下为空 - delegateShare?: string; // MPC delegate share (客户端需安全存储) - publicKey?: string; // MPC 公钥 - walletAddresses: { kava: string; dst: string; bsc: string }; + userSerialNum: number; // 用户序列号 + referralCode: string; // 推荐码 + username: string; // 随机用户名 + avatarSvg: string; // 随机SVG头像 accessToken: string; refreshToken: string; } diff --git a/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts b/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts new file mode 100644 index 00000000..7a78dc07 --- /dev/null +++ b/backend/services/identity-service/src/application/event-handlers/blockchain-wallet.handler.ts @@ -0,0 +1,144 @@ +/** + * Blockchain Wallet Event Handler + * + * Handles wallet address events from blockchain-service: + * - WalletAddressCreated: Saves derived wallet addresses to user account + * + * This handler receives properly derived addresses from blockchain-service: + * - KAVA: Cosmos bech32 format (kava1...) + * - DST: Cosmos bech32 format (dst1...) + * - BSC: EVM format (0x...) + */ + +import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common'; +import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface'; +import { WalletAddress } from '@/domain/entities/wallet-address.entity'; +import { ChainType, UserId } from '@/domain/value-objects'; +import { RedisService } from '@/infrastructure/redis/redis.service'; +import { + BlockchainEventConsumerService, + WalletAddressCreatedPayload, +} from '@/infrastructure/kafka/blockchain-event-consumer.service'; + +// Redis key prefix for keygen status +const KEYGEN_STATUS_PREFIX = 'keygen:status:'; +const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours + +// Status data for wallet completion (extended from MpcKeygenCompletedHandler) +interface WalletCompletedStatusData { + status: 'completed'; + userId: string; + publicKey?: string; + walletAddresses?: { chainType: string; address: string }[]; + updatedAt: string; +} + +@Injectable() +export class BlockchainWalletHandler implements OnModuleInit { + private readonly logger = new Logger(BlockchainWalletHandler.name); + + constructor( + @Inject(USER_ACCOUNT_REPOSITORY) + private readonly userRepository: UserAccountRepository, + private readonly redisService: RedisService, + private readonly blockchainEventConsumer: BlockchainEventConsumerService, + ) {} + + async onModuleInit() { + // Register event handler + this.blockchainEventConsumer.onWalletAddressCreated(this.handleWalletAddressCreated.bind(this)); + this.logger.log('[INIT] Registered BlockchainWalletHandler for WalletAddressCreated events'); + } + + /** + * Handle WalletAddressCreated event from blockchain-service + * + * This event contains properly derived addresses: + * - KAVA: kava1... (Cosmos bech32) + * - DST: dst1... (Cosmos bech32) + * - BSC: 0x... (EVM) + */ + private async handleWalletAddressCreated(payload: WalletAddressCreatedPayload): Promise { + const { userId, publicKey, addresses } = payload; + + this.logger.log(`[HANDLE] Processing WalletAddressCreated: userId=${userId}`); + this.logger.log(`[HANDLE] Public key: ${publicKey?.substring(0, 30)}...`); + this.logger.log(`[HANDLE] Addresses: ${JSON.stringify(addresses)}`); + + if (!userId) { + this.logger.error('[ERROR] WalletAddressCreated event missing userId, skipping'); + return; + } + + if (!addresses || addresses.length === 0) { + this.logger.error('[ERROR] WalletAddressCreated event missing addresses, skipping'); + return; + } + + try { + // 1. Find user account + const account = await this.userRepository.findById(UserId.create(userId)); + if (!account) { + this.logger.error(`[ERROR] User not found: ${userId}`); + return; + } + + // 2. Create wallet addresses for each chain + const wallets: WalletAddress[] = addresses.map((addr) => { + const chainType = this.parseChainType(addr.chainType); + this.logger.log(`[WALLET] Creating wallet: ${addr.chainType} -> ${addr.address}`); + return WalletAddress.create({ + userId: account.userId, + chainType, + address: addr.address, + }); + }); + + // 3. Save wallet addresses to user account + await this.userRepository.saveWallets(account.userId, wallets); + this.logger.log(`[WALLET] Saved ${wallets.length} wallet addresses for user: ${userId}`); + + // 4. Update Redis status to completed + const statusData: WalletCompletedStatusData = { + status: 'completed', + userId, + publicKey, + walletAddresses: addresses, + updatedAt: new Date().toISOString(), + }; + + await this.redisService.set( + `${KEYGEN_STATUS_PREFIX}${userId}`, + JSON.stringify(statusData), + KEYGEN_STATUS_TTL, + ); + + this.logger.log(`[STATUS] Keygen status updated to 'completed' for user: ${userId}`); + + // Log all addresses + addresses.forEach((addr) => { + this.logger.log(`[COMPLETE] ${addr.chainType}: ${addr.address}`); + }); + } catch (error) { + this.logger.error(`[ERROR] Failed to process WalletAddressCreated: ${error}`, error); + } + } + + /** + * Parse chain type string to ChainType value object + */ + private parseChainType(chainType: string): ChainType { + const normalizedType = chainType.toUpperCase(); + switch (normalizedType) { + case 'KAVA': + return ChainType.KAVA; + case 'DST': + return ChainType.DST; + case 'BSC': + return ChainType.BSC; + default: + this.logger.warn(`[WARN] Unknown chain type: ${chainType}, defaulting to BSC`); + return ChainType.BSC; + } + } +} diff --git a/backend/services/identity-service/src/application/event-handlers/index.ts b/backend/services/identity-service/src/application/event-handlers/index.ts index 92a915c1..c320a7ba 100644 --- a/backend/services/identity-service/src/application/event-handlers/index.ts +++ b/backend/services/identity-service/src/application/event-handlers/index.ts @@ -1 +1,2 @@ export * from './mpc-keygen-completed.handler'; +export * from './blockchain-wallet.handler'; diff --git a/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts index c696bc77..3e4bfc02 100644 --- a/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts +++ b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts @@ -2,16 +2,17 @@ * MPC Keygen Event Handler * * Handles keygen events from mpc-service: - * - KeygenStarted: Updates status in Redis - * - KeygenCompleted: Derives wallet addresses and saves to user account - * - SessionFailed: Logs error and updates status + * - KeygenStarted: Updates status in Redis to "generating" + * - KeygenCompleted: Updates status to indicate waiting for blockchain-service + * - SessionFailed: Logs error and updates status to "failed" + * + * NOTE: Address derivation is now handled by blockchain-service. + * This handler only manages status updates. The actual wallet addresses + * are saved by BlockchainWalletHandler when it receives WalletAddressCreated + * events from blockchain-service. */ -import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common'; -import { keccak256 } from 'ethers'; -import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface'; -import { WalletAddress } from '@/domain/entities/wallet-address.entity'; -import { ChainType, UserId } from '@/domain/value-objects'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { RedisService } from '@/infrastructure/redis/redis.service'; import { MpcEventConsumerService, @@ -24,14 +25,13 @@ import { const KEYGEN_STATUS_PREFIX = 'keygen:status:'; const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours -export type KeygenStatus = 'pending' | 'generating' | 'completed' | 'failed'; +export type KeygenStatus = 'pending' | 'generating' | 'deriving' | 'completed' | 'failed'; export interface KeygenStatusData { status: KeygenStatus; userId: string; mpcSessionId?: string; publicKey?: string; - walletAddress?: string; errorMessage?: string; updatedAt: string; } @@ -41,28 +41,26 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { private readonly logger = new Logger(MpcKeygenCompletedHandler.name); constructor( - @Inject(USER_ACCOUNT_REPOSITORY) - private readonly userRepository: UserAccountRepository, private readonly redisService: RedisService, private readonly mpcEventConsumer: MpcEventConsumerService, ) {} async onModuleInit() { - // 注册事件处理器 + // Register event handlers this.mpcEventConsumer.onKeygenStarted(this.handleKeygenStarted.bind(this)); this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this)); - this.logger.log('Registered MPC event handlers'); + this.logger.log('[INIT] Registered MPC event handlers (status updates only)'); } /** - * 处理 keygen 开始事件 + * Handle keygen started event * - * 更新 Redis 中的状态为 "generating" + * Update Redis status to "generating" */ private async handleKeygenStarted(payload: KeygenStartedPayload): Promise { const { userId, mpcSessionId } = payload; - this.logger.log(`Keygen started: userId=${userId}, mpcSessionId=${mpcSessionId}`); + this.logger.log(`[STATUS] Keygen started: userId=${userId}, mpcSessionId=${mpcSessionId}`); try { const statusData: KeygenStatusData = { @@ -78,60 +76,38 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { KEYGEN_STATUS_TTL, ); - this.logger.log(`Keygen status updated to 'generating' for user: ${userId}`); + this.logger.log(`[STATUS] Keygen status updated to 'generating' for user: ${userId}`); } catch (error) { - this.logger.error(`Failed to update keygen status: ${error}`, error); + this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); } } /** - * 处理 keygen 完成事件 + * Handle keygen completed event * - * 从 mpc-service 收到公钥后: - * 1. 解析用户信息 - * 2. 从公钥派生各链钱包地址 - * 3. 保存钱包地址到用户账户 - * 4. 更新 Redis 状态为 completed + * From mpc-service, keygen is complete with public key. + * Update status to "deriving" - blockchain-service will now derive addresses + * and send WalletAddressCreated event which BlockchainWalletHandler will process. */ private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { const { publicKey, extraPayload } = payload; if (!extraPayload?.userId) { - this.logger.warn('KeygenCompleted event missing userId, skipping'); + this.logger.warn('[WARN] KeygenCompleted event missing userId, skipping'); return; } const { userId, username } = extraPayload; - this.logger.log(`Processing keygen completed: userId=${userId}, username=${username}`); + this.logger.log(`[STATUS] Keygen completed: userId=${userId}, username=${username}`); + this.logger.log(`[STATUS] Public key: ${publicKey?.substring(0, 30)}...`); + this.logger.log(`[STATUS] Waiting for blockchain-service to derive addresses...`); try { - // 1. 查找用户账户 - const account = await this.userRepository.findById(UserId.create(userId)); - if (!account) { - this.logger.error(`User not found: ${userId}`); - return; - } - - // 2. 从公钥派生以太坊地址 (各链通用 EVM 地址) - const walletAddress = this.deriveAddressFromPublicKey(publicKey); - this.logger.log(`Derived wallet address: ${walletAddress}`); - - // 3. 创建三条链的钱包地址 - const wallets: WalletAddress[] = [ - WalletAddress.create({ userId: account.userId, chainType: ChainType.KAVA, address: walletAddress }), - WalletAddress.create({ userId: account.userId, chainType: ChainType.DST, address: walletAddress }), - WalletAddress.create({ userId: account.userId, chainType: ChainType.BSC, address: walletAddress }), - ]; - - // 4. 保存钱包地址到用户账户 - await this.userRepository.saveWallets(account.userId, wallets); - - // 5. 更新 Redis 状态为 completed + // Update status to "deriving" - waiting for blockchain-service const statusData: KeygenStatusData = { - status: 'completed', + status: 'deriving', userId, publicKey, - walletAddress, updatedAt: new Date().toISOString(), }; @@ -141,32 +117,33 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { KEYGEN_STATUS_TTL, ); - this.logger.log(`Wallet addresses saved for user: ${userId}, address: ${walletAddress}`); + this.logger.log(`[STATUS] Keygen status updated to 'deriving' for user: ${userId}`); + this.logger.log(`[STATUS] blockchain-service will derive addresses and send WalletAddressCreated event`); } catch (error) { - this.logger.error(`Failed to process keygen completed: ${error}`, error); + this.logger.error(`[ERROR] Failed to update keygen status: ${error}`, error); } } /** - * 处理 session 失败事件 + * Handle session failed event * - * 当 keygen 失败时: - * 1. 记录错误日志 - * 2. 更新 Redis 状态为 failed + * When keygen fails: + * 1. Log error + * 2. Update Redis status to "failed" */ private async handleSessionFailed(payload: SessionFailedPayload): Promise { const { sessionType, errorMessage, extraPayload } = payload; - // 只处理 keygen 失败 + // Only handle keygen failures if (sessionType !== 'keygen' && sessionType !== 'KEYGEN') { return; } const userId = extraPayload?.userId || 'unknown'; - this.logger.error(`Keygen failed for user ${userId}: ${errorMessage}`); + this.logger.error(`[ERROR] Keygen failed for user ${userId}: ${errorMessage}`); try { - // 更新 Redis 状态为 failed + // Update Redis status to failed const statusData: KeygenStatusData = { status: 'failed', userId, @@ -180,86 +157,9 @@ export class MpcKeygenCompletedHandler implements OnModuleInit { KEYGEN_STATUS_TTL, ); - this.logger.log(`Keygen status updated to 'failed' for user: ${userId}`); + this.logger.log(`[STATUS] Keygen status updated to 'failed' for user: ${userId}`); } catch (error) { - this.logger.error(`Failed to update keygen failed status: ${error}`, error); + this.logger.error(`[ERROR] Failed to update keygen failed status: ${error}`, error); } } - - /** - * 从压缩公钥派生以太坊地址 - * - * @param compressedPubKey 33字节压缩公钥 (hex string) - * @returns 以太坊地址 (0x...) - */ - private deriveAddressFromPublicKey(compressedPubKey: string): string { - // 移除 0x 前缀(如果有) - const pubKeyHex = compressedPubKey.startsWith('0x') - ? compressedPubKey.slice(2) - : compressedPubKey; - - // 如果是压缩公钥 (33 bytes = 66 hex chars),需要解压 - let uncompressedPubKey: string; - if (pubKeyHex.length === 66) { - // 压缩公钥,需要解压 - uncompressedPubKey = this.decompressPublicKey(pubKeyHex); - } else if (pubKeyHex.length === 128 || pubKeyHex.length === 130) { - // 未压缩公钥 (带或不带 04 前缀) - uncompressedPubKey = pubKeyHex.length === 130 ? pubKeyHex.slice(2) : pubKeyHex; - } else { - throw new Error(`Invalid public key length: ${pubKeyHex.length}`); - } - - // 对未压缩公钥进行 keccak256 哈希 - const hash = keccak256('0x' + uncompressedPubKey); - // 取最后 20 字节作为地址 - return '0x' + hash.slice(-40); - } - - /** - * 解压 secp256k1 压缩公钥 - */ - private decompressPublicKey(compressedHex: string): string { - const prefix = parseInt(compressedHex.slice(0, 2), 16); - const xHex = compressedHex.slice(2); - const x = BigInt('0x' + xHex); - - // secp256k1 curve parameters - const p = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F'); - const a = BigInt(0); - const b = BigInt(7); - - // Calculate y^2 = x^3 + ax + b (mod p) - const ySquared = (x ** 3n + a * x + b) % p; - - // Calculate modular square root - const y = this.modPow(ySquared, (p + 1n) / 4n, p); - - // Choose correct y based on prefix (02 = even, 03 = odd) - const isEven = y % 2n === 0n; - const needEven = prefix === 0x02; - const finalY = isEven === needEven ? y : p - y; - - // Format as 64-char hex strings - const xStr = x.toString(16).padStart(64, '0'); - const yStr = finalY.toString(16).padStart(64, '0'); - - return xStr + yStr; - } - - /** - * Modular exponentiation - */ - private modPow(base: bigint, exp: bigint, mod: bigint): bigint { - let result = 1n; - base = base % mod; - while (exp > 0n) { - if (exp % 2n === 1n) { - result = (result * base) % mod; - } - exp = exp / 2n; - base = (base * base) % mod; - } - return result; - } } diff --git a/backend/services/identity-service/src/application/services/user-application.service.ts b/backend/services/identity-service/src/application/services/user-application.service.ts index 335ca351..b7557d85 100644 --- a/backend/services/identity-service/src/application/services/user-application.service.ts +++ b/backend/services/identity-service/src/application/services/user-application.service.ts @@ -8,7 +8,7 @@ import { } from '@/domain/services'; import { UserId, PhoneNumber, ReferralCode, AccountSequence, ProvinceCode, CityCode, - ChainType, Mnemonic, KYCInfo, + ChainType, Mnemonic, KYCInfo, HardwareInfo, } from '@/domain/value-objects'; import { TokenService } from './token.service'; import { RedisService } from '@/infrastructure/redis/redis.service'; @@ -17,12 +17,14 @@ import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.se import { MpcWalletService } from '@/infrastructure/external/mpc'; import { BackupClientService } from '@/infrastructure/external/backup'; import { ApplicationError } from '@/shared/exceptions/domain.exception'; +import { generateRandomIdentity } from '@/shared/utils'; import { AutoCreateAccountCommand, RecoverByMnemonicCommand, RecoverByPhoneCommand, AutoLoginCommand, RegisterCommand, LoginCommand, BindPhoneNumberCommand, UpdateProfileCommand, SubmitKYCCommand, ReviewKYCCommand, RemoveDeviceCommand, SendSmsCodeCommand, GetMyProfileQuery, GetMyDevicesQuery, GetUserByReferralCodeQuery, ValidateReferralCodeQuery, GetReferralStatsQuery, GenerateReferralLinkCommand, + GetWalletStatusQuery, WalletStatusResult, AutoCreateAccountResult, RecoverAccountResult, AutoLoginResult, RegisterResult, LoginResult, UserProfileDTO, DeviceDTO, UserBriefDTO, ReferralCodeValidationResult, ReferralLinkResult, ReferralStatsResult, MeResult, @@ -51,13 +53,16 @@ export class UserApplicationService { /** * 自动创建账户 (首次打开APP) * - * 使用 MPC 2-of-3 协议生成钱包地址: - * - 生成三条链 (BSC/KAVA/DST) 的钱包地址 - * - 计算地址摘要并用 MPC 签名 - * - 签名存储在数据库中用于防止地址被篡改 + * 简化版本: + * - 生成随机用户名和头像 + * - 创建账户记录 + * - 生成推荐码 + * - 返回 token + * + * 注意: MPC钱包地址生成移到后台异步处理 */ async autoCreateAccount(command: AutoCreateAccountCommand): Promise { - this.logger.log(`Creating account with MPC 2-of-3 for device: ${command.deviceId}`); + this.logger.log(`Creating account for device: ${command.deviceId}`); // 1. 验证设备ID (检查设备是否已创建过账户) const deviceValidation = await this.validatorService.checkDeviceNotRegistered(command.deviceId); @@ -76,83 +81,71 @@ export class UserApplicationService { // 3. 生成用户序列号 const accountSequence = await this.sequenceGenerator.generateNextUserSequence(); - // 4. 创建用户账户 + // 4. 生成随机用户名和头像 + const identity = generateRandomIdentity(); + + // 5. 构建设备名称字符串和硬件信息 + let deviceNameStr = '未命名设备'; + let hardwareInfo: HardwareInfo | undefined; + if (command.deviceName) { + const parts: string[] = []; + if (command.deviceName.model) parts.push(command.deviceName.model); + if (command.deviceName.platform) parts.push(command.deviceName.platform); + if (command.deviceName.osVersion) parts.push(command.deviceName.osVersion); + if (parts.length > 0) deviceNameStr = parts.join(' '); + hardwareInfo = { + platform: command.deviceName.platform, + deviceModel: command.deviceName.model, + osVersion: command.deviceName.osVersion, + }; + } + + // 6. 创建用户账户 const account = UserAccount.createAutomatic({ accountSequence, initialDeviceId: command.deviceId, - deviceName: command.deviceName, + deviceName: deviceNameStr, + hardwareInfo, inviterSequence, - province: ProvinceCode.create(command.provinceCode || 'DEFAULT'), - city: CityCode.create(command.cityCode || 'DEFAULT'), + province: ProvinceCode.create('DEFAULT'), + city: CityCode.create('DEFAULT'), + nickname: identity.username, + avatarSvg: identity.avatarSvg, }); - // 5. 使用 MPC 2-of-3 生成三链钱包地址 - this.logger.log(`Generating MPC wallet for account sequence: ${accountSequence.value}`); - const mpcResult = await this.mpcWalletService.generateMpcWallet({ - userId: account.userId.toString(), - username: accountSequence.value.toString(), // 使用账户序列号作为用户名 - deviceId: command.deviceId, - }); - - // 6. 创建钱包地址实体 (包含 MPC 签名) - const wallets = new Map(); - for (const walletInfo of mpcResult.wallets) { - const chainType = walletInfo.chainType as ChainType; - const wallet = WalletAddress.createMpc({ - userId: account.userId, - chainType, - address: walletInfo.address, - publicKey: walletInfo.publicKey, - addressDigest: walletInfo.addressDigest, - signature: walletInfo.signature, - }); - wallets.set(chainType, wallet); - } - - // 7. 绑定钱包地址到账户 - account.bindMultipleWalletAddresses(wallets); - - // 8. 保存账户和钱包 + // 7. 保存账户 await this.userRepository.save(account); - await this.userRepository.saveWallets(account.userId, Array.from(wallets.values())); - // 9. 保存 delegate share 到 backup-service (用于恢复) - // 注意: delegate share 由 mpc-service 代理生成,用户设备也应安全存储一份 - if (mpcResult.delegateShare) { - await this.backupClient.storeBackupShare({ - userId: account.userId.toString(), - accountSequence: account.accountSequence.value, - publicKey: mpcResult.publicKey, - encryptedShareData: mpcResult.delegateShare, - }); - this.logger.log(`Delegate share sent to backup-service for user: ${account.userId.toString()}`); - } - - // 10. 生成 Token + // 8. 生成 Token const tokens = await this.tokenService.generateTokenPair({ userId: account.userId.toString(), accountSequence: account.accountSequence.value, deviceId: command.deviceId, }); - // 11. 发布领域事件 + // 9. 发布领域事件 (包含 UserAccountAutoCreated) await this.eventPublisher.publishAll(account.domainEvents); account.clearDomainEvents(); - this.logger.log(`Account created successfully: sequence=${accountSequence.value}, publicKey=${mpcResult.publicKey}`); + // 10. 发布 MPC Keygen 请求事件 (触发后台生成钱包) + const { MpcKeygenRequestedEvent } = await import('@/domain/events'); + const sessionId = crypto.randomUUID(); + await this.eventPublisher.publish(new MpcKeygenRequestedEvent({ + sessionId, + userId: account.userId.toString(), + username: `user_${account.accountSequence.value}`, // 用于 mpc-system 标识 + threshold: 2, + totalParties: 3, + requireDelegate: true, + })); + + this.logger.log(`Account created: sequence=${accountSequence.value}, username=${identity.username}, MPC keygen requested`); return { - userId: account.userId.toString(), - accountSequence: account.accountSequence.value, + userSerialNum: account.accountSequence.value, referralCode: account.referralCode.value, - mnemonic: '', // MPC 模式不使用助记词 - delegateShare: mpcResult.delegateShare, // delegate share (客户端需安全存储) - publicKey: mpcResult.publicKey, - walletAddresses: { - kava: wallets.get(ChainType.KAVA)!.address, - dst: wallets.get(ChainType.DST)!.address, - bsc: wallets.get(ChainType.BSC)!.address, - }, + username: account.nickname, + avatarSvg: account.avatarUrl || identity.avatarSvg, accessToken: tokens.accessToken, refreshToken: tokens.refreshToken, }; @@ -645,4 +638,47 @@ export class UserApplicationService { } return result; } + + // ============ 钱包状态查询 ============ + + /** + * 获取钱包状态 (GET /user/{userSerialNum}/wallet) + * + * 钱包通过 Kafka 事件异步生成,此接口用于轮询查询状态 + */ + async getWalletStatus(query: GetWalletStatusQuery): Promise { + const accountSequence = AccountSequence.create(query.userSerialNum); + const account = await this.userRepository.findByAccountSequence(accountSequence); + + if (!account) { + throw new ApplicationError('用户不存在'); + } + + // 获取所有钱包地址 + const wallets = account.getAllWalletAddresses(); + + // 检查是否已有三条链的钱包地址 + const kavaWallet = wallets.find(w => w.chainType === ChainType.KAVA); + const dstWallet = wallets.find(w => w.chainType === ChainType.DST); + const bscWallet = wallets.find(w => w.chainType === ChainType.BSC); + + if (kavaWallet && dstWallet && bscWallet) { + // 钱包已就绪 + // 注意: MPC 模式下没有助记词,返回空字符串 + return { + status: 'ready', + walletAddresses: { + kava: kavaWallet.address, + dst: dstWallet.address, + bsc: bscWallet.address, + }, + mnemonic: '', // MPC模式无助记词 + }; + } + + // 钱包还在生成中 + return { + status: 'generating', + }; + } } diff --git a/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts b/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts index daf94f40..ecf9b080 100644 --- a/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts +++ b/backend/services/identity-service/src/domain/aggregates/user-account/user-account.aggregate.ts @@ -1,7 +1,7 @@ import { DomainError } from '@/shared/exceptions/domain.exception'; import { UserId, AccountSequence, PhoneNumber, ReferralCode, ProvinceCode, CityCode, - DeviceInfo, ChainType, KYCInfo, KYCStatus, AccountStatus, + DeviceInfo, HardwareInfo, ChainType, KYCInfo, KYCStatus, AccountStatus, } from '@/domain/value-objects'; import { WalletAddress } from '@/domain/entities/wallet-address.entity'; import { @@ -87,19 +87,26 @@ export class UserAccount { accountSequence: AccountSequence; initialDeviceId: string; deviceName?: string; + hardwareInfo?: HardwareInfo; inviterSequence: AccountSequence | null; province: ProvinceCode; city: CityCode; + nickname?: string; + avatarSvg?: string; }): UserAccount { const devices = new Map(); devices.set(params.initialDeviceId, new DeviceInfo( params.initialDeviceId, params.deviceName || '未命名设备', new Date(), new Date(), + params.hardwareInfo, )); // UserID将由数据库自动生成(autoincrement),这里使用临时值0 + const nickname = params.nickname || `用户${params.accountSequence.value}`; + const avatarUrl = params.avatarSvg || null; + const account = new UserAccount( UserId.create(0), params.accountSequence, devices, null, - `用户${params.accountSequence.value}`, null, params.inviterSequence, + nickname, avatarUrl, params.inviterSequence, ReferralCode.generate(), params.province, params.city, null, new Map(), null, KYCStatus.NOT_VERIFIED, AccountStatus.ACTIVE, new Date(), null, new Date(), @@ -123,6 +130,7 @@ export class UserAccount { phoneNumber: PhoneNumber; initialDeviceId: string; deviceName?: string; + hardwareInfo?: HardwareInfo; inviterSequence: AccountSequence | null; province: ProvinceCode; city: CityCode; @@ -130,6 +138,7 @@ export class UserAccount { const devices = new Map(); devices.set(params.initialDeviceId, new DeviceInfo( params.initialDeviceId, params.deviceName || '未命名设备', new Date(), new Date(), + params.hardwareInfo, )); // UserID将由数据库自动生成(autoincrement),这里使用临时值0 @@ -192,15 +201,21 @@ export class UserAccount { ); } - addDevice(deviceId: string, deviceName?: string): void { + addDevice(deviceId: string, deviceName?: string, hardwareInfo?: HardwareInfo): void { this.ensureActive(); if (this._devices.size >= 5 && !this._devices.has(deviceId)) { throw new DomainError('最多允许5个设备同时登录'); } if (this._devices.has(deviceId)) { - this._devices.get(deviceId)!.updateActivity(); + const device = this._devices.get(deviceId)!; + device.updateActivity(); + if (hardwareInfo) { + device.updateHardwareInfo(hardwareInfo); + } } else { - this._devices.set(deviceId, new DeviceInfo(deviceId, deviceName || '未命名设备', new Date(), new Date())); + this._devices.set(deviceId, new DeviceInfo( + deviceId, deviceName || '未命名设备', new Date(), new Date(), hardwareInfo, + )); this.addDomainEvent(new DeviceAddedEvent({ userId: this.userId.toString(), accountSequence: this.accountSequence.value, diff --git a/backend/services/identity-service/src/domain/value-objects/index.ts b/backend/services/identity-service/src/domain/value-objects/index.ts index 32a33a61..2d122a5f 100644 --- a/backend/services/identity-service/src/domain/value-objects/index.ts +++ b/backend/services/identity-service/src/domain/value-objects/index.ts @@ -144,26 +144,65 @@ export class Mnemonic { } } +// ============ HardwareInfo ============ +export interface HardwareInfo { + platform?: string; // ios, android, web + deviceModel?: string; // iPhone 15 Pro, Pixel 8 + osVersion?: string; // iOS 17.2, Android 14 + appVersion?: string; // 1.0.0 + screenWidth?: number; + screenHeight?: number; + locale?: string; // zh-CN, en-US + timezone?: string; // Asia/Shanghai +} + // ============ DeviceInfo ============ export class DeviceInfo { private _lastActiveAt: Date; + private _hardwareInfo: HardwareInfo; constructor( public readonly deviceId: string, public readonly deviceName: string, public readonly addedAt: Date, lastActiveAt: Date, + hardwareInfo?: HardwareInfo, ) { this._lastActiveAt = lastActiveAt; + this._hardwareInfo = hardwareInfo || {}; } get lastActiveAt(): Date { return this._lastActiveAt; } + get hardwareInfo(): HardwareInfo { + return this._hardwareInfo; + } + + get platform(): string | undefined { + return this._hardwareInfo.platform; + } + + get deviceModel(): string | undefined { + return this._hardwareInfo.deviceModel; + } + + get osVersion(): string | undefined { + return this._hardwareInfo.osVersion; + } + + get appVersion(): string | undefined { + return this._hardwareInfo.appVersion; + } + updateActivity(): void { this._lastActiveAt = new Date(); } + + updateHardwareInfo(info: HardwareInfo): void { + this._hardwareInfo = { ...this._hardwareInfo, ...info }; + } } // ============ ChainType ============ diff --git a/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts new file mode 100644 index 00000000..47d153e6 --- /dev/null +++ b/backend/services/identity-service/src/infrastructure/kafka/blockchain-event-consumer.service.ts @@ -0,0 +1,144 @@ +/** + * Blockchain Event Consumer Service + * + * Consumes wallet address creation events from blockchain-service via Kafka. + * Updates user wallet addresses when blockchain-service derives addresses from MPC public keys. + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// Blockchain Event Topics (events from blockchain-service) +export const BLOCKCHAIN_TOPICS = { + WALLET_ADDRESS_CREATED: 'blockchain.wallets', +} as const; + +export interface WalletAddressCreatedPayload { + userId: string; + publicKey: string; + addresses: { + chainType: string; + address: string; + }[]; +} + +export type BlockchainEventHandler = (payload: T) => Promise; + +@Injectable() +export class BlockchainEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(BlockchainEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private walletAddressCreatedHandler?: BlockchainEventHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'identity-service'; + const groupId = 'identity-service-blockchain-events'; + + this.logger.log(`[INIT] Blockchain Event Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics to subscribe: ${Object.values(BLOCKCHAIN_TOPICS).join(', ')}`); + + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + this.logger.log(`[CONNECT] Connecting Blockchain Event consumer...`); + await this.consumer.connect(); + this.isConnected = true; + this.logger.log(`[CONNECT] Blockchain Event Kafka consumer connected successfully`); + + // Subscribe to blockchain topics + await this.consumer.subscribe({ topics: Object.values(BLOCKCHAIN_TOPICS), fromBeginning: false }); + this.logger.log(`[SUBSCRIBE] Subscribed to blockchain topics: ${Object.values(BLOCKCHAIN_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + } catch (error) { + this.logger.error(`[ERROR] Failed to connect Blockchain Event Kafka consumer`, error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Blockchain Event Kafka consumer disconnected'); + } + } + + /** + * Register handler for wallet address created events + */ + onWalletAddressCreated(handler: BlockchainEventHandler): void { + this.walletAddressCreatedHandler = handler; + this.logger.log(`[REGISTER] WalletAddressCreated handler registered`); + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); + return; + } + + this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); + + const parsed = JSON.parse(value); + const payload = parsed.payload || parsed; + const eventType = parsed.eventType || 'unknown'; + + this.logger.log(`[RECEIVE] Parsed event: eventType=${eventType}`); + this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); + + // Handle WalletAddressCreated events + if (eventType === 'blockchain.wallet.address.created' || topic === BLOCKCHAIN_TOPICS.WALLET_ADDRESS_CREATED) { + this.logger.log(`[HANDLE] Processing WalletAddressCreated event`); + this.logger.log(`[HANDLE] userId: ${payload.userId}`); + this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`); + this.logger.log(`[HANDLE] addresses count: ${payload.addresses?.length}`); + + if (this.walletAddressCreatedHandler) { + await this.walletAddressCreatedHandler(payload as WalletAddressCreatedPayload); + this.logger.log(`[HANDLE] WalletAddressCreated handler completed successfully`); + } else { + this.logger.warn(`[HANDLE] No handler registered for WalletAddressCreated`); + } + } else { + this.logger.warn(`[RECEIVE] Unknown event type: ${eventType}`); + } + } catch (error) { + this.logger.error(`[ERROR] Error processing blockchain event from ${topic}`, error); + } + }, + }); + + this.logger.log(`[START] Started consuming blockchain events`); + } +} diff --git a/backend/services/identity-service/src/infrastructure/kafka/index.ts b/backend/services/identity-service/src/infrastructure/kafka/index.ts index f9d2d6e3..e07c7174 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/index.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/index.ts @@ -4,3 +4,4 @@ export * from './event-consumer.controller'; export * from './dead-letter.service'; export * from './event-retry.service'; export * from './mpc-event-consumer.service'; +export * from './blockchain-event-consumer.service'; diff --git a/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts index 0d0ef1b2..f30dc288 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts @@ -1,15 +1,18 @@ import { Module } from '@nestjs/common'; import { EventPublisherService } from './event-publisher.service'; import { MpcEventConsumerService } from './mpc-event-consumer.service'; +import { BlockchainEventConsumerService } from './blockchain-event-consumer.service'; @Module({ providers: [ EventPublisherService, MpcEventConsumerService, + BlockchainEventConsumerService, ], exports: [ EventPublisherService, MpcEventConsumerService, + BlockchainEventConsumerService, ], }) export class KafkaModule {} diff --git a/backend/services/identity-service/src/infrastructure/persistence/entities/user-account.entity.ts b/backend/services/identity-service/src/infrastructure/persistence/entities/user-account.entity.ts index 76056184..aebb6642 100644 --- a/backend/services/identity-service/src/infrastructure/persistence/entities/user-account.entity.ts +++ b/backend/services/identity-service/src/infrastructure/persistence/entities/user-account.entity.ts @@ -29,6 +29,16 @@ export interface UserDeviceEntity { userId: bigint; deviceId: string; deviceName: string | null; + // Hardware Info + platform: string | null; + deviceModel: string | null; + osVersion: string | null; + appVersion: string | null; + screenWidth: number | null; + screenHeight: number | null; + locale: string | null; + timezone: string | null; + // Timestamps addedAt: Date; lastActiveAt: Date; } diff --git a/backend/services/identity-service/src/infrastructure/persistence/mappers/user-account.mapper.ts b/backend/services/identity-service/src/infrastructure/persistence/mappers/user-account.mapper.ts index 87d3a262..00247a0f 100644 --- a/backend/services/identity-service/src/infrastructure/persistence/mappers/user-account.mapper.ts +++ b/backend/services/identity-service/src/infrastructure/persistence/mappers/user-account.mapper.ts @@ -1,16 +1,32 @@ import { Injectable } from '@nestjs/common'; import { UserAccount } from '@/domain/aggregates/user-account/user-account.aggregate'; import { WalletAddress } from '@/domain/entities/wallet-address.entity'; -import { DeviceInfo, KYCInfo, KYCStatus, AccountStatus, ChainType, AddressStatus } from '@/domain/value-objects'; +import { DeviceInfo, HardwareInfo, KYCInfo, KYCStatus, AccountStatus, ChainType, AddressStatus } from '@/domain/value-objects'; import { UserAccountEntity } from '../entities/user-account.entity'; import { toMpcSignatureString } from '../entities/wallet-address.entity'; @Injectable() export class UserAccountMapper { toDomain(entity: UserAccountEntity): UserAccount { - const devices = (entity.devices || []).map( - (d) => new DeviceInfo(d.deviceId, d.deviceName || '未命名设备', d.addedAt, d.lastActiveAt), - ); + const devices = (entity.devices || []).map((d) => { + const hardwareInfo: HardwareInfo = { + platform: d.platform || undefined, + deviceModel: d.deviceModel || undefined, + osVersion: d.osVersion || undefined, + appVersion: d.appVersion || undefined, + screenWidth: d.screenWidth || undefined, + screenHeight: d.screenHeight || undefined, + locale: d.locale || undefined, + timezone: d.timezone || undefined, + }; + return new DeviceInfo( + d.deviceId, + d.deviceName || '未命名设备', + d.addedAt, + d.lastActiveAt, + hardwareInfo, + ); + }); const wallets = (entity.walletAddresses || []).map((w) => WalletAddress.reconstruct({ diff --git a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts index 58693f63..ba4b685f 100644 --- a/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts +++ b/backend/services/identity-service/src/infrastructure/persistence/repositories/user-account.repository.impl.ts @@ -7,7 +7,7 @@ import { UserAccount } from '@/domain/aggregates/user-account/user-account.aggre import { WalletAddress } from '@/domain/entities/wallet-address.entity'; import { UserId, AccountSequence, PhoneNumber, ReferralCode, ChainType, - AccountStatus, KYCStatus, DeviceInfo, KYCInfo, AddressStatus, + AccountStatus, KYCStatus, DeviceInfo, HardwareInfo, KYCInfo, AddressStatus, } from '@/domain/value-objects'; import { toMpcSignatureString, fromMpcSignatureString } from '../entities/wallet-address.entity'; @@ -79,6 +79,14 @@ export class UserAccountRepositoryImpl implements UserAccountRepository { userId: savedUserId, deviceId: d.deviceId, deviceName: d.deviceName, + platform: d.hardwareInfo.platform || null, + deviceModel: d.hardwareInfo.deviceModel || null, + osVersion: d.hardwareInfo.osVersion || null, + appVersion: d.hardwareInfo.appVersion || null, + screenWidth: d.hardwareInfo.screenWidth || null, + screenHeight: d.hardwareInfo.screenHeight || null, + locale: d.hardwareInfo.locale || null, + timezone: d.hardwareInfo.timezone || null, addedAt: d.addedAt, lastActiveAt: d.lastActiveAt, })), @@ -205,9 +213,25 @@ export class UserAccountRepositoryImpl implements UserAccountRepository { } private toDomain(data: any): UserAccount { - const devices = data.devices.map( - (d: any) => new DeviceInfo(d.deviceId, d.deviceName || '未命名设备', d.addedAt, d.lastActiveAt), - ); + const devices = data.devices.map((d: any) => { + const hardwareInfo: HardwareInfo = { + platform: d.platform || undefined, + deviceModel: d.deviceModel || undefined, + osVersion: d.osVersion || undefined, + appVersion: d.appVersion || undefined, + screenWidth: d.screenWidth || undefined, + screenHeight: d.screenHeight || undefined, + locale: d.locale || undefined, + timezone: d.timezone || undefined, + }; + return new DeviceInfo( + d.deviceId, + d.deviceName || '未命名设备', + d.addedAt, + d.lastActiveAt, + hardwareInfo, + ); + }); const wallets = data.walletAddresses.map((w: any) => WalletAddress.reconstruct({