diff --git a/backend/services/identity-service/src/application/application.module.ts b/backend/services/identity-service/src/application/application.module.ts index ceb88305..e2ef1da3 100644 --- a/backend/services/identity-service/src/application/application.module.ts +++ b/backend/services/identity-service/src/application/application.module.ts @@ -8,6 +8,7 @@ import { RecoverByPhoneHandler } from './commands/recover-by-phone/recover-by-ph import { BindPhoneHandler } from './commands/bind-phone/bind-phone.handler'; import { GetMyProfileHandler } from './queries/get-my-profile/get-my-profile.handler'; import { GetMyDevicesHandler } from './queries/get-my-devices/get-my-devices.handler'; +import { MpcKeygenCompletedHandler } from './event-handlers/mpc-keygen-completed.handler'; import { DomainModule } from '@/domain/domain.module'; import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; @@ -23,6 +24,8 @@ import { InfrastructureModule } from '@/infrastructure/infrastructure.module'; BindPhoneHandler, GetMyProfileHandler, GetMyDevicesHandler, + // MPC Event Handlers + MpcKeygenCompletedHandler, ], exports: [ UserApplicationService, diff --git a/backend/services/identity-service/src/application/event-handlers/index.ts b/backend/services/identity-service/src/application/event-handlers/index.ts new file mode 100644 index 00000000..92a915c1 --- /dev/null +++ b/backend/services/identity-service/src/application/event-handlers/index.ts @@ -0,0 +1 @@ +export * from './mpc-keygen-completed.handler'; diff --git a/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts new file mode 100644 index 00000000..c696bc77 --- /dev/null +++ b/backend/services/identity-service/src/application/event-handlers/mpc-keygen-completed.handler.ts @@ -0,0 +1,265 @@ +/** + * MPC Keygen Event Handler + * + * Handles keygen events from mpc-service: + * - KeygenStarted: Updates status in Redis + * - KeygenCompleted: Derives wallet addresses and saves to user account + * - SessionFailed: Logs error and updates status + */ + +import { Injectable, Inject, Logger, OnModuleInit } from '@nestjs/common'; +import { keccak256 } from 'ethers'; +import { UserAccountRepository, USER_ACCOUNT_REPOSITORY } from '@/domain/repositories/user-account.repository.interface'; +import { WalletAddress } from '@/domain/entities/wallet-address.entity'; +import { ChainType, UserId } from '@/domain/value-objects'; +import { RedisService } from '@/infrastructure/redis/redis.service'; +import { + MpcEventConsumerService, + KeygenStartedPayload, + KeygenCompletedPayload, + SessionFailedPayload, +} from '@/infrastructure/kafka/mpc-event-consumer.service'; + +// Redis key prefix for keygen status +const KEYGEN_STATUS_PREFIX = 'keygen:status:'; +const KEYGEN_STATUS_TTL = 60 * 60 * 24; // 24 hours + +export type KeygenStatus = 'pending' | 'generating' | 'completed' | 'failed'; + +export interface KeygenStatusData { + status: KeygenStatus; + userId: string; + mpcSessionId?: string; + publicKey?: string; + walletAddress?: string; + errorMessage?: string; + updatedAt: string; +} + +@Injectable() +export class MpcKeygenCompletedHandler implements OnModuleInit { + private readonly logger = new Logger(MpcKeygenCompletedHandler.name); + + constructor( + @Inject(USER_ACCOUNT_REPOSITORY) + private readonly userRepository: UserAccountRepository, + private readonly redisService: RedisService, + private readonly mpcEventConsumer: MpcEventConsumerService, + ) {} + + async onModuleInit() { + // 注册事件处理器 + this.mpcEventConsumer.onKeygenStarted(this.handleKeygenStarted.bind(this)); + this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); + this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this)); + this.logger.log('Registered MPC event handlers'); + } + + /** + * 处理 keygen 开始事件 + * + * 更新 Redis 中的状态为 "generating" + */ + private async handleKeygenStarted(payload: KeygenStartedPayload): Promise { + const { userId, mpcSessionId } = payload; + this.logger.log(`Keygen started: userId=${userId}, mpcSessionId=${mpcSessionId}`); + + try { + const statusData: KeygenStatusData = { + status: 'generating', + userId, + mpcSessionId, + updatedAt: new Date().toISOString(), + }; + + await this.redisService.set( + `${KEYGEN_STATUS_PREFIX}${userId}`, + JSON.stringify(statusData), + KEYGEN_STATUS_TTL, + ); + + this.logger.log(`Keygen status updated to 'generating' for user: ${userId}`); + } catch (error) { + this.logger.error(`Failed to update keygen status: ${error}`, error); + } + } + + /** + * 处理 keygen 完成事件 + * + * 从 mpc-service 收到公钥后: + * 1. 解析用户信息 + * 2. 从公钥派生各链钱包地址 + * 3. 保存钱包地址到用户账户 + * 4. 更新 Redis 状态为 completed + */ + private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { + const { publicKey, extraPayload } = payload; + + if (!extraPayload?.userId) { + this.logger.warn('KeygenCompleted event missing userId, skipping'); + return; + } + + const { userId, username } = extraPayload; + this.logger.log(`Processing keygen completed: userId=${userId}, username=${username}`); + + try { + // 1. 查找用户账户 + const account = await this.userRepository.findById(UserId.create(userId)); + if (!account) { + this.logger.error(`User not found: ${userId}`); + return; + } + + // 2. 从公钥派生以太坊地址 (各链通用 EVM 地址) + const walletAddress = this.deriveAddressFromPublicKey(publicKey); + this.logger.log(`Derived wallet address: ${walletAddress}`); + + // 3. 创建三条链的钱包地址 + const wallets: WalletAddress[] = [ + WalletAddress.create({ userId: account.userId, chainType: ChainType.KAVA, address: walletAddress }), + WalletAddress.create({ userId: account.userId, chainType: ChainType.DST, address: walletAddress }), + WalletAddress.create({ userId: account.userId, chainType: ChainType.BSC, address: walletAddress }), + ]; + + // 4. 保存钱包地址到用户账户 + await this.userRepository.saveWallets(account.userId, wallets); + + // 5. 更新 Redis 状态为 completed + const statusData: KeygenStatusData = { + status: 'completed', + userId, + publicKey, + walletAddress, + updatedAt: new Date().toISOString(), + }; + + await this.redisService.set( + `${KEYGEN_STATUS_PREFIX}${userId}`, + JSON.stringify(statusData), + KEYGEN_STATUS_TTL, + ); + + this.logger.log(`Wallet addresses saved for user: ${userId}, address: ${walletAddress}`); + } catch (error) { + this.logger.error(`Failed to process keygen completed: ${error}`, error); + } + } + + /** + * 处理 session 失败事件 + * + * 当 keygen 失败时: + * 1. 记录错误日志 + * 2. 更新 Redis 状态为 failed + */ + private async handleSessionFailed(payload: SessionFailedPayload): Promise { + const { sessionType, errorMessage, extraPayload } = payload; + + // 只处理 keygen 失败 + if (sessionType !== 'keygen' && sessionType !== 'KEYGEN') { + return; + } + + const userId = extraPayload?.userId || 'unknown'; + this.logger.error(`Keygen failed for user ${userId}: ${errorMessage}`); + + try { + // 更新 Redis 状态为 failed + const statusData: KeygenStatusData = { + status: 'failed', + userId, + errorMessage, + updatedAt: new Date().toISOString(), + }; + + await this.redisService.set( + `${KEYGEN_STATUS_PREFIX}${userId}`, + JSON.stringify(statusData), + KEYGEN_STATUS_TTL, + ); + + this.logger.log(`Keygen status updated to 'failed' for user: ${userId}`); + } catch (error) { + this.logger.error(`Failed to update keygen failed status: ${error}`, error); + } + } + + /** + * 从压缩公钥派生以太坊地址 + * + * @param compressedPubKey 33字节压缩公钥 (hex string) + * @returns 以太坊地址 (0x...) + */ + private deriveAddressFromPublicKey(compressedPubKey: string): string { + // 移除 0x 前缀(如果有) + const pubKeyHex = compressedPubKey.startsWith('0x') + ? compressedPubKey.slice(2) + : compressedPubKey; + + // 如果是压缩公钥 (33 bytes = 66 hex chars),需要解压 + let uncompressedPubKey: string; + if (pubKeyHex.length === 66) { + // 压缩公钥,需要解压 + uncompressedPubKey = this.decompressPublicKey(pubKeyHex); + } else if (pubKeyHex.length === 128 || pubKeyHex.length === 130) { + // 未压缩公钥 (带或不带 04 前缀) + uncompressedPubKey = pubKeyHex.length === 130 ? pubKeyHex.slice(2) : pubKeyHex; + } else { + throw new Error(`Invalid public key length: ${pubKeyHex.length}`); + } + + // 对未压缩公钥进行 keccak256 哈希 + const hash = keccak256('0x' + uncompressedPubKey); + // 取最后 20 字节作为地址 + return '0x' + hash.slice(-40); + } + + /** + * 解压 secp256k1 压缩公钥 + */ + private decompressPublicKey(compressedHex: string): string { + const prefix = parseInt(compressedHex.slice(0, 2), 16); + const xHex = compressedHex.slice(2); + const x = BigInt('0x' + xHex); + + // secp256k1 curve parameters + const p = BigInt('0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F'); + const a = BigInt(0); + const b = BigInt(7); + + // Calculate y^2 = x^3 + ax + b (mod p) + const ySquared = (x ** 3n + a * x + b) % p; + + // Calculate modular square root + const y = this.modPow(ySquared, (p + 1n) / 4n, p); + + // Choose correct y based on prefix (02 = even, 03 = odd) + const isEven = y % 2n === 0n; + const needEven = prefix === 0x02; + const finalY = isEven === needEven ? y : p - y; + + // Format as 64-char hex strings + const xStr = x.toString(16).padStart(64, '0'); + const yStr = finalY.toString(16).padStart(64, '0'); + + return xStr + yStr; + } + + /** + * Modular exponentiation + */ + private modPow(base: bigint, exp: bigint, mod: bigint): bigint { + let result = 1n; + base = base % mod; + while (exp > 0n) { + if (exp % 2n === 1n) { + result = (result * base) % mod; + } + exp = exp / 2n; + base = (base * base) % mod; + } + return result; + } +} diff --git a/backend/services/identity-service/src/domain/events/index.ts b/backend/services/identity-service/src/domain/events/index.ts index 714bd341..53ef98d3 100644 --- a/backend/services/identity-service/src/domain/events/index.ts +++ b/backend/services/identity-service/src/domain/events/index.ts @@ -172,3 +172,34 @@ export class UserAccountDeactivatedEvent extends DomainEvent { return 'UserAccountDeactivated'; } } + +/** + * MPC 密钥生成请求事件 + * 用户创建账户后发布此事件,触发 MPC 服务生成钱包地址 + * + * payload 格式需要与 mpc-service 的 KeygenRequestedPayload 匹配: + * - sessionId: 唯一会话ID + * - userId: 用户ID + * - username: 用户名 (用于 mpc-system 标识) + * - threshold: 签名阈值 (默认 2) + * - totalParties: 总参与方数 (默认 3) + * - requireDelegate: 是否需要委托分片 (默认 true) + */ +export class MpcKeygenRequestedEvent extends DomainEvent { + constructor( + public readonly payload: { + sessionId: string; + userId: string; + username: string; + threshold: number; + totalParties: number; + requireDelegate: boolean; + }, + ) { + super(); + } + + get eventType(): string { + return 'MpcKeygenRequested'; + } +} diff --git a/backend/services/identity-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/identity-service/src/infrastructure/kafka/event-publisher.service.ts index 77c13729..8c4c40a4 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/event-publisher.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/event-publisher.service.ts @@ -1,4 +1,4 @@ -import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { Kafka, Producer, Consumer, logLevel } from 'kafkajs'; import { DomainEvent } from '@/domain/events'; @@ -13,7 +13,7 @@ export interface DomainEventMessage { payload: any; } -// 定义主题常量 +// 定义主题常量 - identity-service 发布的事件 export const IDENTITY_TOPICS = { USER_ACCOUNT_CREATED: 'identity.UserAccountCreated', USER_ACCOUNT_AUTO_CREATED: 'identity.UserAccountAutoCreated', @@ -30,28 +30,49 @@ export const IDENTITY_TOPICS = { USER_ACCOUNT_FROZEN: 'identity.UserAccountFrozen', ACCOUNT_FROZEN: 'identity.AccountFrozen', USER_ACCOUNT_DEACTIVATED: 'identity.UserAccountDeactivated', + // MPC 请求发送到 mpc.* topic,让 mpc-service 消费 + MPC_KEYGEN_REQUESTED: 'mpc.KeygenRequested', + MPC_SIGNING_REQUESTED: 'mpc.SigningRequested', +} as const; + +// 定义 identity-service 需要消费的 MPC 事件主题 +export const MPC_CONSUME_TOPICS = { + KEYGEN_COMPLETED: 'mpc.KeygenCompleted', + SESSION_FAILED: 'mpc.SessionFailed', } as const; @Injectable() export class EventPublisherService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventPublisherService.name); private kafka: Kafka; private producer: Producer; constructor(private readonly configService: ConfigService) { + const brokers = (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','); + const clientId = this.configService.get('KAFKA_CLIENT_ID', 'identity-service'); + + this.logger.log(`[INIT] Kafka EventPublisher initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.kafka = new Kafka({ - clientId: this.configService.get('KAFKA_CLIENT_ID', 'identity-service'), - brokers: (this.configService.get('KAFKA_BROKERS', 'localhost:9092')).split(','), + clientId, + brokers, logLevel: logLevel.WARN, }); this.producer = this.kafka.producer(); } async onModuleInit() { + this.logger.log(`[CONNECT] Connecting Kafka producer...`); await this.producer.connect(); + this.logger.log(`[CONNECT] Kafka producer connected successfully`); } async onModuleDestroy() { + this.logger.log(`[DISCONNECT] Disconnecting Kafka producer...`); await this.producer.disconnect(); + this.logger.log(`[DISCONNECT] Kafka producer disconnected`); } async publish(event: DomainEvent): Promise; @@ -61,6 +82,10 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { // 直接发布到指定 topic (用于重试场景) const topic = eventOrTopic; const msg = message!; + + this.logger.log(`[PUBLISH] Publishing to topic: ${topic}`); + this.logger.debug(`[PUBLISH] Message: ${JSON.stringify(msg)}`); + await this.producer.send({ topic, messages: [ @@ -70,28 +95,60 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { }, ], }); + + this.logger.log(`[PUBLISH] Successfully published eventId=${msg.eventId} to ${topic}`); } else { // 从领域事件发布 const event = eventOrTopic; + const topic = this.getTopicForEvent(event); + const payload = (event as any).payload; + + this.logger.log(`[PUBLISH] Publishing event: type=${event.eventType}, topic=${topic}`); + this.logger.log(`[PUBLISH] EventId: ${event.eventId}`); + this.logger.debug(`[PUBLISH] Payload: ${JSON.stringify(payload)}`); + + const messageValue = { + eventId: event.eventId, + eventType: event.eventType, + occurredAt: event.occurredAt.toISOString(), + aggregateId: (event as any).aggregateId || '', + aggregateType: (event as any).aggregateType || 'UserAccount', + payload, + }; + await this.producer.send({ - topic: `identity.${event.eventType}`, + topic, messages: [ { key: event.eventId, - value: JSON.stringify({ - eventId: event.eventId, - eventType: event.eventType, - occurredAt: event.occurredAt.toISOString(), - aggregateId: (event as any).aggregateId || '', - aggregateType: (event as any).aggregateType || 'UserAccount', - payload: (event as any).payload, - }), + value: JSON.stringify(messageValue), }, ], }); + + this.logger.log(`[PUBLISH] Successfully published ${event.eventType} to ${topic}`); } } + /** + * 根据事件类型获取对应的 Kafka topic + * MPC 相关事件发送到 mpc.* topic,其他事件发送到 identity.* topic + */ + private getTopicForEvent(event: DomainEvent): string { + const eventType = event.eventType; + + // MPC 相关事件使用 mpc.* 前缀 + if (eventType === 'MpcKeygenRequested') { + return IDENTITY_TOPICS.MPC_KEYGEN_REQUESTED; + } + if (eventType === 'MpcSigningRequested') { + return IDENTITY_TOPICS.MPC_SIGNING_REQUESTED; + } + + // 其他事件使用 identity.* 前缀 + return `identity.${eventType}`; + } + async publishAll(events: DomainEvent[]): Promise { for (const event of events) { await this.publish(event); diff --git a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index a0ea22c2..9e5f4df6 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -9,13 +9,21 @@ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/commo import { ConfigService } from '@nestjs/config'; import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; -// MPC Event Topics +// MPC Event Topics (events from mpc-service) export const MPC_TOPICS = { + KEYGEN_STARTED: 'mpc.KeygenStarted', KEYGEN_COMPLETED: 'mpc.KeygenCompleted', SIGNING_COMPLETED: 'mpc.SigningCompleted', SESSION_FAILED: 'mpc.SessionFailed', } as const; +export interface KeygenStartedPayload { + sessionId: string; + userId: string; + username: string; + mpcSessionId: string; +} + export interface KeygenCompletedPayload { sessionId: string; partyId: string; @@ -68,6 +76,7 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private consumer: Consumer; private isConnected = false; + private keygenStartedHandler?: MpcEventHandler; private keygenCompletedHandler?: MpcEventHandler; private signingCompletedHandler?: MpcEventHandler; private sessionFailedHandler?: MpcEventHandler; @@ -79,6 +88,12 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'identity-service'; const groupId = 'identity-service-mpc-events'; + this.logger.log(`[INIT] MPC Event Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`); + this.kafka = new Kafka({ clientId, brokers, @@ -96,18 +111,19 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { }); try { + this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); await this.consumer.connect(); this.isConnected = true; - this.logger.log('MPC Event Kafka consumer connected'); + this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); // Subscribe to MPC topics await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); - this.logger.log(`Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); // Start consuming await this.startConsuming(); } catch (error) { - this.logger.error('Failed to connect MPC Event Kafka consumer', error); + this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); } } @@ -118,6 +134,13 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { } } + /** + * Register handler for keygen started events + */ + onKeygenStarted(handler: MpcEventHandler): void { + this.keygenStartedHandler = handler; + } + /** * Register handler for keygen completed events */ @@ -142,46 +165,77 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private async startConsuming(): Promise { await this.consumer.run({ eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + try { const value = message.value?.toString(); if (!value) { - this.logger.warn('Empty message received'); + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); return; } + this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); + const parsed = JSON.parse(value); const payload = parsed.payload || parsed; - this.logger.debug(`Received MPC event from ${topic}: ${JSON.stringify(payload)}`); + this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); + this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); switch (topic) { + case MPC_TOPICS.KEYGEN_STARTED: + this.logger.log(`[HANDLE] Processing KeygenStarted event`); + if (this.keygenStartedHandler) { + await this.keygenStartedHandler(payload as KeygenStartedPayload); + this.logger.log(`[HANDLE] KeygenStarted handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for KeygenStarted`); + } + break; + case MPC_TOPICS.KEYGEN_COMPLETED: + this.logger.log(`[HANDLE] Processing KeygenCompleted event`); + this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`); if (this.keygenCompletedHandler) { await this.keygenCompletedHandler(payload as KeygenCompletedPayload); + this.logger.log(`[HANDLE] KeygenCompleted handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`); } break; case MPC_TOPICS.SIGNING_COMPLETED: + this.logger.log(`[HANDLE] Processing SigningCompleted event`); if (this.signingCompletedHandler) { await this.signingCompletedHandler(payload as SigningCompletedPayload); + this.logger.log(`[HANDLE] SigningCompleted handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`); } break; case MPC_TOPICS.SESSION_FAILED: + this.logger.log(`[HANDLE] Processing SessionFailed event`); + this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); + this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); if (this.sessionFailedHandler) { await this.sessionFailedHandler(payload as SessionFailedPayload); + this.logger.log(`[HANDLE] SessionFailed handler completed`); + } else { + this.logger.warn(`[HANDLE] No handler registered for SessionFailed`); } break; default: - this.logger.warn(`Unknown MPC topic: ${topic}`); + this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`); } } catch (error) { - this.logger.error(`Error processing MPC event from ${topic}`, error); + this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); } }, }); - this.logger.log('Started consuming MPC events'); + this.logger.log(`[START] Started consuming MPC events`); } } diff --git a/backend/services/identity-service/src/shared/utils/index.ts b/backend/services/identity-service/src/shared/utils/index.ts new file mode 100644 index 00000000..ae712e5f --- /dev/null +++ b/backend/services/identity-service/src/shared/utils/index.ts @@ -0,0 +1 @@ +export * from './random-identity.util'; diff --git a/backend/services/identity-service/src/shared/utils/random-identity.util.ts b/backend/services/identity-service/src/shared/utils/random-identity.util.ts new file mode 100644 index 00000000..2178e878 --- /dev/null +++ b/backend/services/identity-service/src/shared/utils/random-identity.util.ts @@ -0,0 +1,158 @@ +/** + * 随机用户名和头像生成器 + */ + +// 形容词词库 (水果/美食主题) +const ADJECTIVES = [ + '快乐', '阳光', '活泼', '可爱', '勇敢', '聪明', '温暖', '甜蜜', + '闪亮', '酷炫', '神秘', '优雅', '热情', '淘气', '呆萌', '霸气', + '清新', '软糯', '香甜', '金色', '紫色', '粉色', '蓝色', '绿色', +]; + +// 名词词库 (水果主题 - 榴莲相关) +const NOUNS = [ + '榴莲', '芒果', '椰子', '菠萝', '龙眼', '荔枝', '山竹', '木瓜', + '百香果', '火龙果', '杨桃', '莲雾', '番石榴', '释迦', '红毛丹', + '勇士', '战士', '骑士', '猎手', '侠客', '英雄', '达人', '玩家', +]; + +// 生成随机用户名: 形容词 + 名词 + 随机数字 +export function generateRandomUsername(): string { + const adjective = ADJECTIVES[Math.floor(Math.random() * ADJECTIVES.length)]; + const noun = NOUNS[Math.floor(Math.random() * NOUNS.length)]; + const number = Math.floor(Math.random() * 90000) + 10000; // 5位数字 + return `${adjective}${noun}_${number}`; +} + +// 预定义的柔和配色方案 +const COLOR_PALETTES = [ + { bg: '#FFE4E1', primary: '#FF6B6B', secondary: '#4ECDC4' }, // 粉红+红+青 + { bg: '#E8F5E9', primary: '#66BB6A', secondary: '#FFA726' }, // 浅绿+绿+橙 + { bg: '#E3F2FD', primary: '#42A5F5', secondary: '#AB47BC' }, // 浅蓝+蓝+紫 + { bg: '#FFF3E0', primary: '#FF7043', secondary: '#26A69A' }, // 浅橙+橙+青 + { bg: '#F3E5F5', primary: '#AB47BC', secondary: '#42A5F5' }, // 浅紫+紫+蓝 + { bg: '#FFFDE7', primary: '#FFCA28', secondary: '#EC407A' }, // 浅黄+黄+粉 + { bg: '#E0F7FA', primary: '#26C6DA', secondary: '#7E57C2' }, // 浅青+青+紫 + { bg: '#FCE4EC', primary: '#EC407A', secondary: '#66BB6A' }, // 浅粉+粉+绿 +]; + +// 榴莲形状变体 +const DURIAN_SHAPES = [ + // 经典榴莲 + (color: string) => ` + + + + `, + // 圆润榴莲 + (color: string) => ` + + + + + + `, + // 可爱榴莲 + (color: string) => ` + + + + + `, +]; + +// 表情变体 +const FACE_EXPRESSIONS = [ + // 开心 + (x: number, y: number) => ` + + + + `, + // 眨眼 + (x: number, y: number) => ` + + + + `, + // 惊讶 + (x: number, y: number) => ` + + + + `, + // 酷 + (x: number, y: number) => ` + + + + `, + // 害羞 + (x: number, y: number) => ` + + + + + + `, +]; + +// 装饰元素 +const DECORATIONS = [ + // 星星 + (color: string) => ` + + `, + // 爱心 + (color: string) => ` + + `, + // 音符 + (color: string) => ` + + + + `, + // 闪光 + (color: string) => ` + + + + + `, + // 无装饰 + () => '', +]; + +/** + * 生成随机SVG头像 (榴莲主题) + */ +export function generateRandomAvatarSvg(): string { + // 随机选择配色 + const palette = COLOR_PALETTES[Math.floor(Math.random() * COLOR_PALETTES.length)]; + // 随机选择榴莲形状 + const shape = DURIAN_SHAPES[Math.floor(Math.random() * DURIAN_SHAPES.length)]; + // 随机选择表情 + const face = FACE_EXPRESSIONS[Math.floor(Math.random() * FACE_EXPRESSIONS.length)]; + // 随机选择装饰 (50%概率有装饰) + const decoration = Math.random() > 0.5 + ? DECORATIONS[Math.floor(Math.random() * (DECORATIONS.length - 1))] + : DECORATIONS[DECORATIONS.length - 1]; + + return ` + + ${shape(palette.primary)} + ${face(50, 52)} + ${decoration(palette.secondary)} +`; +} + +/** + * 生成完整的随机身份 + */ +export function generateRandomIdentity(): { username: string; avatarSvg: string } { + return { + username: generateRandomUsername(), + avatarSvg: generateRandomAvatarSvg(), + }; +} diff --git a/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts b/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts index b3a6b583..5b81a92e 100644 --- a/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts +++ b/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts @@ -13,6 +13,7 @@ import { MPC_CONSUME_TOPICS, KeygenRequestedPayload, } from '../../infrastructure/messaging/kafka/event-consumer.service'; +import { KeygenStartedEvent } from '../../domain/events/keygen-started.event'; import { KeygenCompletedEvent } from '../../domain/events/keygen-completed.event'; import { SessionFailedEvent } from '../../domain/events/session-failed.event'; import { SessionType } from '../../domain/enums'; @@ -28,18 +29,24 @@ export class KeygenRequestedHandler implements OnModuleInit { ) {} async onModuleInit() { + this.logger.log(`[INIT] KeygenRequestedHandler initializing...`); await this.eventConsumer.subscribe( MPC_CONSUME_TOPICS.KEYGEN_REQUESTED, this.handleMessage.bind(this), ); - this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`); + this.logger.log(`[INIT] Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`); } private async handleMessage(topic: string, payload: Record): Promise { + this.logger.log(`[HANDLE] Received keygen request from topic: ${topic}`); + this.logger.log(`[HANDLE] Payload: ${JSON.stringify(payload)}`); + const data = payload as unknown as KeygenRequestedPayload; const { sessionId, userId, username, threshold, totalParties, requireDelegate } = data; - this.logger.log(`Processing keygen request: userId=${userId}, username=${username}, sessionId=${sessionId}`); + this.logger.log(`[HANDLE] Parsed request: sessionId=${sessionId}`); + this.logger.log(`[HANDLE] userId=${userId}, username=${username}`); + this.logger.log(`[HANDLE] threshold=${threshold}, totalParties=${totalParties}, requireDelegate=${requireDelegate}`); try { // Step 1: Create keygen session via mpc-system @@ -53,6 +60,11 @@ export class KeygenRequestedHandler implements OnModuleInit { const mpcSessionId = createResult.sessionId; this.logger.log(`Keygen session created in mpc-system: ${mpcSessionId}`); + // Step 1.5: Publish KeygenStarted event to notify identity-service + const startedEvent = new KeygenStartedEvent(sessionId, userId, username, mpcSessionId); + await this.eventPublisher.publish(startedEvent); + this.logger.log(`Published KeygenStarted event: userId=${userId}, mpcSessionId=${mpcSessionId}`); + // Step 2: Poll for completion (with max retries) const result = await this.pollKeygenCompletion(mpcSessionId, 150, 2000); diff --git a/backend/services/mpc-service/src/domain/events/index.ts b/backend/services/mpc-service/src/domain/events/index.ts index b72413b5..bd3b49cd 100644 --- a/backend/services/mpc-service/src/domain/events/index.ts +++ b/backend/services/mpc-service/src/domain/events/index.ts @@ -15,6 +15,7 @@ export { ShareRevokedEvent } from './share-revoked.event'; export { ShareUsedEvent } from './share-used.event'; // Session Events +export { KeygenStartedEvent } from './keygen-started.event'; export { KeygenCompletedEvent } from './keygen-completed.event'; export { SigningCompletedEvent } from './signing-completed.event'; export { SessionFailedEvent } from './session-failed.event'; @@ -29,6 +30,7 @@ import { ShareCreatedEvent } from './share-created.event'; import { ShareRotatedEvent } from './share-rotated.event'; import { ShareRevokedEvent } from './share-revoked.event'; import { ShareUsedEvent } from './share-used.event'; +import { KeygenStartedEvent } from './keygen-started.event'; import { KeygenCompletedEvent } from './keygen-completed.event'; import { SigningCompletedEvent } from './signing-completed.event'; import { SessionFailedEvent } from './session-failed.event'; @@ -41,6 +43,7 @@ export type MPCDomainEvent = | ShareRotatedEvent | ShareRevokedEvent | ShareUsedEvent + | KeygenStartedEvent | KeygenCompletedEvent | SigningCompletedEvent | SessionFailedEvent @@ -54,6 +57,7 @@ export const MPC_TOPICS = { SHARE_ROTATED: 'mpc.ShareRotated', SHARE_REVOKED: 'mpc.ShareRevoked', SHARE_USED: 'mpc.ShareUsed', + KEYGEN_STARTED: 'mpc.KeygenStarted', KEYGEN_COMPLETED: 'mpc.KeygenCompleted', SIGNING_COMPLETED: 'mpc.SigningCompleted', SESSION_FAILED: 'mpc.SessionFailed', diff --git a/backend/services/mpc-service/src/domain/events/keygen-started.event.ts b/backend/services/mpc-service/src/domain/events/keygen-started.event.ts new file mode 100644 index 00000000..36374227 --- /dev/null +++ b/backend/services/mpc-service/src/domain/events/keygen-started.event.ts @@ -0,0 +1,40 @@ +/** + * KeygenStarted Event + * + * Emitted when a keygen session starts processing. + * Used to notify identity-service that keygen is in progress. + */ + +import { DomainEvent } from './domain-event.base'; + +export class KeygenStartedEvent extends DomainEvent { + constructor( + public readonly sessionId: string, + public readonly userId: string, + public readonly username: string, + public readonly mpcSessionId: string, + ) { + super(); + } + + get eventType(): string { + return 'KeygenStarted'; + } + + get aggregateId(): string { + return this.sessionId; + } + + get aggregateType(): string { + return 'PartySession'; + } + + get payload(): Record { + return { + sessionId: this.sessionId, + userId: this.userId, + username: this.username, + mpcSessionId: this.mpcSessionId, + }; + } +} diff --git a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts index 4ae45032..962b343a 100644 --- a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts +++ b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts @@ -49,6 +49,11 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy { const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mpc-service'; const groupId = this.configService.get('KAFKA_GROUP_ID') || 'mpc-service-group'; + this.logger.log(`[INIT] MPC Event Consumer initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] GroupId: ${groupId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.kafka = new Kafka({ clientId, brokers, @@ -66,11 +71,12 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy { }); try { + this.logger.log(`[CONNECT] Connecting Kafka consumer...`); await this.consumer.connect(); this.isConnected = true; - this.logger.log('Kafka consumer connected'); + this.logger.log(`[CONNECT] Kafka consumer connected successfully`); } catch (error) { - this.logger.error('Failed to connect Kafka consumer', error); + this.logger.error(`[ERROR] Failed to connect Kafka consumer`, error); } } @@ -86,17 +92,18 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy { */ async subscribe(topic: string, handler: MessageHandler): Promise { if (!this.isConnected) { - this.logger.warn('Kafka not connected, cannot subscribe'); + this.logger.warn(`[SUBSCRIBE] Kafka not connected, cannot subscribe to ${topic}`); return; } this.handlers.set(topic, handler); + this.logger.log(`[SUBSCRIBE] Registering handler for topic: ${topic}`); try { await this.consumer.subscribe({ topic, fromBeginning: false }); - this.logger.log(`Subscribed to topic: ${topic}`); + this.logger.log(`[SUBSCRIBE] Successfully subscribed to topic: ${topic}`); } catch (error) { - this.logger.error(`Failed to subscribe to topic: ${topic}`, error); + this.logger.error(`[ERROR] Failed to subscribe to topic: ${topic}`, error); throw error; } } @@ -106,35 +113,48 @@ export class EventConsumerService implements OnModuleInit, OnModuleDestroy { */ async startConsuming(): Promise { if (!this.isConnected) { - this.logger.warn('Kafka not connected, cannot start consuming'); + this.logger.warn(`[START] Kafka not connected, cannot start consuming`); return; } + this.logger.log(`[START] Starting message consumption...`); + this.logger.log(`[START] Registered handlers for topics: ${Array.from(this.handlers.keys()).join(', ')}`); + await this.consumer.run({ eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const offset = message.offset; + this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); + const handler = this.handlers.get(topic); if (!handler) { - this.logger.warn(`No handler for topic: ${topic}`); + this.logger.warn(`[RECEIVE] No handler registered for topic: ${topic}`); return; } try { const value = message.value?.toString(); if (!value) { - this.logger.warn('Empty message received'); + this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); return; } - const parsed = JSON.parse(value); - this.logger.debug(`Received message from ${topic}: ${JSON.stringify(parsed)}`); + this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); - await handler(topic, parsed.payload || parsed); + const parsed = JSON.parse(value); + const payload = parsed.payload || parsed; + + this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); + this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); + + this.logger.log(`[HANDLE] Invoking handler for ${topic}...`); + await handler(topic, payload); + this.logger.log(`[HANDLE] Handler completed for ${topic}`); } catch (error) { - this.logger.error(`Error processing message from ${topic}`, error); + this.logger.error(`[ERROR] Error processing message from ${topic}`, error); } }, }); - this.logger.log('Started consuming messages'); + this.logger.log(`[START] Started consuming messages successfully`); } } diff --git a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-publisher.service.ts b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-publisher.service.ts index 9b6bf284..8360b217 100644 --- a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-publisher.service.ts +++ b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-publisher.service.ts @@ -22,6 +22,10 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mpc-party-service'; + this.logger.log(`[INIT] MPC Event Publisher initializing...`); + this.logger.log(`[INIT] ClientId: ${clientId}`); + this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); + this.kafka = new Kafka({ clientId, brokers, @@ -38,11 +42,12 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { }); try { + this.logger.log(`[CONNECT] Connecting Kafka producer...`); await this.producer.connect(); this.isConnected = true; - this.logger.log('Kafka producer connected'); + this.logger.log(`[CONNECT] Kafka producer connected successfully`); } catch (error) { - this.logger.error('Failed to connect Kafka producer', error); + this.logger.error(`[ERROR] Failed to connect Kafka producer`, error); } } @@ -58,21 +63,30 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { */ async publish(event: DomainEvent): Promise { if (!this.isConnected) { - this.logger.warn('Kafka not connected, skipping event publish'); + this.logger.warn(`[PUBLISH] Kafka not connected, skipping event publish for ${event.eventType}`); return; } const topic = this.getTopicForEvent(event); + + this.logger.log(`[PUBLISH] Publishing event: type=${event.eventType}, topic=${topic}`); + this.logger.log(`[PUBLISH] EventId: ${event.eventId}`); + this.logger.log(`[PUBLISH] AggregateId: ${event.aggregateId}`); + + const messageValue = { + eventId: event.eventId, + eventType: event.eventType, + occurredAt: event.occurredAt.toISOString(), + aggregateId: event.aggregateId, + aggregateType: event.aggregateType, + payload: event.payload, + }; + + this.logger.log(`[PUBLISH] Payload keys: ${Object.keys(event.payload).join(', ')}`); + const message = { key: event.eventId, - value: JSON.stringify({ - eventId: event.eventId, - eventType: event.eventType, - occurredAt: event.occurredAt.toISOString(), - aggregateId: event.aggregateId, - aggregateType: event.aggregateType, - payload: event.payload, - }), + value: JSON.stringify(messageValue), headers: { eventType: event.eventType, aggregateType: event.aggregateType, @@ -85,9 +99,9 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { topic, messages: [message], }); - this.logger.debug(`Published event: ${event.eventType} to ${topic}`); + this.logger.log(`[PUBLISH] Successfully published ${event.eventType} to ${topic}`); } catch (error) { - this.logger.error(`Failed to publish event: ${event.eventType}`, error); + this.logger.error(`[ERROR] Failed to publish event: ${event.eventType}`, error); throw error; } } @@ -105,23 +119,28 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { * Publish with retry logic */ async publishWithRetry(event: DomainEvent, maxRetries = 3): Promise { + this.logger.log(`[RETRY] Publishing with retry: ${event.eventType}, maxRetries=${maxRetries}`); let lastError: Error | undefined; for (let attempt = 1; attempt <= maxRetries; attempt++) { try { + this.logger.log(`[RETRY] Attempt ${attempt}/${maxRetries} for ${event.eventType}`); await this.publish(event); + this.logger.log(`[RETRY] Successfully published on attempt ${attempt}`); return; } catch (error) { lastError = error instanceof Error ? error : new Error(String(error)); - this.logger.warn(`Publish attempt ${attempt} failed: ${lastError.message}`); + this.logger.warn(`[RETRY] Attempt ${attempt} failed: ${lastError.message}`); if (attempt < maxRetries) { const delay = Math.pow(2, attempt) * 100; // Exponential backoff + this.logger.log(`[RETRY] Waiting ${delay}ms before next attempt...`); await new Promise(resolve => setTimeout(resolve, delay)); } } } + this.logger.error(`[RETRY] All ${maxRetries} attempts failed for ${event.eventType}`); throw lastError; } @@ -131,6 +150,7 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { ShareRotated: MPC_TOPICS.SHARE_ROTATED, ShareRevoked: MPC_TOPICS.SHARE_REVOKED, ShareUsed: MPC_TOPICS.SHARE_USED, + KeygenStarted: MPC_TOPICS.KEYGEN_STARTED, KeygenCompleted: MPC_TOPICS.KEYGEN_COMPLETED, SigningCompleted: MPC_TOPICS.SIGNING_COMPLETED, SessionFailed: MPC_TOPICS.SESSION_FAILED,