diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts index 94500a30..dd4b4723 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/event-publisher.service.ts @@ -103,6 +103,8 @@ export class EventPublisherService implements OnModuleInit, OnModuleDestroy { 'blockchain.deposit.confirmed': 'blockchain.deposits', 'blockchain.wallet.address.created': 'blockchain.wallets', 'blockchain.transaction.broadcasted': 'blockchain.transactions', + // MPC 签名请求 - 发送到 mpc-service 消费的 topic + 'blockchain.mpc.signing.requested': 'mpc.SigningRequested', }; return topicMap[eventType] || 'blockchain.events'; } diff --git a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts index 36c99e88..fd948178 100644 --- a/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ b/backend/services/blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -1,8 +1,10 @@ /** * MPC Event Consumer Service for Blockchain Service * - * Consumes MPC keygen completion events from mpc-service via Kafka. - * Derives wallet addresses from public keys and publishes WalletAddressCreated events. + * Consumes MPC events from mpc-service via Kafka: + * - KeygenCompleted: derives wallet addresses from public keys + * - SigningCompleted: returns signature for hot wallet transfers + * - SessionFailed: handles keygen/signing failures */ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; @@ -12,6 +14,7 @@ import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; // MPC Event Topics (events from mpc-service) export const MPC_TOPICS = { KEYGEN_COMPLETED: 'mpc.KeygenCompleted', + SIGNING_COMPLETED: 'mpc.SigningCompleted', SESSION_FAILED: 'mpc.SessionFailed', } as const; @@ -34,15 +37,30 @@ export interface KeygenCompletedPayload { }; } +export interface SigningCompletedPayload { + sessionId: string; + partyId: string; + messageHash: string; + signature: string; + publicKey: string; + extraPayload?: { + userId: string; + username: string; + mpcSessionId: string; + source?: string; // 'blockchain-service' | 'identity-service' + }; +} + export interface SessionFailedPayload { sessionId: string; partyId: string; - sessionType: string; + sessionType: string; // 'keygen' | 'sign' errorMessage: string; errorCode?: string; extraPayload?: { userId: string; username: string; + source?: string; }; } @@ -56,7 +74,9 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { private isConnected = false; private keygenCompletedHandler?: MpcEventHandler; + private signingCompletedHandler?: MpcEventHandler; private sessionFailedHandler?: MpcEventHandler; + private signingFailedHandler?: MpcEventHandler; constructor(private readonly configService: ConfigService) {} @@ -120,13 +140,29 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { } /** - * Register handler for session failed events + * Register handler for signing completed events + */ + onSigningCompleted(handler: MpcEventHandler): void { + this.signingCompletedHandler = handler; + this.logger.log(`[REGISTER] SigningCompleted handler registered`); + } + + /** + * Register handler for session failed events (keygen) */ onSessionFailed(handler: MpcEventHandler): void { this.sessionFailedHandler = handler; this.logger.log(`[REGISTER] SessionFailed handler registered`); } + /** + * Register handler for signing failed events + */ + onSigningFailed(handler: MpcEventHandler): void { + this.signingFailedHandler = handler; + this.logger.log(`[REGISTER] SigningFailed handler registered`); + } + private async startConsuming(): Promise { await this.consumer.run({ eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { @@ -161,15 +197,35 @@ export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { } break; + case MPC_TOPICS.SIGNING_COMPLETED: + this.logger.log(`[HANDLE] Processing SigningCompleted event`); + this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`); + this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`); + if (this.signingCompletedHandler) { + await this.signingCompletedHandler(payload as SigningCompletedPayload); + this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`); + } else { + this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`); + } + break; + case MPC_TOPICS.SESSION_FAILED: this.logger.log(`[HANDLE] Processing SessionFailed event`); this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); - if (this.sessionFailedHandler) { - await this.sessionFailedHandler(payload as SessionFailedPayload); - this.logger.log(`[HANDLE] SessionFailed handler completed`); + + const failedPayload = payload as SessionFailedPayload; + // Route to appropriate handler based on session type + if (failedPayload.sessionType === 'sign') { + if (this.signingFailedHandler) { + await this.signingFailedHandler(failedPayload); + this.logger.log(`[HANDLE] SigningFailed handler completed`); + } } else { - this.logger.warn(`[HANDLE] No handler registered for SessionFailed`); + if (this.sessionFailedHandler) { + await this.sessionFailedHandler(failedPayload); + this.logger.log(`[HANDLE] SessionFailed handler completed`); + } } break; diff --git a/backend/services/blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts b/backend/services/blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts index c2dfdc62..7f4fcea2 100644 --- a/backend/services/blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts +++ b/backend/services/blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts @@ -1,14 +1,23 @@ /** * MPC Signing Client * - * 调用 mpc-service 进行 MPC 签名 + * 通过 Kafka 事件与 mpc-service 通信进行 MPC 签名 * 用于热钱包的 ERC20 转账签名 + * + * 事件流: + * blockchain-service → Kafka(mpc.SigningRequested) → mpc-service + * mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service */ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { HttpService } from '@nestjs/axios'; -import { firstValueFrom } from 'rxjs'; +import { randomUUID } from 'crypto'; +import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; +import { + MpcEventConsumerService, + SigningCompletedPayload, + SessionFailedPayload, +} from '@/infrastructure/kafka/mpc-event-consumer.service'; export interface CreateSigningInput { username: string; @@ -21,20 +30,31 @@ export interface SigningResult { signature?: string; } +// 签名结果回调 +type SigningCallback = (signature: string | null, error?: string) => void; + +// MPC 签名请求 Topic +export const MPC_SIGNING_TOPIC = 'mpc.SigningRequested'; + @Injectable() -export class MpcSigningClient { +export class MpcSigningClient implements OnModuleInit { private readonly logger = new Logger(MpcSigningClient.name); - private readonly mpcServiceUrl: string; private readonly hotWalletUsername: string; private readonly hotWalletAddress: string; - private readonly pollingIntervalMs: number = 2000; - private readonly maxPollingAttempts: number = 150; // 5 minutes max + private readonly signingTimeoutMs: number = 300000; // 5 minutes + + // 待处理的签名请求回调 Map + private pendingRequests: Map void; + reject: (error: Error) => void; + timeout: NodeJS.Timeout; + }> = new Map(); constructor( private readonly configService: ConfigService, - private readonly httpService: HttpService, + private readonly eventPublisher: EventPublisherService, + private readonly mpcEventConsumer: MpcEventConsumerService, ) { - this.mpcServiceUrl = this.configService.get('MPC_SERVICE_URL', 'http://localhost:3013'); this.hotWalletUsername = this.configService.get('HOT_WALLET_USERNAME', ''); this.hotWalletAddress = this.configService.get('HOT_WALLET_ADDRESS', ''); @@ -45,9 +65,16 @@ export class MpcSigningClient { this.logger.warn('[INIT] HOT_WALLET_ADDRESS not configured'); } - this.logger.log(`[INIT] MPC Service URL: ${this.mpcServiceUrl}`); this.logger.log(`[INIT] Hot Wallet Username: ${this.hotWalletUsername || '(not configured)'}`); this.logger.log(`[INIT] Hot Wallet Address: ${this.hotWalletAddress || '(not configured)'}`); + this.logger.log(`[INIT] Using Kafka event-driven signing`); + } + + async onModuleInit() { + // 注册签名完成和失败事件处理器 + this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this)); + this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this)); + this.logger.log('[INIT] MPC signing event handlers registered'); } /** @@ -72,66 +99,7 @@ export class MpcSigningClient { } /** - * 创建 MPC 签名会话 - */ - async createSigningSession(messageHash: string): Promise<{ sessionId: string; status: string }> { - this.logger.log(`[SIGN] Creating signing session for messageHash: ${messageHash.slice(0, 16)}...`); - - if (!this.hotWalletUsername) { - throw new Error('Hot wallet username not configured'); - } - - const response = await firstValueFrom( - this.httpService.post<{ - sessionId: string; - status: string; - }>( - `${this.mpcServiceUrl}/api/v1/mpc/sign`, - { - username: this.hotWalletUsername, - messageHash, - }, - { - headers: { 'Content-Type': 'application/json' }, - timeout: 30000, - }, - ), - ); - - this.logger.log(`[SIGN] Session created: ${response.data.sessionId}`); - return { - sessionId: response.data.sessionId, - status: response.data.status, - }; - } - - /** - * 获取签名会话状态 - */ - async getSigningStatus(sessionId: string): Promise { - const response = await firstValueFrom( - this.httpService.get<{ - sessionId: string; - status: string; - signature?: string; - }>( - `${this.mpcServiceUrl}/api/v1/mpc/sign/${sessionId}/status`, - { - headers: { 'Content-Type': 'application/json' }, - timeout: 10000, - }, - ), - ); - - return { - sessionId: response.data.sessionId, - status: response.data.status, - signature: response.data.signature, - }; - } - - /** - * 签名消息(创建会话并等待完成) + * 签名消息(通过 Kafka 事件驱动) * * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) * @returns 签名结果 (hex string) @@ -139,49 +107,91 @@ export class MpcSigningClient { async signMessage(messageHash: string): Promise { this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}...`); - // Step 1: 创建签名会话 - const session = await this.createSigningSession(messageHash); - this.logger.log(`[SIGN] Session ID: ${session.sessionId}`); - - // Step 2: 轮询等待签名完成 - const result = await this.pollForCompletion(session.sessionId); - - if (result.status === 'completed' && result.signature) { - this.logger.log(`[SIGN] Signature obtained: ${result.signature.slice(0, 20)}...`); - return result.signature; + if (!this.hotWalletUsername) { + throw new Error('Hot wallet username not configured'); } - throw new Error(`MPC signing failed with status: ${result.status}`); + const sessionId = randomUUID(); + this.logger.log(`[SIGN] Session ID: ${sessionId}`); + + // 创建 Promise 等待签名结果 + const signaturePromise = new Promise((resolve, reject) => { + // 设置超时 + const timeout = setTimeout(() => { + this.pendingRequests.delete(sessionId); + reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`)); + }, this.signingTimeoutMs); + + // 保存到待处理队列 + this.pendingRequests.set(sessionId, { resolve, reject, timeout }); + }); + + // 发布签名请求事件到 Kafka + try { + await this.eventPublisher.publish({ + eventType: 'blockchain.mpc.signing.requested', + toPayload: () => ({ + sessionId, + userId: 'system', // 系统热钱包 + username: this.hotWalletUsername, + messageHash, + source: 'blockchain-service', + }), + eventId: sessionId, + occurredAt: new Date(), + }); + + this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}`); + } catch (error) { + // 发布失败,清理待处理队列 + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); + } + throw error; + } + + // 等待签名结果 + const signature = await signaturePromise; + this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`); + return signature; } /** - * 轮询等待签名完成 + * 处理签名完成事件 */ - private async pollForCompletion(sessionId: string): Promise { - for (let attempt = 0; attempt < this.maxPollingAttempts; attempt++) { - const result = await this.getSigningStatus(sessionId); + private async handleSigningCompleted(payload: SigningCompletedPayload): Promise { + const sessionId = payload.sessionId; + this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`); - this.logger.debug(`[POLL] Session ${sessionId}: status=${result.status}, attempt=${attempt + 1}`); + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); - if (result.status === 'completed') { - return result; + if (payload.signature) { + pending.resolve(payload.signature); + } else { + pending.reject(new Error('Signing completed but no signature returned')); } - - if (result.status === 'failed' || result.status === 'expired') { - return result; - } - - // 等待下一次轮询 - await this.sleep(this.pollingIntervalMs); + } else { + this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`); } - - return { - sessionId, - status: 'timeout', - }; } - private sleep(ms: number): Promise { - return new Promise(resolve => setTimeout(resolve, ms)); + /** + * 处理签名失败事件 + */ + private async handleSigningFailed(payload: SessionFailedPayload): Promise { + const sessionId = payload.sessionId; + this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`); + + const pending = this.pendingRequests.get(sessionId); + if (pending) { + clearTimeout(pending.timeout); + this.pendingRequests.delete(sessionId); + pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`)); + } } } diff --git a/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts b/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts index 604e0e39..5ab0b337 100644 --- a/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts +++ b/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts @@ -1,8 +1,12 @@ /** * SigningRequested Event Handler * - * Handles signing requests from identity-service via Kafka. + * Handles signing requests from identity-service and blockchain-service via Kafka. * Processes the signing and publishes completion/failure events. + * + * 事件流: + * - identity-service → mpc.SigningRequested → mpc-service → mpc.SigningCompleted → identity-service + * - blockchain-service → mpc.SigningRequested → mpc-service → mpc.SigningCompleted → blockchain-service */ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; @@ -38,8 +42,10 @@ export class SigningRequestedHandler implements OnModuleInit { private async handleMessage(topic: string, payload: Record): Promise { const data = payload as unknown as SigningRequestedPayload; const { sessionId, userId, username, messageHash, userShare } = data; + // source 标识请求来源: 'blockchain-service' | 'identity-service' + const source = (data as any).source || 'identity-service'; - this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}`); + this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}, source=${source}`); try { // Step 1: Create signing session via mpc-system @@ -65,11 +71,12 @@ export class SigningRequestedHandler implements OnModuleInit { '', // publicKey - not needed for signing result ); - // Add extra payload for identity-service + // Add extra payload for the requesting service (identity-service or blockchain-service) (completedEvent as any).extraPayload = { userId, username, mpcSessionId, + source, }; await this.eventPublisher.publishWithRetry(completedEvent); @@ -82,10 +89,10 @@ export class SigningRequestedHandler implements OnModuleInit { SessionType.SIGN, `Signing failed with status: ${result.status}`, ); - (failedEvent as any).extraPayload = { userId, username }; + (failedEvent as any).extraPayload = { userId, username, source }; await this.eventPublisher.publishWithRetry(failedEvent); - this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}`); + this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}, source=${source}`); } } catch (error) { this.logger.error(`Signing processing error: userId=${userId}`, error); @@ -97,7 +104,7 @@ export class SigningRequestedHandler implements OnModuleInit { SessionType.SIGN, error instanceof Error ? error.message : 'Unknown error', ); - (failedEvent as any).extraPayload = { userId, username }; + (failedEvent as any).extraPayload = { userId, username, source }; try { await this.eventPublisher.publishWithRetry(failedEvent); diff --git a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts index e0ec71a4..6e0509b6 100644 --- a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts +++ b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts @@ -2,7 +2,7 @@ * Event Consumer Service * * Consumes domain events from Kafka for async processing. - * Handles keygen and signing requests from identity-service. + * Handles keygen and signing requests from identity-service and blockchain-service. */ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; @@ -31,6 +31,7 @@ export interface SigningRequestedPayload { username: string; messageHash: string; userShare?: string; + source?: string; // 'identity-service' | 'blockchain-service' } export type MessageHandler = (topic: string, payload: Record) => Promise;