fix(service-party-app): 修复 co_managed_keygen 消息丢失问题

问题:
- service-party-app 在 joinSession 后有 1 秒延迟才开始 keygen
- server-party 检测到所有参与者后立即发送 TSS Round 0 消息
- service-party-app 此时还没订阅消息流,导致消息丢失
- TSS 协议无法完成

修复:
- TSSHandler 新增 prepareForKeygen() 方法,在 joinSession 后立即订阅消息
- 新增 isPrepared 状态,在预订阅阶段也能缓冲消息
- handleIncomingMessage 支持 isPrepared || isRunning 时缓冲消息
- participateKeygen 保留预订阅阶段缓冲的消息,不重复订阅
- main.ts 在 joinSession 成功后立即调用 prepareForKeygen()
- 移除 1 秒延迟,改用 setImmediate 立即触发 keygen

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-29 10:35:39 -08:00
parent 820a61793c
commit dfead071ab
2 changed files with 113 additions and 18 deletions

View File

@ -679,11 +679,19 @@ function setupIpcHandlers() {
threshold: activeKeygenSession.threshold,
});
// 100% 可靠方案:延迟检查会话状态,如果所有人都已加入则触发 keygen
// 这样即使错过 session_started 事件也能正常工作
setTimeout(() => {
// 关键步骤:立即预订阅消息流
// 这确保在其他方开始发送 TSS 消息时,我们已经准备好接收和缓冲
// 即使 keygen 进程还没启动,消息也不会丢失
if (tssHandler && 'prepareForKeygen' in tssHandler) {
debugLog.info('tss', `Preparing for keygen: subscribing to messages for session ${sessionId}`);
(tssHandler as { prepareForKeygen: (sessionId: string, partyId: string) => void }).prepareForKeygen(sessionId, partyId);
}
// 立即检查会话状态并触发 keygen不再延迟
// 使用 setImmediate 确保在当前事件循环结束后立即执行
setImmediate(() => {
checkAndTriggerKeygen(sessionId);
}, 1000); // 延迟 1 秒等待可能的事件到达
});
}
return { success: true, data: result };
} catch (error) {
@ -752,10 +760,16 @@ function setupIpcHandlers() {
threshold: activeKeygenSession.threshold,
});
// 100% 可靠方案:延迟检查会话状态,如果所有人都已加入则触发 keygen
setTimeout(() => {
// 关键步骤:立即预订阅消息流
if (tssHandler && 'prepareForKeygen' in tssHandler) {
debugLog.info('tss', `Initiator preparing for keygen: subscribing to messages for session ${result.session_id}`);
(tssHandler as { prepareForKeygen: (sessionId: string, partyId: string) => void }).prepareForKeygen(result.session_id, partyId);
}
// 立即检查会话状态并触发 keygen不再延迟
setImmediate(() => {
checkAndTriggerKeygen(result.session_id);
}, 1000);
});
} else {
console.warn('Initiator failed to join session');
}

View File

