diff --git a/backend/services/docker-compose.2.0.yml b/backend/services/docker-compose.2.0.yml index 92a89873..07bb6c78 100644 --- a/backend/services/docker-compose.2.0.yml +++ b/backend/services/docker-compose.2.0.yml @@ -418,8 +418,10 @@ services: REDIS_PORT: ${REDIS_PORT:-6379} REDIS_PASSWORD: ${REDIS_PASSWORD:-} REDIS_DB: 8 - # Kafka - 用于 MPC 签名通信和事件发布 + # Kafka - 用于事件发布 KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:29092} + # MPC Service - 直接 HTTP 调用签名 + MPC_SERVICE_URL: ${MPC_SERVICE_URL:-http://mpc-service:3006} # JWT 配置 JWT_SECRET: ${JWT_SECRET:-your-jwt-secret-change-in-production} # 区块链配置 diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts index 7231998d..957ef247 100644 --- a/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts +++ b/backend/services/mining-blockchain-service/src/application/event-handlers/index.ts @@ -1,2 +1 @@ -export * from './mpc-keygen-completed.handler'; export * from './withdrawal-requested.handler'; diff --git a/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts b/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts deleted file mode 100644 index 201f444c..00000000 --- a/backend/services/mining-blockchain-service/src/application/event-handlers/mpc-keygen-completed.handler.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; -import { AddressDerivationService } from '../services/address-derivation.service'; -import { MpcEventConsumerService, KeygenCompletedPayload } from '@/infrastructure/kafka/mpc-event-consumer.service'; - -/** - * MPC 密钥生成完成事件处理器 - * - * 监听 mpc.KeygenCompleted 事件,从公钥派生多链钱包地址, - * 并发布 blockchain.WalletAddressCreated 事件通知 identity-service - */ -@Injectable() -export class MpcKeygenCompletedHandler implements OnModuleInit { - private readonly logger = new Logger(MpcKeygenCompletedHandler.name); - - constructor( - private readonly addressDerivationService: AddressDerivationService, - private readonly mpcEventConsumer: MpcEventConsumerService, - ) {} - - onModuleInit() { - // Register handler for keygen completed events - this.mpcEventConsumer.onKeygenCompleted(this.handleKeygenCompleted.bind(this)); - this.logger.log(`[INIT] MpcKeygenCompletedHandler registered with MpcEventConsumer`); - } - - /** - * 处理 MPC 密钥生成完成事件 - * 从 mpc-service 的 KeygenCompleted 事件中提取 publicKey、userId 和 accountSequence - */ - private async handleKeygenCompleted(payload: KeygenCompletedPayload): Promise { - this.logger.log(`[HANDLE] Received KeygenCompleted event`); - this.logger.log(`[HANDLE] sessionId: ${payload.sessionId}`); - this.logger.log(`[HANDLE] publicKey: ${payload.publicKey?.substring(0, 30)}...`); - this.logger.log(`[HANDLE] extraPayload: ${JSON.stringify(payload.extraPayload)}`); - - // Extract userId and accountSequence from extraPayload - const userId = payload.extraPayload?.userId; - const accountSequence = payload.extraPayload?.accountSequence; - - if (!userId) { - this.logger.error(`[ERROR] Missing userId in extraPayload, cannot derive addresses`); - return; - } - - if (!accountSequence) { - this.logger.error(`[ERROR] Missing accountSequence in extraPayload, cannot derive addresses`); - return; - } - - const publicKey = payload.publicKey; - if (!publicKey) { - this.logger.error(`[ERROR] Missing publicKey in payload, cannot derive addresses`); - return; - } - - try { - this.logger.log(`[DERIVE] Starting address derivation for user: ${userId}, account: ${accountSequence}`); - - const result = await this.addressDerivationService.deriveAndRegister({ - userId: BigInt(userId), - accountSequence: accountSequence, - publicKey, - }); - - this.logger.log(`[DERIVE] Successfully derived ${result.addresses.length} addresses for account ${accountSequence}`); - result.addresses.forEach((addr) => { - this.logger.log(`[DERIVE] - ${addr.chainType}: ${addr.address}`); - }); - } catch (error) { - this.logger.error(`[ERROR] Failed to derive addresses for account ${accountSequence}:`, error); - throw error; - } - } -} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts index f8d4403d..aef33fb0 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/infrastructure.module.ts @@ -2,7 +2,7 @@ import { Global, Module } from '@nestjs/common'; import { HttpModule } from '@nestjs/axios'; import { PrismaService } from './persistence/prisma/prisma.service'; import { RedisService, AddressCacheService } from './redis'; -import { EventPublisherService, MpcEventConsumerService, WithdrawalEventConsumerService } from './kafka'; +import { EventPublisherService, WithdrawalEventConsumerService } from './kafka'; import { EvmProviderAdapter, AddressDerivationAdapter, MnemonicDerivationAdapter, RecoveryMnemonicAdapter, BlockScannerService } from './blockchain'; import { MpcSigningClient } from './mpc'; import { DomainModule } from '@/domain/domain.module'; @@ -33,7 +33,6 @@ import { PrismaService, RedisService, EventPublisherService, - MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, @@ -81,7 +80,6 @@ import { PrismaService, RedisService, EventPublisherService, - MpcEventConsumerService, WithdrawalEventConsumerService, MpcSigningClient, EvmProviderAdapter, diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts index e78978c3..df1e0281 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/kafka/index.ts @@ -1,5 +1,4 @@ export * from './event-publisher.service'; export * from './event-consumer.controller'; -export * from './mpc-event-consumer.service'; export * from './withdrawal-event-consumer.service'; export * from './deposit-ack-consumer.service'; diff --git a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts b/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts deleted file mode 100644 index c873f4ca..00000000 --- a/backend/services/mining-blockchain-service/src/infrastructure/kafka/mpc-event-consumer.service.ts +++ /dev/null @@ -1,247 +0,0 @@ -/** - * MPC Event Consumer Service for Blockchain Service - * - * Consumes MPC events from mpc-service via Kafka: - * - KeygenCompleted: derives wallet addresses from public keys - * - SigningCompleted: returns signature for hot wallet transfers - * - SessionFailed: handles keygen/signing failures - */ - -import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; -import { Kafka, Consumer, logLevel, EachMessagePayload } from 'kafkajs'; - -// MPC Event Topics (events from mpc-service) -export const MPC_TOPICS = { - KEYGEN_COMPLETED: 'mining_mpc.KeygenCompleted', - SIGNING_COMPLETED: 'mining_mpc.SigningCompleted', - SESSION_FAILED: 'mining_mpc.SessionFailed', -} as const; - -export interface KeygenCompletedPayload { - sessionId: string; - partyId: string; - publicKey: string; - shareId: string; - threshold: string; - extraPayload?: { - userId: string; - accountSequence: string; // 账户序列号 (格式: D + YYMMDD + 5位序号) - username: string; - delegateShare?: { - partyId: string; - partyIndex: number; - encryptedShare: string; - }; - serverParties?: string[]; - }; -} - -export interface SigningCompletedPayload { - sessionId: string; - partyId: string; - messageHash: string; - signature: string; - publicKey: string; - extraPayload?: { - userId: string; - username: string; - mpcSessionId: string; - source?: string; // 'blockchain-service' | 'identity-service' - }; -} - -export interface SessionFailedPayload { - sessionId: string; - partyId: string; - sessionType: string; // 'keygen' | 'sign' - errorMessage: string; - errorCode?: string; - extraPayload?: { - userId: string; - username: string; - source?: string; - }; -} - -export type MpcEventHandler = (payload: T) => Promise; - -@Injectable() -export class MpcEventConsumerService implements OnModuleInit, OnModuleDestroy { - private readonly logger = new Logger(MpcEventConsumerService.name); - private kafka: Kafka; - private consumer: Consumer; - private isConnected = false; - - private keygenCompletedHandler?: MpcEventHandler; - private signingCompletedHandler?: MpcEventHandler; - private sessionFailedHandler?: MpcEventHandler; - private signingFailedHandler?: MpcEventHandler; - - constructor(private readonly configService: ConfigService) {} - - async onModuleInit() { - const brokers = this.configService.get('KAFKA_BROKERS')?.split(',') || ['localhost:9092']; - const clientId = this.configService.get('KAFKA_CLIENT_ID') || 'mining-blockchain-service'; - const groupId = 'mining-blockchain-service-mpc-events'; - - this.logger.log(`[INIT] MPC Event Consumer for blockchain-service initializing...`); - this.logger.log(`[INIT] ClientId: ${clientId}`); - this.logger.log(`[INIT] GroupId: ${groupId}`); - this.logger.log(`[INIT] Brokers: ${brokers.join(', ')}`); - this.logger.log(`[INIT] Topics to subscribe: ${Object.values(MPC_TOPICS).join(', ')}`); - - // 企业级重试配置:指数退避,最多重试约 2.5 小时 - this.kafka = new Kafka({ - clientId, - brokers, - logLevel: logLevel.WARN, - retry: { - initialRetryTime: 1000, // 1 秒 - maxRetryTime: 300000, // 最大 5 分钟 - retries: 15, // 最多 15 次 - multiplier: 2, // 指数退避因子 - restartOnFailure: async () => true, - }, - }); - - this.consumer = this.kafka.consumer({ - groupId, - sessionTimeout: 30000, - heartbeatInterval: 3000, - }); - - try { - this.logger.log(`[CONNECT] Connecting MPC Event consumer...`); - await this.consumer.connect(); - this.isConnected = true; - this.logger.log(`[CONNECT] MPC Event Kafka consumer connected successfully`); - - // Subscribe to MPC topics - await this.consumer.subscribe({ topics: Object.values(MPC_TOPICS), fromBeginning: false }); - this.logger.log(`[SUBSCRIBE] Subscribed to MPC topics: ${Object.values(MPC_TOPICS).join(', ')}`); - - // Start consuming - await this.startConsuming(); - } catch (error) { - this.logger.error(`[ERROR] Failed to connect MPC Event Kafka consumer`, error); - } - } - - async onModuleDestroy() { - if (this.isConnected) { - await this.consumer.disconnect(); - this.logger.log('MPC Event Kafka consumer disconnected'); - } - } - - /** - * Register handler for keygen completed events - */ - onKeygenCompleted(handler: MpcEventHandler): void { - this.keygenCompletedHandler = handler; - this.logger.log(`[REGISTER] KeygenCompleted handler registered`); - } - - /** - * Register handler for signing completed events - */ - onSigningCompleted(handler: MpcEventHandler): void { - this.signingCompletedHandler = handler; - this.logger.log(`[REGISTER] SigningCompleted handler registered`); - } - - /** - * Register handler for session failed events (keygen) - */ - onSessionFailed(handler: MpcEventHandler): void { - this.sessionFailedHandler = handler; - this.logger.log(`[REGISTER] SessionFailed handler registered`); - } - - /** - * Register handler for signing failed events - */ - onSigningFailed(handler: MpcEventHandler): void { - this.signingFailedHandler = handler; - this.logger.log(`[REGISTER] SigningFailed handler registered`); - } - - private async startConsuming(): Promise { - await this.consumer.run({ - eachMessage: async ({ topic, partition, message }: EachMessagePayload) => { - const offset = message.offset; - this.logger.log(`[RECEIVE] Message received: topic=${topic}, partition=${partition}, offset=${offset}`); - - try { - const value = message.value?.toString(); - if (!value) { - this.logger.warn(`[RECEIVE] Empty message received on ${topic}`); - return; - } - - this.logger.log(`[RECEIVE] Raw message value: ${value.substring(0, 500)}...`); - - const parsed = JSON.parse(value); - const payload = parsed.payload || parsed; - - this.logger.log(`[RECEIVE] Parsed event: eventType=${parsed.eventType || 'unknown'}`); - this.logger.log(`[RECEIVE] Payload keys: ${Object.keys(payload).join(', ')}`); - - switch (topic) { - case MPC_TOPICS.KEYGEN_COMPLETED: - this.logger.log(`[HANDLE] Processing KeygenCompleted event for blockchain-service`); - this.logger.log(`[HANDLE] publicKey: ${(payload as KeygenCompletedPayload).publicKey?.substring(0, 20)}...`); - this.logger.log(`[HANDLE] extraPayload.userId: ${(payload as KeygenCompletedPayload).extraPayload?.userId}`); - if (this.keygenCompletedHandler) { - await this.keygenCompletedHandler(payload as KeygenCompletedPayload); - this.logger.log(`[HANDLE] KeygenCompleted handler completed successfully`); - } else { - this.logger.warn(`[HANDLE] No handler registered for KeygenCompleted`); - } - break; - - case MPC_TOPICS.SIGNING_COMPLETED: - this.logger.log(`[HANDLE] Processing SigningCompleted event`); - this.logger.log(`[HANDLE] sessionId: ${(payload as SigningCompletedPayload).sessionId}`); - this.logger.log(`[HANDLE] signature: ${(payload as SigningCompletedPayload).signature?.substring(0, 20)}...`); - if (this.signingCompletedHandler) { - await this.signingCompletedHandler(payload as SigningCompletedPayload); - this.logger.log(`[HANDLE] SigningCompleted handler completed successfully`); - } else { - this.logger.warn(`[HANDLE] No handler registered for SigningCompleted`); - } - break; - - case MPC_TOPICS.SESSION_FAILED: - this.logger.log(`[HANDLE] Processing SessionFailed event`); - this.logger.log(`[HANDLE] sessionType: ${(payload as SessionFailedPayload).sessionType}`); - this.logger.log(`[HANDLE] errorMessage: ${(payload as SessionFailedPayload).errorMessage}`); - - const failedPayload = payload as SessionFailedPayload; - // Route to appropriate handler based on session type - if (failedPayload.sessionType === 'sign') { - if (this.signingFailedHandler) { - await this.signingFailedHandler(failedPayload); - this.logger.log(`[HANDLE] SigningFailed handler completed`); - } - } else { - if (this.sessionFailedHandler) { - await this.sessionFailedHandler(failedPayload); - this.logger.log(`[HANDLE] SessionFailed handler completed`); - } - } - break; - - default: - this.logger.warn(`[RECEIVE] Unknown MPC topic: ${topic}`); - } - } catch (error) { - this.logger.error(`[ERROR] Error processing MPC event from ${topic}`, error); - } - }, - }); - - this.logger.log(`[START] Started consuming MPC events for address derivation`); - } -} diff --git a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts index 796f6907..bd522eaa 100644 --- a/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts +++ b/backend/services/mining-blockchain-service/src/infrastructure/mpc/mpc-signing.client.ts @@ -1,23 +1,18 @@ /** * MPC Signing Client * - * 通过 Kafka 事件与 mpc-service 通信进行 MPC 签名 + * 通过 HTTP 直接调用 mpc-service 进行 MPC 签名 * 用于热钱包的 ERC20 转账签名 * - * 事件流: - * blockchain-service → Kafka(mpc.SigningRequested) → mpc-service - * mpc-service → Kafka(mpc.SigningCompleted) → blockchain-service + * 流程: + * blockchain-service → POST /api/v1/mpc/sign → mpc-service + * blockchain-service ← GET /api/v1/mpc/sign/{sessionId}/status (轮询) ← mpc-service */ -import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { randomUUID } from 'crypto'; -import { EventPublisherService } from '@/infrastructure/kafka/event-publisher.service'; -import { - MpcEventConsumerService, - SigningCompletedPayload, - SessionFailedPayload, -} from '@/infrastructure/kafka/mpc-event-consumer.service'; +import { HttpService } from '@nestjs/axios'; +import { firstValueFrom } from 'rxjs'; export interface CreateSigningInput { username: string; @@ -30,15 +25,11 @@ export interface SigningResult { signature?: string; } -// 签名结果回调 -type SigningCallback = (signature: string | null, error?: string) => void; - -// MPC 签名请求 Topic -export const MPC_SIGNING_TOPIC = 'mining_mpc.SigningRequested'; - @Injectable() -export class MpcSigningClient implements OnModuleInit { +export class MpcSigningClient { private readonly logger = new Logger(MpcSigningClient.name); + // MPC Service URL + private readonly mpcServiceUrl: string; // C2C Bot 热钱包 private readonly hotWalletUsername: string; private readonly hotWalletAddress: string; @@ -48,20 +39,16 @@ export class MpcSigningClient implements OnModuleInit { // fUSDT (积分值) 做市商钱包 private readonly fusdtMarketMakerUsername: string; private readonly fusdtMarketMakerAddress: string; - private readonly signingTimeoutMs: number = 300000; // 5 minutes - - // 待处理的签名请求回调 Map - private pendingRequests: Map void; - reject: (error: Error) => void; - timeout: NodeJS.Timeout; - }> = new Map(); + // 轮询配置 + private readonly pollIntervalMs: number = 2000; + private readonly maxPollAttempts: number = 150; // 5 minutes constructor( private readonly configService: ConfigService, - private readonly eventPublisher: EventPublisherService, - private readonly mpcEventConsumer: MpcEventConsumerService, + private readonly httpService: HttpService, ) { + // MPC Service URL + this.mpcServiceUrl = this.configService.get('MPC_SERVICE_URL', 'http://mpc-service:3006'); // C2C Bot 热钱包配置 this.hotWalletUsername = this.configService.get('C2C_BOT_WALLET_USERNAME', ''); this.hotWalletAddress = this.configService.get('C2C_BOT_WALLET_ADDRESS', ''); @@ -88,14 +75,7 @@ export class MpcSigningClient implements OnModuleInit { this.logger.log(`[INIT] C2C Bot Wallet: ${this.hotWalletAddress || '(not configured)'}`); this.logger.log(`[INIT] eUSDT Market Maker: ${this.eusdtMarketMakerAddress || '(not configured)'}`); this.logger.log(`[INIT] fUSDT Market Maker: ${this.fusdtMarketMakerAddress || '(not configured)'}`); - this.logger.log(`[INIT] Using Kafka event-driven signing`); - } - - async onModuleInit() { - // 注册签名完成和失败事件处理器 - this.mpcEventConsumer.onSigningCompleted(this.handleSigningCompleted.bind(this)); - this.mpcEventConsumer.onSigningFailed(this.handleSigningFailed.bind(this)); - this.logger.log('[INIT] MPC signing event handlers registered'); + this.logger.log(`[INIT] MPC Service URL: ${this.mpcServiceUrl}`); } /** @@ -162,10 +142,7 @@ export class MpcSigningClient implements OnModuleInit { } /** - * 签名消息(使用 C2C Bot 热钱包,通过 Kafka 事件驱动) - * - * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) - * @returns 签名结果 (hex string) + * 签名消息(使用 C2C Bot 热钱包) */ async signMessage(messageHash: string): Promise { if (!this.hotWalletUsername) { @@ -176,9 +153,6 @@ export class MpcSigningClient implements OnModuleInit { /** * 使用 eUSDT 做市商钱包签名消息 - * - * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) - * @returns 签名结果 (hex string) */ async signMessageAsEusdtMarketMaker(messageHash: string): Promise { if (!this.eusdtMarketMakerUsername) { @@ -189,9 +163,6 @@ export class MpcSigningClient implements OnModuleInit { /** * 使用 fUSDT 做市商钱包签名消息 - * - * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) - * @returns 签名结果 (hex string) */ async signMessageAsFusdtMarketMaker(messageHash: string): Promise { if (!this.fusdtMarketMakerUsername) { @@ -201,11 +172,11 @@ export class MpcSigningClient implements OnModuleInit { } /** - * 使用指定用户名签名消息(通过 Kafka 事件驱动) + * 使用指定用户名签名消息(通过 HTTP 调用 mpc-service) * - * @param username MPC 用户名 - * @param messageHash 要签名的消息哈希 (hex string with 0x prefix) - * @returns 签名结果 (hex string) + * 流程: + * 1. POST /api/v1/mpc/sign → 创建签名会话,获取 sessionId + * 2. GET /api/v1/mpc/sign/{sessionId}/status → 轮询结果 (每 2 秒, 最多 5 分钟) */ async signMessageWithUsername(username: string, messageHash: string): Promise { this.logger.log(`[SIGN] Starting MPC signing for: ${messageHash.slice(0, 16)}... (username: ${username})`); @@ -214,87 +185,74 @@ export class MpcSigningClient implements OnModuleInit { throw new Error('MPC username not provided'); } - const sessionId = randomUUID(); - this.logger.log(`[SIGN] Session ID: ${sessionId}`); - - // 创建 Promise 等待签名结果 - const signaturePromise = new Promise((resolve, reject) => { - // 设置超时 - const timeout = setTimeout(() => { - this.pendingRequests.delete(sessionId); - reject(new Error(`MPC signing timeout after ${this.signingTimeoutMs}ms`)); - }, this.signingTimeoutMs); - - // 保存到待处理队列 - this.pendingRequests.set(sessionId, { resolve, reject, timeout }); - }); - - // 发布签名请求事件到 Kafka try { - await this.eventPublisher.publish({ - eventType: 'mining_blockchain.mpc.signing.requested', - toPayload: () => ({ - sessionId, - userId: 'system', - username, - messageHash, - source: 'mining-blockchain-service', - }), - eventId: sessionId, - occurredAt: new Date(), - }); + // 1. 创建签名会话 + const createResponse = await firstValueFrom( + this.httpService.post<{ sessionId: string; status: string }>( + `${this.mpcServiceUrl}/api/v1/mpc/sign`, + { + username, + messageHash, + }, + { + headers: { 'Content-Type': 'application/json' }, + timeout: 30000, + }, + ), + ); - this.logger.log(`[SIGN] Signing request published to Kafka: sessionId=${sessionId}, username=${username}`); + const sessionId = createResponse.data.sessionId; + this.logger.log(`[SIGN] Signing session created: ${sessionId}`); + + // 2. 轮询签名状态 + const result = await this.pollSigningStatus(sessionId); + + if (result.status !== 'completed') { + throw new Error(`Signing session failed with status: ${result.status}`); + } + + this.logger.log(`[SIGN] Signature obtained: ${result.signature.slice(0, 20)}...`); + return result.signature; } catch (error) { - // 发布失败,清理待处理队列 - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); - } - throw error; - } - - // 等待签名结果 - const signature = await signaturePromise; - this.logger.log(`[SIGN] Signature obtained: ${signature.slice(0, 20)}...`); - return signature; - } - - /** - * 处理签名完成事件 - */ - private async handleSigningCompleted(payload: SigningCompletedPayload): Promise { - const sessionId = payload.sessionId; - this.logger.log(`[EVENT] Signing completed: sessionId=${sessionId}`); - - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); - - if (payload.signature) { - pending.resolve(payload.signature); - } else { - pending.reject(new Error('Signing completed but no signature returned')); - } - } else { - this.logger.warn(`[EVENT] No pending request for sessionId=${sessionId}`); + this.logger.error(`[SIGN] MPC signing failed: username=${username}`, error); + throw new Error(`MPC signing failed: ${error.message}`); } } /** - * 处理签名失败事件 + * 轮询签名会话状态 */ - private async handleSigningFailed(payload: SessionFailedPayload): Promise { - const sessionId = payload.sessionId; - this.logger.warn(`[EVENT] Signing failed: sessionId=${sessionId}, error=${payload.errorMessage}`); + private async pollSigningStatus(sessionId: string): Promise<{ status: string; signature: string }> { + for (let i = 0; i < this.maxPollAttempts; i++) { + try { + const response = await firstValueFrom( + this.httpService.get<{ sessionId: string; status: string; signature?: string }>( + `${this.mpcServiceUrl}/api/v1/mpc/sign/${sessionId}/status`, + { timeout: 10000 }, + ), + ); - const pending = this.pendingRequests.get(sessionId); - if (pending) { - clearTimeout(pending.timeout); - this.pendingRequests.delete(sessionId); - pending.reject(new Error(`MPC signing failed: ${payload.errorMessage}`)); + const data = response.data; + + if (data.status === 'completed') { + return { status: 'completed', signature: data.signature || '' }; + } + + if (data.status === 'failed' || data.status === 'expired') { + return { status: data.status, signature: '' }; + } + + await this.sleep(this.pollIntervalMs); + } catch (error) { + this.logger.warn(`[POLL] Error polling signing status (attempt ${i + 1}/${this.maxPollAttempts}): ${error.message}`); + await this.sleep(this.pollIntervalMs); + } } + + throw new Error(`Signing session ${sessionId} timed out after ${this.maxPollAttempts * this.pollIntervalMs / 1000}s`); + } + + private sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } }