diff --git a/backend/services/identity-service/src/infrastructure/external/mpc/mpc-client.service.ts b/backend/services/identity-service/src/infrastructure/external/mpc/mpc-client.service.ts index 32083203..74339305 100644 --- a/backend/services/identity-service/src/infrastructure/external/mpc/mpc-client.service.ts +++ b/backend/services/identity-service/src/infrastructure/external/mpc/mpc-client.service.ts @@ -3,20 +3,38 @@ * * 与 mpc-service (NestJS) 通信的客户端服务 * + * 支持两种模式: + * 1. 同步模式 (legacy): 直接 HTTP 调用 + 轮询等待结果 + * 2. 事件驱动模式 (推荐): 发布 Kafka 事件,异步接收结果 + * * 调用路径 (DDD 分领域): * identity-service (身份域) → mpc-service (MPC域/NestJS) → mpc-system (Go/TSS实现) * - * 业务流程: - * 1. identity-service 调用 mpc-service 的 keygen API - * 2. mpc-service 协调 mpc-system 完成 TSS keygen - * 3. 返回公钥和 delegate share (用户分片) 给 identity-service + * 事件驱动流程: + * 1. identity-service 发布 mpc.KeygenRequested 事件 + * 2. mpc-service 消费事件,协调 mpc-system 完成 TSS keygen + * 3. mpc-service 发布 mpc.KeygenCompleted 事件 + * 4. identity-service 消费事件,更新用户钱包地址 */ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { ConfigService } from '@nestjs/config'; import { firstValueFrom } from 'rxjs'; import { createHash, randomUUID } from 'crypto'; +import { EventPublisherService, IDENTITY_TOPICS } from '../../kafka/event-publisher.service'; +import { + MpcEventConsumerService, + KeygenCompletedPayload, + SigningCompletedPayload, + SessionFailedPayload, +} from '../../kafka/mpc-event-consumer.service'; + +// MPC Request Topics (发布到 mpc-service) +export const MPC_REQUEST_TOPICS = { + KEYGEN_REQUESTED: 'mpc.KeygenRequested', + SIGNING_REQUESTED: 'mpc.SigningRequested', +} as const; export interface KeygenRequest { sessionId: string; @@ -51,23 +69,285 @@ export interface SigningResult { messageHash: string; } +// 异步请求接口 (事件驱动模式) +export interface AsyncKeygenRequest { + userId: string; + username: string; + threshold: number; + totalParties: number; + requireDelegate: boolean; +} + +export interface AsyncKeygenResponse { + sessionId: string; + status: 'pending' | 'processing'; +} + +export interface AsyncSigningRequest { + userId: string; + username: string; + messageHash: string; + userShare?: string; +} + +export interface AsyncSigningResponse { + sessionId: string; + status: 'pending' | 'processing'; +} + +// 结果回调类型 +export type KeygenResultCallback = (result: KeygenResult | null, error?: string) => Promise; +export type SigningResultCallback = (result: SigningResult | null, error?: string) => Promise; + @Injectable() -export class MpcClientService { +export class MpcClientService implements OnModuleInit { private readonly logger = new Logger(MpcClientService.name); private readonly mpcServiceUrl: string; // mpc-service (NestJS) URL private readonly mpcMode: string; + private readonly useEventDriven: boolean; private readonly pollIntervalMs = 2000; private readonly maxPollAttempts = 150; // 5 minutes max + // 待处理的 keygen/signing 请求回调 + private pendingKeygenCallbacks: Map = new Map(); + private pendingSigningCallbacks: Map = new Map(); + constructor( private readonly httpService: HttpService, private readonly configService: ConfigService, + private readonly eventPublisher: EventPublisherService, + private readonly mpcEventConsumer: MpcEventConsumerService, ) { // 连接 mpc-service (NestJS) this.mpcServiceUrl = this.configService.get('MPC_SERVICE_URL', 'http://localhost:3001'); this.mpcMode = this.configService.get('MPC_MODE', 'local'); + this.useEventDriven = this.configService.get('MPC_USE_EVENT_DRIVEN', 'true') === 'true'; } + async onModuleInit() { + // 注册 MPC 事件处理器 + this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); + this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this)); + this.mpcEventConsumer.onSessionFailed(this.handleSessionFailed.bind(this)); + this.logger.log('MPC event handlers registered'); + } + + // ========================================================================== + // 事件驱动模式 - 异步 API + // ========================================================================== + + /** + * 异步发起 keygen 请求 (事件驱动) + * 立即返回 sessionId,结果通过回调处理 + */ + async requestKeygenAsync( + request: AsyncKeygenRequest, + callback?: KeygenResultCallback, + ): Promise { + const sessionId = this.generateSessionId(); + + this.logger.log(`Requesting async keygen: userId=${request.userId}, sessionId=${sessionId}`); + + // 如果是本地模式,直接执行并回调 + if (this.mpcMode === 'local') { + this.executeLocalKeygenWithCallback(sessionId, request, callback); + return { sessionId, status: 'processing' }; + } + + // 注册回调 + if (callback) { + this.pendingKeygenCallbacks.set(sessionId, callback); + } + + // 发布 keygen 请求事件 + await this.eventPublisher.publish(MPC_REQUEST_TOPICS.KEYGEN_REQUESTED, { + eventId: sessionId, + eventType: 'KeygenRequested', + occurredAt: new Date().toISOString(), + aggregateId: request.userId, + aggregateType: 'UserAccount', + payload: { + sessionId, + userId: request.userId, + username: request.username, + threshold: request.threshold, + totalParties: request.totalParties, + requireDelegate: request.requireDelegate, + }, + }); + + this.logger.log(`Keygen request published: sessionId=${sessionId}`); + return { sessionId, status: 'pending' }; + } + + /** + * 异步发起 signing 请求 (事件驱动) + */ + async requestSigningAsync( + request: AsyncSigningRequest, + callback?: SigningResultCallback, + ): Promise { + const sessionId = this.generateSessionId(); + + this.logger.log(`Requesting async signing: userId=${request.userId}, sessionId=${sessionId}`); + + // 如果是本地模式,直接执行并回调 + if (this.mpcMode === 'local') { + this.executeLocalSigningWithCallback(sessionId, request, callback); + return { sessionId, status: 'processing' }; + } + + // 注册回调 + if (callback) { + this.pendingSigningCallbacks.set(sessionId, callback); + } + + // 发布 signing 请求事件 + await this.eventPublisher.publish(MPC_REQUEST_TOPICS.SIGNING_REQUESTED, { + eventId: sessionId, + eventType: 'SigningRequested', + occurredAt: new Date().toISOString(), + aggregateId: request.userId, + aggregateType: 'UserAccount', + payload: { + sessionId, + userId: request.userId, + username: request.username, + messageHash: request.messageHash, + userShare: request.userShare, + }, + }); + + this.logger.log(`Signing request published: sessionId=${sessionId}`); + return { sessionId, status: 'pending' }; + } + + // ========================================================================== + // 事件处理器 - 处理 MPC 完成事件 + // ========================================================================== + + private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { + const sessionId = payload.sessionId; + const callback = this.pendingKeygenCallbacks.get(sessionId); + + this.logger.log(`Keygen completed event received: sessionId=${sessionId}`); + + if (callback) { + try { + const result: KeygenResult = { + sessionId, + publicKey: payload.publicKey, + delegateShare: payload.extraPayload?.delegateShare || { + partyId: payload.partyId, + partyIndex: 0, + encryptedShare: '', + }, + serverParties: payload.extraPayload?.serverParties || [], + }; + await callback(result); + } catch (error) { + this.logger.error(`Keygen callback error: sessionId=${sessionId}`, error); + } finally { + this.pendingKeygenCallbacks.delete(sessionId); + } + } + } + + private async handleSigningCompleted(payload: SigningCompletedPayload): Promise { + const sessionId = payload.sessionId; + const callback = this.pendingSigningCallbacks.get(sessionId); + + this.logger.log(`Signing completed event received: sessionId=${sessionId}`); + + if (callback) { + try { + const result: SigningResult = { + sessionId, + signature: payload.signature, + messageHash: payload.messageHash, + }; + await callback(result); + } catch (error) { + this.logger.error(`Signing callback error: sessionId=${sessionId}`, error); + } finally { + this.pendingSigningCallbacks.delete(sessionId); + } + } + } + + private async handleSessionFailed(payload: SessionFailedPayload): Promise { + const sessionId = payload.sessionId; + const sessionType = payload.sessionType; + + this.logger.warn(`Session failed event received: sessionId=${sessionId}, type=${sessionType}`); + + if (sessionType === 'keygen') { + const callback = this.pendingKeygenCallbacks.get(sessionId); + if (callback) { + await callback(null, payload.errorMessage); + this.pendingKeygenCallbacks.delete(sessionId); + } + } else if (sessionType === 'sign') { + const callback = this.pendingSigningCallbacks.get(sessionId); + if (callback) { + await callback(null, payload.errorMessage); + this.pendingSigningCallbacks.delete(sessionId); + } + } + } + + // ========================================================================== + // 本地模式辅助方法 + // ========================================================================== + + private async executeLocalKeygenWithCallback( + sessionId: string, + request: AsyncKeygenRequest, + callback?: KeygenResultCallback, + ): Promise { + try { + const result = await this.executeLocalKeygen({ + sessionId, + username: request.username, + threshold: request.threshold, + totalParties: request.totalParties, + requireDelegate: request.requireDelegate, + }); + if (callback) { + await callback(result); + } + } catch (error) { + if (callback) { + await callback(null, error instanceof Error ? error.message : 'Unknown error'); + } + } + } + + private async executeLocalSigningWithCallback( + sessionId: string, + request: AsyncSigningRequest, + callback?: SigningResultCallback, + ): Promise { + try { + const result = await this.executeLocalSigning({ + username: request.username, + messageHash: request.messageHash, + userShare: request.userShare, + }); + if (callback) { + await callback(result); + } + } catch (error) { + if (callback) { + await callback(null, error instanceof Error ? error.message : 'Unknown error'); + } + } + } + + // ========================================================================== + // 同步模式 (Legacy) - 保留兼容性 + // ========================================================================== + /** * 生成新的会话ID (必须是纯 UUID 格式) */ diff --git a/backend/services/identity-service/src/infrastructure/external/mpc/mpc.module.ts b/backend/services/identity-service/src/infrastructure/external/mpc/mpc.module.ts index 31dadf9a..be1ea628 100644 --- a/backend/services/identity-service/src/infrastructure/external/mpc/mpc.module.ts +++ b/backend/services/identity-service/src/infrastructure/external/mpc/mpc.module.ts @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; import { MpcWalletService } from './mpc-wallet.service'; import { MpcClientService } from './mpc-client.service'; +import { KafkaModule } from '../../kafka/kafka.module'; @Module({ imports: [ @@ -9,6 +10,7 @@ import { MpcClientService } from './mpc-client.service'; timeout: 300000, // MPC 操作可能需要较长时间 maxRedirects: 5, }), + KafkaModule, // 用于事件驱动模式 ], providers: [MpcWalletService, MpcClientService], exports: [MpcWalletService, MpcClientService], diff --git a/backend/services/identity-service/src/infrastructure/kafka/index.ts b/backend/services/identity-service/src/infrastructure/kafka/index.ts index bdb7c744..f9d2d6e3 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/index.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/index.ts @@ -3,3 +3,4 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; export * from './dead-letter.service'; export * from './event-retry.service'; +export * from './mpc-event-consumer.service'; diff --git a/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts b/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts index 79ab69c4..0d0ef1b2 100644 --- a/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts +++ b/backend/services/identity-service/src/infrastructure/kafka/kafka.module.ts @@ -1,8 +1,15 @@ import { Module } from '@nestjs/common'; import { EventPublisherService } from './event-publisher.service'; +import { MpcEventConsumerService } from './mpc-event-consumer.service'; @Module({ - providers: [EventPublisherService], - exports: [EventPublisherService], + providers: [ + EventPublisherService, + MpcEventConsumerService, + ], + exports: [ + EventPublisherService, + MpcEventConsumerService, + ], }) export class KafkaModule {} diff --git a/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts new file mode 100644 index 00000000..a0ea22c2 --- /dev/null +++ b/backend/services/identity-service/src/infrastructure/kafka/mpc-event-consumer.service.ts @@ -0,0 +1,187 @@ +/** + * MPC Event Consumer Service + * + * Consumes MPC keygen/signing completion events from mpc-service via Kafka. + * Updates user wallet addresses when keygen completes. + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// MPC Event Topics +export const MPC_TOPICS = { + KEYGEN_COMPLETED: 'mpc.KeygenCompleted', + SIGNING_COMPLETED: 'mpc.SigningCompleted', + SESSION_FAILED: 'mpc.SessionFailed', +} as const; + +export interface KeygenCompletedPayload { + sessionId: string; + partyId: string; + publicKey: string; + shareId: string; + threshold: string; + extraPayload?: { + userId: string; + username: string; + delegateShare?: { + partyId: string; + partyIndex: number; + encryptedShare: string; + }; + serverParties?: string[]; + }; +} + +export interface SigningCompletedPayload { + sessionId: string; + partyId: string; + messageHash: string; + signature: string; + publicKey: string; + extraPayload?: { + userId: string; + username: string; + mpcSessionId: string; + }; +} + +export interface SessionFailedPayload { + sessionId: string; + partyId: string; + sessionType: string; + errorMessage: string; + errorCode?: string; + extraPayload?: { + userId: string; + username: string; + }; +} + +export type MpcEventHandler = (payload: T) => Promise; + +@Injectable() +export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(MpcEventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + + private keygenCompletedHandler?: MpcEventHandler; + private signingCompletedHandler?: MpcEventHandler; + private sessionFailedHandler?: MpcEventHandler; + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'identity-service'; + const groupId = 'identity-service-mpc-events'; + + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + await this.consumer.connect(); + this.isConnected = true; + this.logger.log('MPC Event Kafka consumer connected'); + + // Subscribe to MPC topics + await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); + this.logger.log(`Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); + + // Start consuming + await this.startConsuming(); + } catch (error) { + this.logger.error('Failed to connect MPC Event Kafka consumer', error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('MPC Event Kafka consumer disconnected'); + } + } + + /** + * Register handler for keygen completed events + */ + onKeygenCompleted(handler: MpcEventHandler): void { + this.keygenCompletedHandler = handler; + } + + /** + * Register handler for signing completed events + */ + onSigningCompleted(handler: MpcEventHandler): void { + this.signingCompletedHandler = handler; + } + + /** + * Register handler for session failed events + */ + onSessionFailed(handler: MpcEventHandler): void { + this.sessionFailedHandler = handler; + } + + private async startConsuming(): Promise { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn('Empty message received'); + return; + } + + const parsed = JSON.parse(value); + const payload = parsed.payload || parsed; + + this.logger.debug(`Received MPC event from ${topic}: ${JSON.stringify(payload)}`); + + switch (topic) { + case MPC_TOPICS.KEYGEN_COMPLETED: + if (this.keygenCompletedHandler) { + await this.keygenCompletedHandler(payload as KeygenCompletedPayload); + } + break; + + case MPC_TOPICS.SIGNING_COMPLETED: + if (this.signingCompletedHandler) { + await this.signingCompletedHandler(payload as SigningCompletedPayload); + } + break; + + case MPC_TOPICS.SESSION_FAILED: + if (this.sessionFailedHandler) { + await this.sessionFailedHandler(payload as SessionFailedPayload); + } + break; + + default: + this.logger.warn(`Unknown MPC topic: ${topic}`); + } + } catch (error) { + this.logger.error(`Error processing MPC event from ${topic}`, error); + } + }, + }); + + this.logger.log('Started consuming MPC events'); + } +} diff --git a/backend/services/mpc-service/src/application/application.module.ts b/backend/services/mpc-service/src/application/application.module.ts index f27ec8b0..0da566f0 100644 --- a/backend/services/mpc-service/src/application/application.module.ts +++ b/backend/services/mpc-service/src/application/application.module.ts @@ -1,7 +1,7 @@ /** * Application Module * - * mpc-service 作为网关,只需要 MPCCoordinatorService 转发请求到 mpc-system + * mpc-service 作为网关,处理来自其他服务的事件请求 */ import { Module } from '@nestjs/common'; @@ -10,6 +10,11 @@ import { InfrastructureModule } from '../infrastructure/infrastructure.module'; // Services import { MPCCoordinatorService } from './services/mpc-coordinator.service'; +import { EventConsumerStarterService } from './services/event-consumer-starter.service'; + +// Event Handlers +import { KeygenRequestedHandler } from './event-handlers/keygen-requested.handler'; +import { SigningRequestedHandler } from './event-handlers/signing-requested.handler'; @Module({ imports: [ @@ -19,6 +24,11 @@ import { MPCCoordinatorService } from './services/mpc-coordinator.service'; providers: [ // Application Services MPCCoordinatorService, + EventConsumerStarterService, // 启动 Kafka 消费者 + + // Event Handlers (Kafka consumers) + KeygenRequestedHandler, + SigningRequestedHandler, ], exports: [ MPCCoordinatorService, diff --git a/backend/services/mpc-service/src/application/event-handlers/index.ts b/backend/services/mpc-service/src/application/event-handlers/index.ts new file mode 100644 index 00000000..21c1f9fd --- /dev/null +++ b/backend/services/mpc-service/src/application/event-handlers/index.ts @@ -0,0 +1,6 @@ +/** + * Event Handlers Module Exports + */ + +export * from './keygen-requested.handler'; +export * from './signing-requested.handler'; diff --git a/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts b/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts new file mode 100644 index 00000000..b3a6b583 --- /dev/null +++ b/backend/services/mpc-service/src/application/event-handlers/keygen-requested.handler.ts @@ -0,0 +1,162 @@ +/** + * KeygenRequested Event Handler + * + * Handles keygen requests from identity-service via Kafka. + * Processes the keygen and publishes completion/failure events. + */ + +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { MPCCoordinatorService } from '../services/mpc-coordinator.service'; +import { EventPublisherService } from '../../infrastructure/messaging/kafka/event-publisher.service'; +import { + EventConsumerService, + MPC_CONSUME_TOPICS, + KeygenRequestedPayload, +} from '../../infrastructure/messaging/kafka/event-consumer.service'; +import { KeygenCompletedEvent } from '../../domain/events/keygen-completed.event'; +import { SessionFailedEvent } from '../../domain/events/session-failed.event'; +import { SessionType } from '../../domain/enums'; + +@Injectable() +export class KeygenRequestedHandler implements OnModuleInit { + private readonly logger = new Logger(KeygenRequestedHandler.name); + + constructor( + private readonly eventConsumer: EventConsumerService, + private readonly eventPublisher: EventPublisherService, + private readonly mpcCoordinator: MPCCoordinatorService, + ) {} + + async onModuleInit() { + await this.eventConsumer.subscribe( + MPC_CONSUME_TOPICS.KEYGEN_REQUESTED, + this.handleMessage.bind(this), + ); + this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.KEYGEN_REQUESTED}`); + } + + private async handleMessage(topic: string, payload: Record): Promise { + const data = payload as unknown as KeygenRequestedPayload; + const { sessionId, userId, username, threshold, totalParties, requireDelegate } = data; + + this.logger.log(`Processing keygen request: userId=${userId}, username=${username}, sessionId=${sessionId}`); + + try { + // Step 1: Create keygen session via mpc-system + const createResult = await this.mpcCoordinator.createKeygenSession({ + username, + thresholdN: totalParties, + thresholdT: threshold, + requireDelegate, + }); + + const mpcSessionId = createResult.sessionId; + this.logger.log(`Keygen session created in mpc-system: ${mpcSessionId}`); + + // Step 2: Poll for completion (with max retries) + const result = await this.pollKeygenCompletion(mpcSessionId, 150, 2000); + + if (result.status === 'completed' && result.publicKey) { + // Cache public key + await this.mpcCoordinator.savePublicKeyCache(username, result.publicKey); + + // Save delegate share if exists + if (result.delegateShare) { + await this.mpcCoordinator.saveDelegateShare({ + username, + partyId: result.delegateShare.partyId, + partyIndex: result.delegateShare.partyIndex, + encryptedShare: result.delegateShare.encryptedShare, + }); + } + + // Publish success event + const completedEvent = new KeygenCompletedEvent( + sessionId, // Original session ID from identity-service + result.delegateShare?.partyId || '', + result.publicKey, + mpcSessionId, + `${threshold}-of-${totalParties}`, + ); + + // Add extra payload for identity-service + (completedEvent as any).extraPayload = { + userId, + username, + delegateShare: result.delegateShare, + serverParties: [], // mpc-system manages this + }; + + await this.eventPublisher.publishWithRetry(completedEvent); + this.logger.log(`Keygen completed: userId=${userId}, publicKey=${result.publicKey}`); + } else { + // Publish failure event + const failedEvent = new SessionFailedEvent( + sessionId, + '', // partyId + SessionType.KEYGEN, + `Keygen failed with status: ${result.status}`, + ); + (failedEvent as any).extraPayload = { userId, username }; + + await this.eventPublisher.publishWithRetry(failedEvent); + this.logger.warn(`Keygen failed: userId=${userId}, status=${result.status}`); + } + } catch (error) { + this.logger.error(`Keygen processing error: userId=${userId}`, error); + + // Publish failure event + const failedEvent = new SessionFailedEvent( + sessionId, + '', // partyId + SessionType.KEYGEN, + error instanceof Error ? error.message : 'Unknown error', + ); + (failedEvent as any).extraPayload = { userId, username }; + + try { + await this.eventPublisher.publishWithRetry(failedEvent); + } catch (publishError) { + this.logger.error('Failed to publish failure event', publishError); + } + } + } + + private async pollKeygenCompletion( + sessionId: string, + maxAttempts: number, + intervalMs: number, + ): Promise<{ + status: string; + publicKey?: string; + delegateShare?: { + partyId: string; + partyIndex: number; + encryptedShare: string; + }; + }> { + for (let i = 0; i < maxAttempts; i++) { + const status = await this.mpcCoordinator.getKeygenStatus(sessionId); + + if (status.status === 'completed') { + return { + status: 'completed', + publicKey: status.publicKey, + delegateShare: status.delegateShare, + }; + } + + if (status.status === 'failed' || status.status === 'expired') { + return { status: status.status }; + } + + await this.sleep(intervalMs); + } + + return { status: 'timeout' }; + } + + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} diff --git a/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts b/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts new file mode 100644 index 00000000..604e0e39 --- /dev/null +++ b/backend/services/mpc-service/src/application/event-handlers/signing-requested.handler.ts @@ -0,0 +1,141 @@ +/** + * SigningRequested Event Handler + * + * Handles signing requests from identity-service via Kafka. + * Processes the signing and publishes completion/failure events. + */ + +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { MPCCoordinatorService } from '../services/mpc-coordinator.service'; +import { EventPublisherService } from '../../infrastructure/messaging/kafka/event-publisher.service'; +import { + EventConsumerService, + MPC_CONSUME_TOPICS, + SigningRequestedPayload, +} from '../../infrastructure/messaging/kafka/event-consumer.service'; +import { SigningCompletedEvent } from '../../domain/events/signing-completed.event'; +import { SessionFailedEvent } from '../../domain/events/session-failed.event'; +import { SessionType } from '../../domain/enums'; + +@Injectable() +export class SigningRequestedHandler implements OnModuleInit { + private readonly logger = new Logger(SigningRequestedHandler.name); + + constructor( + private readonly eventConsumer: EventConsumerService, + private readonly eventPublisher: EventPublisherService, + private readonly mpcCoordinator: MPCCoordinatorService, + ) {} + + async onModuleInit() { + await this.eventConsumer.subscribe( + MPC_CONSUME_TOPICS.SIGNING_REQUESTED, + this.handleMessage.bind(this), + ); + this.logger.log(`Subscribed to ${MPC_CONSUME_TOPICS.SIGNING_REQUESTED}`); + } + + private async handleMessage(topic: string, payload: Record): Promise { + const data = payload as unknown as SigningRequestedPayload; + const { sessionId, userId, username, messageHash, userShare } = data; + + this.logger.log(`Processing signing request: userId=${userId}, username=${username}, sessionId=${sessionId}`); + + try { + // Step 1: Create signing session via mpc-system + const createResult = await this.mpcCoordinator.createSigningSession({ + username, + messageHash, + userShare, + }); + + const mpcSessionId = createResult.sessionId; + this.logger.log(`Signing session created in mpc-system: ${mpcSessionId}`); + + // Step 2: Poll for completion (with max retries) + const result = await this.pollSigningCompletion(mpcSessionId, 150, 2000); + + if (result.status === 'completed' && result.signature) { + // Publish success event + const completedEvent = new SigningCompletedEvent( + sessionId, // Original session ID from identity-service + '', // partyId + messageHash, + result.signature, + '', // publicKey - not needed for signing result + ); + + // Add extra payload for identity-service + (completedEvent as any).extraPayload = { + userId, + username, + mpcSessionId, + }; + + await this.eventPublisher.publishWithRetry(completedEvent); + this.logger.log(`Signing completed: userId=${userId}, signature=${result.signature.substring(0, 16)}...`); + } else { + // Publish failure event + const failedEvent = new SessionFailedEvent( + sessionId, + '', // partyId + SessionType.SIGN, + `Signing failed with status: ${result.status}`, + ); + (failedEvent as any).extraPayload = { userId, username }; + + await this.eventPublisher.publishWithRetry(failedEvent); + this.logger.warn(`Signing failed: userId=${userId}, status=${result.status}`); + } + } catch (error) { + this.logger.error(`Signing processing error: userId=${userId}`, error); + + // Publish failure event + const failedEvent = new SessionFailedEvent( + sessionId, + '', // partyId + SessionType.SIGN, + error instanceof Error ? error.message : 'Unknown error', + ); + (failedEvent as any).extraPayload = { userId, username }; + + try { + await this.eventPublisher.publishWithRetry(failedEvent); + } catch (publishError) { + this.logger.error('Failed to publish failure event', publishError); + } + } + } + + private async pollSigningCompletion( + sessionId: string, + maxAttempts: number, + intervalMs: number, + ): Promise<{ + status: string; + signature?: string; + }> { + for (let i = 0; i < maxAttempts; i++) { + const status = await this.mpcCoordinator.getSigningStatus(sessionId); + + if (status.status === 'completed') { + return { + status: 'completed', + signature: status.signature, + }; + } + + if (status.status === 'failed' || status.status === 'expired') { + return { status: status.status }; + } + + await this.sleep(intervalMs); + } + + return { status: 'timeout' }; + } + + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } +} diff --git a/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts b/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts new file mode 100644 index 00000000..e7d4f092 --- /dev/null +++ b/backend/services/mpc-service/src/application/services/event-consumer-starter.service.ts @@ -0,0 +1,25 @@ +/** + * Event Consumer Starter Service + * + * Starts Kafka consumers after all handlers are registered. + * This ensures handlers are registered before consuming starts. + */ + +import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { EventConsumerService } from '../../infrastructure/messaging/kafka/event-consumer.service'; + +@Injectable() +export class EventConsumerStarterService implements OnApplicationBootstrap { + private readonly logger = new Logger(EventConsumerStarterService.name); + + constructor(private readonly eventConsumer: EventConsumerService) {} + + async onApplicationBootstrap() { + try { + await this.eventConsumer.startConsuming(); + this.logger.log('MPC event consumers started successfully'); + } catch (error) { + this.logger.error('Failed to start MPC event consumers', error); + } + } +} diff --git a/backend/services/mpc-service/src/application/services/index.ts b/backend/services/mpc-service/src/application/services/index.ts index 1bf35dc7..c64e7e52 100644 --- a/backend/services/mpc-service/src/application/services/index.ts +++ b/backend/services/mpc-service/src/application/services/index.ts @@ -1 +1,2 @@ export * from './mpc-coordinator.service'; +export * from './event-consumer-starter.service'; diff --git a/backend/services/mpc-service/src/infrastructure/infrastructure.module.ts b/backend/services/mpc-service/src/infrastructure/infrastructure.module.ts index c7ca004a..ee62144a 100644 --- a/backend/services/mpc-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mpc-service/src/infrastructure/infrastructure.module.ts @@ -1,7 +1,9 @@ /** * Infrastructure Module * - * mpc-service 作为网关,只需要 PrismaService 用于缓存公钥和 delegate share + * mpc-service 作为网关,需要: + * - PrismaService 用于缓存公钥和 delegate share + * - Kafka 事件发布和消费 */ import { Global, Module } from '@nestjs/common'; @@ -10,15 +12,25 @@ import { ConfigModule } from '@nestjs/config'; // Persistence import { PrismaService } from './persistence/prisma/prisma.service'; +// Kafka Messaging +import { EventPublisherService } from './messaging/kafka/event-publisher.service'; +import { EventConsumerService } from './messaging/kafka/event-consumer.service'; + @Global() @Module({ imports: [ConfigModule], providers: [ // Prisma (用于缓存公钥和 delegate share) PrismaService, + + // Kafka (事件发布和消费) + EventPublisherService, + EventConsumerService, ], exports: [ PrismaService, + EventPublisherService, + EventConsumerService, ], }) export class InfrastructureModule {} diff --git a/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts new file mode 100644 index 00000000..4ae45032 --- /dev/null +++ b/backend/services/mpc-service/src/infrastructure/messaging/kafka/event-consumer.service.ts @@ -0,0 +1,140 @@ +/** + * Event Consumer Service + * + * Consumes domain events from Kafka for async processing. + * Handles keygen and signing requests from identity-service. + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; + +// Kafka Topics for consuming +export const MPC_CONSUME_TOPICS = { + KEYGEN_REQUESTED: 'mpc.KeygenRequested', + SIGNING_REQUESTED: 'mpc.SigningRequested', +} as const; + +export interface KeygenRequestedPayload { + sessionId: string; + userId: string; + username: string; + threshold: number; + totalParties: number; + requireDelegate: boolean; +} + +export interface SigningRequestedPayload { + sessionId: string; + userId: string; + username: string; + messageHash: string; + userShare?: string; +} + +export type MessageHandler = (topic: string, payload: Record) => Promise; + +@Injectable() +export class EventConsumerService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventConsumerService.name); + private kafka: Kafka; + private consumer: Consumer; + private isConnected = false; + private handlers: Map = new Map(); + + constructor(private readonly configService: ConfigService) {} + + async onModuleInit() { + const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; + const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mpc-service'; + const groupId = this.configService.get('KAFKA_GROUP_ID') || 'mpc-service-group'; + + this.kafka = new Kafka({ + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, + }); + + this.consumer = this.kafka.consumer({ + groupId, + sessionTimeout: 30000, + heartbeatInterval: 3000, + }); + + try { + await this.consumer.connect(); + this.isConnected = true; + this.logger.log('Kafka consumer connected'); + } catch (error) { + this.logger.error('Failed to connect Kafka consumer', error); + } + } + + async onModuleDestroy() { + if (this.isConnected) { + await this.consumer.disconnect(); + this.logger.log('Kafka consumer disconnected'); + } + } + + /** + * Subscribe to a topic with a handler + */ + async subscribe(topic: string, handler: MessageHandler): Promise { + if (!this.isConnected) { + this.logger.warn('Kafka not connected, cannot subscribe'); + return; + } + + this.handlers.set(topic, handler); + + try { + await this.consumer.subscribe({ topic, fromBeginning: false }); + this.logger.log(`Subscribed to topic: ${topic}`); + } catch (error) { + this.logger.error(`Failed to subscribe to topic: ${topic}`, error); + throw error; + } + } + + /** + * Start consuming messages + */ + async startConsuming(): Promise { + if (!this.isConnected) { + this.logger.warn('Kafka not connected, cannot start consuming'); + return; + } + + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { + const handler = this.handlers.get(topic); + if (!handler) { + this.logger.warn(`No handler for topic: ${topic}`); + return; + } + + try { + const value = message.value?.toString(); + if (!value) { + this.logger.warn('Empty message received'); + return; + } + + const parsed = JSON.parse(value); + this.logger.debug(`Received message from ${topic}: ${JSON.stringify(parsed)}`); + + await handler(topic, parsed.payload || parsed); + } catch (error) { + this.logger.error(`Error processing message from ${topic}`, error); + } + }, + }); + + this.logger.log('Started consuming messages'); + } +} diff --git a/backend/services/mpc-service/src/infrastructure/messaging/kafka/index.ts b/backend/services/mpc-service/src/infrastructure/messaging/kafka/index.ts index db6b9da2..7ce4fc75 100644 --- a/backend/services/mpc-service/src/infrastructure/messaging/kafka/index.ts +++ b/backend/services/mpc-service/src/infrastructure/messaging/kafka/index.ts @@ -1 +1,2 @@ export * from './event-publisher.service'; +export * from './event-consumer.service';