diff --git a/backend/mpc-system/services/service-party-app/electron/main.ts b/backend/mpc-system/services/service-party-app/electron/main.ts index 3c323cdf..d2892a49 100644 --- a/backend/mpc-system/services/service-party-app/electron/main.ts +++ b/backend/mpc-system/services/service-party-app/electron/main.ts @@ -679,11 +679,19 @@ function setupIpcHandlers() { threshold: activeKeygenSession.threshold, }); - // 100% 可靠方案:延迟检查会话状态,如果所有人都已加入则触发 keygen - // 这样即使错过 session_started 事件也能正常工作 - setTimeout(() => { + // 关键步骤:立即预订阅消息流 + // 这确保在其他方开始发送 TSS 消息时,我们已经准备好接收和缓冲 + // 即使 keygen 进程还没启动,消息也不会丢失 + if (tssHandler && 'prepareForKeygen' in tssHandler) { + debugLog.info('tss', `Preparing for keygen: subscribing to messages for session ${sessionId}`); + (tssHandler as { prepareForKeygen: (sessionId: string, partyId: string) => void }).prepareForKeygen(sessionId, partyId); + } + + // 立即检查会话状态并触发 keygen(不再延迟) + // 使用 setImmediate 确保在当前事件循环结束后立即执行 + setImmediate(() => { checkAndTriggerKeygen(sessionId); - }, 1000); // 延迟 1 秒等待可能的事件到达 + }); } return { success: true, data: result }; } catch (error) { @@ -752,10 +760,16 @@ function setupIpcHandlers() { threshold: activeKeygenSession.threshold, }); - // 100% 可靠方案:延迟检查会话状态,如果所有人都已加入则触发 keygen - setTimeout(() => { + // 关键步骤:立即预订阅消息流 + if (tssHandler && 'prepareForKeygen' in tssHandler) { + debugLog.info('tss', `Initiator preparing for keygen: subscribing to messages for session ${result.session_id}`); + (tssHandler as { prepareForKeygen: (sessionId: string, partyId: string) => void }).prepareForKeygen(result.session_id, partyId); + } + + // 立即检查会话状态并触发 keygen(不再延迟) + setImmediate(() => { checkAndTriggerKeygen(result.session_id); - }, 1000); + }); } else { console.warn('Initiator failed to join session'); } diff --git a/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts b/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts index 32f02a8c..433e1364 100644 --- a/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts +++ b/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts @@ -71,6 +71,8 @@ export class TSSHandler extends EventEmitter { payload: Buffer; }> = []; private isProcessReady = false; + // 是否已预订阅消息(用于在 keygen 启动前就开始缓冲消息) + private isPrepared = false; constructor(grpcClient: GrpcClient, database?: DatabaseManager) { super(); @@ -113,6 +115,51 @@ export class TSSHandler extends EventEmitter { return binaryName; } + /** + * 预订阅消息流 - 在 joinSession 后立即调用 + * 这确保在其他方开始发送消息时,我们已经准备好接收和缓冲 + * + * @param sessionId 会话 ID + * @param partyId 自己的 party ID + */ + prepareForKeygen(sessionId: string, partyId: string): void { + if (this.isPrepared) { + console.log('[TSS] Already prepared for keygen, skip'); + return; + } + + console.log(`[TSS] Preparing for keygen: session=${sessionId.substring(0, 8)}..., party=${partyId.substring(0, 8)}...`); + + this.sessionId = sessionId; + this.partyId = partyId; + this.isPrepared = true; + this.messageBuffer = []; + + // 立即订阅消息流,开始缓冲消息 + // 使用 bound 方法确保 this 上下文正确 + this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this)); + this.grpcClient.subscribeMessages(sessionId, partyId); + + console.log('[TSS] Message subscription started, buffering enabled'); + } + + /** + * 取消预订阅 + */ + cancelPrepare(): void { + if (!this.isPrepared) { + return; + } + + console.log('[TSS] Canceling prepare'); + this.isPrepared = false; + this.messageBuffer = []; + this.grpcClient.removeAllListeners('mpcMessage'); + this.grpcClient.unsubscribeMessages(); + this.sessionId = null; + this.partyId = null; + } + /** * 参与 Keygen 协议 */ @@ -128,13 +175,19 @@ export class TSSHandler extends EventEmitter { throw new Error('TSS protocol already running'); } + // 检查是否已经预订阅 + const wasPrepared = this.isPrepared && this.sessionId === sessionId; + const bufferedCount = this.messageBuffer.length; + + console.log(`[TSS] Starting keygen: wasPrepared=${wasPrepared}, bufferedMessages=${bufferedCount}`); + this.sessionId = sessionId; this.partyId = partyId; this.partyIndex = partyIndex; this.participants = participants; this.isRunning = true; this.isProcessReady = false; - this.messageBuffer = []; // 清空消息缓冲 + // 注意:不清空消息缓冲,保留预订阅阶段收到的消息 // 构建 party index map this.partyIndexMap.clear(); @@ -163,9 +216,15 @@ export class TSSHandler extends EventEmitter { let resultData = ''; - // 先订阅消息(可能在进程就绪前就收到消息,会被缓冲) - this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this)); - this.grpcClient.subscribeMessages(sessionId, partyId); + // 如果没有预订阅,现在订阅消息 + // 如果已经预订阅,消息监听器已经注册,不需要重复注册 + if (!wasPrepared) { + console.log('[TSS] Subscribing to messages (not prepared before)'); + this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this)); + this.grpcClient.subscribeMessages(sessionId, partyId); + } else { + console.log(`[TSS] Using existing subscription, ${bufferedCount} messages buffered`); + } // 处理标准输出 (JSON 消息) this.tssProcess.stdout?.on('data', (data: Buffer) => { @@ -203,6 +262,7 @@ export class TSSHandler extends EventEmitter { const completedSessionId = this.sessionId; this.isRunning = false; this.isProcessReady = false; + this.isPrepared = false; this.messageBuffer = []; this.tssProcess = null; // 清理消息监听器,防止下次 keygen 时重复注册 @@ -237,6 +297,7 @@ export class TSSHandler extends EventEmitter { this.tssProcess.on('error', (err) => { this.isRunning = false; this.isProcessReady = false; + this.isPrepared = false; this.messageBuffer = []; this.tssProcess = null; // 清理消息监听器 @@ -246,6 +307,7 @@ export class TSSHandler extends EventEmitter { } catch (err) { this.isRunning = false; + this.isPrepared = false; reject(err); } }); @@ -296,6 +358,11 @@ export class TSSHandler extends EventEmitter { /** * 处理从 gRPC 接收的 MPC 消息(带去重) + * + * 消息处理状态: + * 1. isPrepared=true, isRunning=false: 预订阅阶段,缓冲消息 + * 2. isPrepared=true, isRunning=true, isProcessReady=false: 进程启动中,缓冲消息 + * 3. isPrepared=true, isRunning=true, isProcessReady=true: 进程就绪,直接发送 */ private handleIncomingMessage(message: { messageId: string; @@ -311,16 +378,22 @@ export class TSSHandler extends EventEmitter { } } - // 如果进程未就绪,缓冲消息 - if (!this.isProcessReady || !this.tssProcess || !this.tssProcess.stdin) { - if (this.isRunning) { - console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (process not ready)`); - this.messageBuffer.push(message); - } + // 如果进程就绪,直接发送 + if (this.isProcessReady && this.tssProcess?.stdin) { + this.sendMessageToProcess(message); return; } - this.sendMessageToProcess(message); + // 如果已预订阅或正在运行,缓冲消息 + // 这确保在任何阶段收到的消息都不会丢失 + if (this.isPrepared || this.isRunning) { + console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (prepared=${this.isPrepared}, running=${this.isRunning}, ready=${this.isProcessReady})`); + this.messageBuffer.push(message); + return; + } + + // 既没预订阅也没运行,忽略消息 + console.log(`[TSS] Ignoring message from ${message.fromParty.substring(0, 8)}... (not prepared)`); } /** @@ -383,6 +456,7 @@ export class TSSHandler extends EventEmitter { } this.isRunning = false; this.isProcessReady = false; + this.isPrepared = false; this.messageBuffer = []; this.grpcClient.removeAllListeners('mpcMessage'); } @@ -393,6 +467,13 @@ export class TSSHandler extends EventEmitter { getIsRunning(): boolean { return this.isRunning; } + + /** + * 检查是否已预订阅 + */ + getIsPrepared(): boolean { + return this.isPrepared; + } } /**