@ -71,6 +71,8 @@ export class TSSHandler extends EventEmitter {
payload: Buffer;
}> = [];
private isProcessReady = false;
// 是否已预订阅消息(用于在 keygen 启动前就开始缓冲消息)
private isPrepared = false;
constructor(grpcClient: GrpcClient, database?: DatabaseManager) {
super();
@ -113,6 +115,51 @@ export class TSSHandler extends EventEmitter {
return binaryName;
}
/**
* - joinSession
*
*
* @param sessionId ID
* @param partyId party ID
*/
prepareForKeygen(sessionId: string, partyId: string): void {
if (this.isPrepared) {
console.log('[TSS] Already prepared for keygen, skip');
return;
}
console.log(`[TSS] Preparing for keygen: session=${sessionId.substring(0, 8)}..., party=${partyId.substring(0, 8)}...`);
this.sessionId = sessionId;
this.partyId = partyId;
this.isPrepared = true;
this.messageBuffer = [];
// 立即订阅消息流,开始缓冲消息
// 使用 bound 方法确保 this 上下文正确
this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this));
this.grpcClient.subscribeMessages(sessionId, partyId);
console.log('[TSS] Message subscription started, buffering enabled');
}
/**
*
*/
cancelPrepare(): void {
if (!this.isPrepared) {
return;
}
console.log('[TSS] Canceling prepare');
this.isPrepared = false;
this.messageBuffer = [];
this.grpcClient.removeAllListeners('mpcMessage');
this.grpcClient.unsubscribeMessages();
this.sessionId = null;
this.partyId = null;
}
/**
* Keygen
*/
@ -128,13 +175,19 @@ export class TSSHandler extends EventEmitter {
throw new Error('TSS protocol already running');
}
// 检查是否已经预订阅
const wasPrepared = this.isPrepared && this.sessionId === sessionId;
const bufferedCount = this.messageBuffer.length;
console.log(`[TSS] Starting keygen: wasPrepared=${wasPrepared}, bufferedMessages=${bufferedCount}`);
this.sessionId = sessionId;
this.partyId = partyId;
this.partyIndex = partyIndex;
this.participants = participants;
this.isRunning = true;
this.isProcessReady = false;
this.messageBuffer = []; // 清空消息缓冲
// 注意:不清空消息缓冲,保留预订阅阶段收到的消息
// 构建 party index map
this.partyIndexMap.clear();
@ -163,9 +216,15 @@ export class TSSHandler extends EventEmitter {
let resultData = '';
// 先订阅消息(可能在进程就绪前就收到消息,会被缓冲)
this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this));
this.grpcClient.subscribeMessages(sessionId, partyId);
// 如果没有预订阅,现在订阅消息
// 如果已经预订阅,消息监听器已经注册,不需要重复注册
if (!wasPrepared) {
console.log('[TSS] Subscribing to messages (not prepared before)');
this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this));
this.grpcClient.subscribeMessages(sessionId, partyId);
} else {
console.log(`[TSS] Using existing subscription, ${bufferedCount} messages buffered`);
}
// 处理标准输出 (JSON 消息)
this.tssProcess.stdout?.on('data', (data: Buffer) => {
@ -203,6 +262,7 @@ export class TSSHandler extends EventEmitter {
const completedSessionId = this.sessionId;
this.isRunning = false;
this.isProcessReady = false;
this.isPrepared = false;
this.messageBuffer = [];
this.tssProcess = null;
// 清理消息监听器,防止下次 keygen 时重复注册
@ -237,6 +297,7 @@ export class TSSHandler extends EventEmitter {
this.tssProcess.on('error', (err) => {
this.isRunning = false;
this.isProcessReady = false;
this.isPrepared = false;
this.messageBuffer = [];
this.tssProcess = null;
// 清理消息监听器
@ -246,6 +307,7 @@ export class TSSHandler extends EventEmitter {
} catch (err) {
this.isRunning = false;
this.isPrepared = false;
reject(err);
}
});
@ -296,6 +358,11 @@ export class TSSHandler extends EventEmitter {
/**
* gRPC MPC
*
*
* 1. isPrepared=true, isRunning=false:
* 2. isPrepared=true, isRunning=true, isProcessReady=false:
* 3. isPrepared=true, isRunning=true, isProcessReady=true:
*/
private handleIncomingMessage(message: {
messageId: string;
@ -311,16 +378,22 @@ export class TSSHandler extends EventEmitter {
}
}
// 如果进程未就绪,缓冲消息
if (!this.isProcessReady || !this.tssProcess || !this.tssProcess.stdin) {
if (this.isRunning) {
console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (process not ready)`);
this.messageBuffer.push(message);
}
// 如果进程就绪,直接发送
if (this.isProcessReady && this.tssProcess?.stdin) {
this.sendMessageToProcess(message);
return;
}
this.sendMessageToProcess(message);
// 如果已预订阅或正在运行,缓冲消息
// 这确保在任何阶段收到的消息都不会丢失
if (this.isPrepared || this.isRunning) {
console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (prepared=${this.isPrepared}, running=${this.isRunning}, ready=${this.isProcessReady})`);
this.messageBuffer.push(message);
return;
}
// 既没预订阅也没运行,忽略消息
console.log(`[TSS] Ignoring message from ${message.fromParty.substring(0, 8)}... (not prepared)`);
}
/**
@ -383,6 +456,7 @@ export class TSSHandler extends EventEmitter {
}
this.isRunning = false;
this.isProcessReady = false;
this.isPrepared = false;
this.messageBuffer = [];
this.grpcClient.removeAllListeners('mpcMessage');
}
@ -393,6 +467,13 @@ export class TSSHandler extends EventEmitter {
getIsRunning(): boolean {
return this.isRunning;
}
/**
*
*/
getIsPrepared(): boolean {
return this.isPrepared;
}
}
/**