From 0ca37ee76a0246b36b0ecacb8282d00568bff1a5 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 29 Dec 2025 07:37:03 -0800 Subject: [PATCH] =?UTF-8?q?feat(mpc-system):=20=E5=A2=9E=E5=BC=BA=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=8F=AF=E9=9D=A0=E6=80=A7=E5=92=8C=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=8E=BB=E9=87=8D=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端改进: - SessionEventBroadcaster: 重连时自动关闭旧 channel 防止内存泄漏 - MessageBroker: 重连时关闭旧的 party/session channel - SubscribeMessages: 订阅时自动发送数据库中的 pending 消息 客户端改进: - GrpcClient: 添加自动重连机制(指数退避,最多10次) - GrpcClient: 断开/重连/失败事件通知前端 - TSSHandler: 消息缓冲机制,进程启动前缓存收到的消息 - TSSHandler: 客户端本地消息去重,防止重连后重复处理 - Database: 添加 processed_messages 表和相关操作方法 - Main: Keygen 幂等性保护,防止重复触发 - Main: 会话事件缓存,解决前端订阅时序问题 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../input/grpc/message_grpc_handler.go | 40 +++ .../adapters/output/memory/message_broker.go | 24 +- .../domain/session_event_broadcaster.go | 7 + .../service-party-app/electron/main.ts | 79 +++++- .../electron/modules/database.ts | 66 +++++ .../electron/modules/grpc-client.ts | 258 +++++++++++++++++- .../electron/modules/tss-handler.ts | 107 +++++++- 7 files changed, 549 insertions(+), 32 deletions(-) diff --git a/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go b/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go index b1e41943..57cd95bd 100644 --- a/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go +++ b/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go @@ -87,12 +87,49 @@ func (s *MessageRouterServer) RouteMessage( } // SubscribeMessages subscribes to messages for a party (streaming) +// On subscription, it first sends any pending messages from the database +// to ensure no messages are lost during reconnection func (s *MessageRouterServer) SubscribeMessages( req *pb.SubscribeMessagesRequest, stream pb.MessageRouter_SubscribeMessagesServer, ) error { ctx := stream.Context() + logger.Info("Party subscribing to messages", + zap.String("session_id", req.SessionId), + zap.String("party_id", req.PartyId)) + + // First, send any pending messages from the database (message recovery on reconnect) + if s.getPendingMessagesUC != nil && req.SessionId != "" { + input := use_cases.GetPendingMessagesInput{ + SessionID: req.SessionId, + PartyID: req.PartyId, + AfterTimestamp: 0, // Get all pending messages + } + + pendingMessages, err := s.getPendingMessagesUC.Execute(ctx, input) + if err != nil { + logger.Warn("Failed to get pending messages on subscribe", + zap.String("session_id", req.SessionId), + zap.String("party_id", req.PartyId), + zap.Error(err)) + } else if len(pendingMessages) > 0 { + logger.Info("Sending pending messages on subscribe", + zap.String("session_id", req.SessionId), + zap.String("party_id", req.PartyId), + zap.Int("count", len(pendingMessages))) + + for _, msg := range pendingMessages { + if err := sendMessage(stream, msg); err != nil { + logger.Error("Failed to send pending message", + zap.String("message_id", msg.ID), + zap.Error(err)) + return err + } + } + } + } + // Subscribe to party messages partyCh, err := s.messageBroker.SubscribeToPartyMessages(ctx, req.PartyId) if err != nil { @@ -109,6 +146,9 @@ func (s *MessageRouterServer) SubscribeMessages( for { select { case <-ctx.Done(): + logger.Info("Party unsubscribed from messages", + zap.String("session_id", req.SessionId), + zap.String("party_id", req.PartyId)) return nil case msg, ok := <-partyCh: if !ok { diff --git a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go index 078d1d1f..991f98ce 100644 --- a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go +++ b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go @@ -116,6 +116,7 @@ func (a *MessageBrokerAdapter) PublishToSession( } // SubscribeToPartyMessages subscribes to messages for a specific party +// If the party already has an active subscription, the old channel is closed first func (a *MessageBrokerAdapter) SubscribeToPartyMessages( ctx context.Context, partyID string, @@ -123,11 +124,15 @@ func (a *MessageBrokerAdapter) SubscribeToPartyMessages( a.mu.Lock() defer a.mu.Unlock() - // Create channel if not exists - if _, exists := a.partyChannels[partyID]; !exists { - a.partyChannels[partyID] = make(chan *entities.MessageDTO, 100) + // Close existing channel if party is re-subscribing (e.g., after reconnect) + if oldCh, exists := a.partyChannels[partyID]; exists { + close(oldCh) + logger.Info("closed existing party channel for re-subscription", + zap.String("party_id", partyID)) } + // Create new channel + a.partyChannels[partyID] = make(chan *entities.MessageDTO, 100) ch := a.partyChannels[partyID] // Return a read-only channel @@ -155,6 +160,7 @@ func (a *MessageBrokerAdapter) SubscribeToPartyMessages( } // SubscribeToSessionMessages subscribes to all messages in a session +// If the party already has an active subscription for this session, the old channel is closed first func (a *MessageBrokerAdapter) SubscribeToSessionMessages( ctx context.Context, sessionID string, @@ -171,14 +177,18 @@ func (a *MessageBrokerAdapter) SubscribeToSessionMessages( zap.String("key", key), zap.Int("current_channel_count", len(a.sessionChannels))) - // Create channel if not exists - if _, exists := a.sessionChannels[key]; !exists { - a.sessionChannels[key] = make(chan *entities.MessageDTO, 100) - logger.Info("Created new session channel", + // Close existing channel if party is re-subscribing (e.g., after reconnect) + if oldCh, exists := a.sessionChannels[key]; exists { + close(oldCh) + logger.Info("closed existing session channel for re-subscription", zap.String("key", key)) } + // Create new channel + a.sessionChannels[key] = make(chan *entities.MessageDTO, 100) ch := a.sessionChannels[key] + logger.Info("Created new session channel", + zap.String("key", key)) // Return a read-only channel out := make(chan *entities.MessageDTO, 100) diff --git a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go index 8614e331..76b118b8 100644 --- a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go +++ b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go @@ -20,10 +20,17 @@ func NewSessionEventBroadcaster() *SessionEventBroadcaster { } // Subscribe subscribes a party to session events +// If the party already has an active subscription, the old channel is closed first +// to prevent memory leaks and ensure clean reconnection func (b *SessionEventBroadcaster) Subscribe(partyID string) <-chan *pb.SessionEvent { b.mu.Lock() defer b.mu.Unlock() + // Close existing channel if party is re-subscribing (e.g., after reconnect) + if oldCh, exists := b.subscribers[partyID]; exists { + close(oldCh) + } + // Create buffered channel for this subscriber ch := make(chan *pb.SessionEvent, 100) b.subscribers[partyID] = ch diff --git a/backend/mpc-system/services/service-party-app/electron/main.ts b/backend/mpc-system/services/service-party-app/electron/main.ts index 8a0efac2..52c24219 100644 --- a/backend/mpc-system/services/service-party-app/electron/main.ts +++ b/backend/mpc-system/services/service-party-app/electron/main.ts @@ -72,6 +72,9 @@ interface ActiveKeygenSession { } let activeKeygenSession: ActiveKeygenSession | null = null; +// Keygen 幂等性保护:追踪正在进行的 keygen 会话 ID +let keygenInProgressSessionId: string | null = null; + // 会话事件缓存 - 解决前端订阅时可能错过事件的时序问题 // 当事件到达时,前端可能还在页面导航中,尚未订阅 interface SessionEventData { @@ -242,7 +245,9 @@ function getOrCreatePartyId(db: DatabaseManager): string { // 生成一个新的 UUID 作为 partyId partyId = crypto.randomUUID(); db.setSetting('party_id', partyId); - console.log('Generated new partyId:', partyId); + debugLog.info('main', `Generated new partyId: ${partyId}`); + } else { + debugLog.info('main', `Loaded existing partyId: ${partyId}`); } return partyId; } @@ -251,17 +256,24 @@ function getOrCreatePartyId(db: DatabaseManager): string { async function initServices() { // 初始化数据库 (必须首先初始化) database = new DatabaseManager(); + // 等待数据库初始化完成(加载 WASM 和创建表) + await database.waitForReady(); + debugLog.info('main', 'Database initialized'); // 初始化 gRPC 客户端 grpcClient = new GrpcClient(); + // 清理过期的已处理消息记录(防止数据库膨胀) + database.cleanupOldProcessedMessages(); + debugLog.debug('main', 'Cleaned up old processed messages'); + // 初始化 TSS Handler if (USE_MOCK_TSS) { debugLog.info('tss', 'Using Mock TSS Handler (development mode)'); tssHandler = new MockTSSHandler(grpcClient); } else { debugLog.info('tss', 'Using real TSS Handler'); - tssHandler = new TSSHandler(grpcClient); + tssHandler = new TSSHandler(grpcClient, database); } // 设置 TSS 进度事件监听 @@ -324,6 +336,18 @@ async function handleSessionStart(event: { return; } + // 幂等性保护:检查是否已经在执行 keygen + if (keygenInProgressSessionId === event.sessionId) { + debugLog.debug('main', `Keygen already in progress for session ${event.sessionId}, skipping duplicate trigger`); + return; + } + + // 再次检查 TSS 是否在运行(双重保护) + if (tssHandler?.getIsRunning()) { + debugLog.debug('main', 'TSS already running, skipping'); + return; + } + if (!tssHandler) { debugLog.error('tss', 'TSS handler not initialized'); mainWindow?.webContents.send(`session:events:${event.sessionId}`, { @@ -333,6 +357,9 @@ async function handleSessionStart(event: { return; } + // 标记 keygen 开始 + keygenInProgressSessionId = event.sessionId; + // 从事件中更新参与者列表(如果事件包含完整列表) if (event.selectedParties && event.selectedParties.length > 0) { const myPartyId = grpcClient?.getPartyId(); @@ -384,6 +411,8 @@ async function handleSessionStart(event: { type: 'failed', error: result.error || 'Keygen failed', }); + // 清除幂等性标志 + keygenInProgressSessionId = null; } } catch (error) { debugLog.error('tss', `Keygen error: ${(error as Error).message}`); @@ -391,6 +420,8 @@ async function handleSessionStart(event: { type: 'failed', error: (error as Error).message, }); + // 清除幂等性标志 + keygenInProgressSessionId = null; } } @@ -443,8 +474,9 @@ async function handleKeygenComplete(result: KeygenResult) { allCompleted: allCompleted, }); - // 4. 清理活跃会话 + // 4. 清理活跃会话和幂等性标志 activeKeygenSession = null; + keygenInProgressSessionId = null; debugLog.info('main', 'Keygen session completed and cleaned up'); } catch (error) { @@ -453,6 +485,8 @@ async function handleKeygenComplete(result: KeygenResult) { type: 'failed', error: (error as Error).message, }); + // 清除幂等性标志 + keygenInProgressSessionId = null; } } @@ -481,6 +515,22 @@ async function connectAndRegisterToMessageRouter() { grpcClient.subscribeSessionEvents(partyId); debugLog.info('grpc', 'Subscribed to session events'); + // 监听连接状态变化 + grpcClient.on('disconnected', (reason: string) => { + debugLog.warn('grpc', `Disconnected from Message Router: ${reason}`); + mainWindow?.webContents.send('grpc:connectionStatus', { connected: false, reason }); + }); + + grpcClient.on('reconnected', () => { + debugLog.info('grpc', 'Reconnected to Message Router'); + mainWindow?.webContents.send('grpc:connectionStatus', { connected: true }); + }); + + grpcClient.on('reconnectFailed', (reason: string) => { + debugLog.error('grpc', `Failed to reconnect: ${reason}`); + mainWindow?.webContents.send('grpc:connectionStatus', { connected: false, error: reason }); + }); + // 监听会话事件并处理 grpcClient.on('sessionEvent', async (event: { eventId: string; @@ -1345,6 +1395,29 @@ function setupIpcHandlers() { ipcMain.on('debug:log', (_event, { level, source, message }) => { sendDebugLog(level as LogLevel, source as LogSource, message); }); + + // =========================================================================== + // 会话事件订阅(带缓存事件发送) + // =========================================================================== + + // 前端订阅会话事件时,立即发送缓存的事件 + ipcMain.on('grpc:subscribeSessionEvents', (_event, { sessionId }) => { + debugLog.debug('main', `Frontend subscribing to session events: ${sessionId}`); + + // 获取并发送缓存的事件 + const cachedEvents = getAndClearCachedEvents(sessionId); + if (cachedEvents.length > 0) { + debugLog.info('main', `Sending ${cachedEvents.length} cached events to frontend for session ${sessionId}`); + for (const event of cachedEvents) { + mainWindow?.webContents.send(`session:events:${sessionId}`, event); + } + } + }); + + // 前端取消订阅 + ipcMain.on('grpc:unsubscribeSessionEvents', (_event, { sessionId }) => { + debugLog.debug('main', `Frontend unsubscribing from session events: ${sessionId}`); + }); } // 应用生命周期 diff --git a/backend/mpc-system/services/service-party-app/electron/modules/database.ts b/backend/mpc-system/services/service-party-app/electron/modules/database.ts index e2a1d996..4498733d 100644 --- a/backend/mpc-system/services/service-party-app/electron/modules/database.ts +++ b/backend/mpc-system/services/service-party-app/electron/modules/database.ts @@ -156,6 +156,13 @@ export class DatabaseManager { await this.initPromise; } + /** + * 等待数据库初始化完成(公开方法) + */ + async waitForReady(): Promise { + await this.initPromise; + } + /** * 创建表结构 */ @@ -213,12 +220,22 @@ export class DatabaseManager { ) `); + // 已处理消息表 - 用于消息去重,防止重连后重复处理消息 + this.db.run(` + CREATE TABLE IF NOT EXISTS processed_messages ( + message_id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + processed_at TEXT NOT NULL + ) + `); + // 创建索引 this.db.run(`CREATE INDEX IF NOT EXISTS idx_shares_session ON shares(session_id)`); this.db.run(`CREATE INDEX IF NOT EXISTS idx_addresses_share ON derived_addresses(share_id)`); this.db.run(`CREATE INDEX IF NOT EXISTS idx_addresses_chain ON derived_addresses(chain)`); this.db.run(`CREATE INDEX IF NOT EXISTS idx_history_share ON signing_history(share_id)`); this.db.run(`CREATE INDEX IF NOT EXISTS idx_history_status ON signing_history(status)`); + this.db.run(`CREATE INDEX IF NOT EXISTS idx_processed_messages_session ON processed_messages(session_id)`); // 插入默认设置 this.db.run(`INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)`, ['message_router_url', 'mpc-grpc.szaiai.com:443']); @@ -610,6 +627,55 @@ export class DatabaseManager { return settings; } + // =========================================================================== + // 消息去重操作 + // =========================================================================== + + /** + * 检查消息是否已处理 + */ + isMessageProcessed(messageId: string): boolean { + const row = this.queryOne<{ message_id: string }>( + `SELECT message_id FROM processed_messages WHERE message_id = ?`, + [messageId] + ); + return !!row; + } + + /** + * 标记消息为已处理 + */ + markMessageProcessed(messageId: string, sessionId: string): void { + if (!this.db) return; + const now = new Date().toISOString(); + this.db.run( + `INSERT OR IGNORE INTO processed_messages (message_id, session_id, processed_at) VALUES (?, ?, ?)`, + [messageId, sessionId, now] + ); + this.saveToFile(); + } + + /** + * 清理指定会话的已处理消息记录 + * 当会话完成后调用,释放空间 + */ + clearProcessedMessages(sessionId: string): void { + if (!this.db) return; + this.db.run(`DELETE FROM processed_messages WHERE session_id = ?`, [sessionId]); + this.saveToFile(); + } + + /** + * 清理过期的已处理消息记录(超过24小时) + * 可在应用启动时调用 + */ + cleanupOldProcessedMessages(): void { + if (!this.db) return; + const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString(); + this.db.run(`DELETE FROM processed_messages WHERE processed_at < ?`, [cutoff]); + this.saveToFile(); + } + // =========================================================================== // 导入导出 // =========================================================================== diff --git a/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts b/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts index 3c52c9ba..9348317f 100644 --- a/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts +++ b/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts @@ -98,23 +98,62 @@ interface GetRegisteredPartiesResponse { parties: RegisteredParty[]; } +// 重连配置 +interface ReconnectConfig { + maxRetries: number; + initialDelayMs: number; + maxDelayMs: number; + backoffMultiplier: number; +} + +const DEFAULT_RECONNECT_CONFIG: ReconnectConfig = { + maxRetries: 10, + initialDelayMs: 1000, + maxDelayMs: 30000, + backoffMultiplier: 2, +}; + /** * gRPC 客户端 - 连接到 Message Router * * 连接地址格式: * - 开发环境: localhost:50051 (不加密) * - 生产环境: mpc-grpc.szaiai.com:443 (TLS 加密) + * + * 特性: + * - 自动重连机制(指数退避) + * - 事件流断开后自动重新订阅 + * - 心跳失败后自动重连 */ export class GrpcClient extends EventEmitter { private client: grpc.Client | null = null; private connected = false; private partyId: string | null = null; + private partyRole: string | null = null; private heartbeatInterval: NodeJS.Timeout | null = null; private messageStream: grpc.ClientReadableStream | null = null; private eventStream: grpc.ClientReadableStream | null = null; - constructor() { + // 重连相关 + private reconnectConfig: ReconnectConfig; + private currentAddress: string | null = null; + private currentUseTLS: boolean | undefined; + private isReconnecting = false; + private reconnectAttempts = 0; + private reconnectTimeout: NodeJS.Timeout | null = null; + private shouldReconnect = true; + + // 消息流状态(用于重连后恢复) + private activeMessageSubscription: { sessionId: string; partyId: string } | null = null; + private eventStreamSubscribed = false; + + // 心跳失败计数 + private heartbeatFailCount = 0; + private readonly MAX_HEARTBEAT_FAILS = 3; + + constructor(reconnectConfig?: Partial) { super(); + this.reconnectConfig = { ...DEFAULT_RECONNECT_CONFIG, ...reconnectConfig }; } /** @@ -123,6 +162,15 @@ export class GrpcClient extends EventEmitter { * @param useTLS 是否使用 TLS 加密 (默认: 自动检测,端口 443 使用 TLS) */ async connect(address: string, useTLS?: boolean): Promise { + // 保存连接参数用于重连 + this.currentAddress = address; + this.currentUseTLS = useTLS; + this.shouldReconnect = true; + + return this.doConnect(address, useTLS); + } + + private async doConnect(address: string, useTLS?: boolean): Promise { return new Promise((resolve, reject) => { const definition = loadProtoDefinition(); const proto = grpc.loadPackageDefinition(definition) as ProtoPackage; @@ -148,7 +196,7 @@ export class GrpcClient extends EventEmitter { ? grpc.credentials.createSsl() // TLS 加密 (生产环境) : grpc.credentials.createInsecure(); // 不加密 (开发环境) - console.log(`Connecting to Message Router: ${targetAddress} (TLS: ${shouldUseTLS})`); + console.log(`[gRPC] Connecting to Message Router: ${targetAddress} (TLS: ${shouldUseTLS})`); this.client = new MessageRouter( targetAddress, @@ -165,6 +213,10 @@ export class GrpcClient extends EventEmitter { reject(err); } else { this.connected = true; + this.reconnectAttempts = 0; // 重置重连计数 + this.heartbeatFailCount = 0; + console.log('[gRPC] Connected successfully'); + this.emit('connected'); resolve(); } }); @@ -172,31 +224,121 @@ export class GrpcClient extends EventEmitter { } /** - * 断开连接 + * 断开连接(不会自动重连) */ disconnect(): void { + this.shouldReconnect = false; + this.cleanupConnection(); + } + + /** + * 清理连接资源 + */ + private cleanupConnection(): void { + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + this.reconnectTimeout = null; + } + if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } if (this.messageStream) { - this.messageStream.cancel(); + try { + this.messageStream.cancel(); + } catch (e) { + // 忽略取消错误 + } this.messageStream = null; } if (this.eventStream) { - this.eventStream.cancel(); + try { + this.eventStream.cancel(); + } catch (e) { + // 忽略取消错误 + } this.eventStream = null; } if (this.client) { - (this.client as grpc.Client & { close: () => void }).close(); + try { + (this.client as grpc.Client & { close: () => void }).close(); + } catch (e) { + // 忽略关闭错误 + } this.client = null; } this.connected = false; - this.partyId = null; + } + + /** + * 触发重连 + */ + private async triggerReconnect(reason: string): Promise { + if (!this.shouldReconnect || this.isReconnecting || !this.currentAddress) { + return; + } + + console.log(`[gRPC] Triggering reconnect: ${reason}`); + this.isReconnecting = true; + this.connected = false; + this.emit('disconnected', reason); + + // 清理现有连接 + this.cleanupConnection(); + + // 计算延迟时间(指数退避) + const delay = Math.min( + this.reconnectConfig.initialDelayMs * Math.pow(this.reconnectConfig.backoffMultiplier, this.reconnectAttempts), + this.reconnectConfig.maxDelayMs + ); + + console.log(`[gRPC] Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts + 1}/${this.reconnectConfig.maxRetries})`); + + this.reconnectTimeout = setTimeout(async () => { + this.reconnectAttempts++; + + if (this.reconnectAttempts > this.reconnectConfig.maxRetries) { + console.error('[gRPC] Max reconnect attempts reached'); + this.isReconnecting = false; + this.emit('reconnectFailed', 'Max retries exceeded'); + return; + } + + try { + await this.doConnect(this.currentAddress!, this.currentUseTLS); + + // 重新注册 + if (this.partyId && this.partyRole) { + console.log(`[gRPC] Re-registering as party: ${this.partyId}`); + await this.registerParty(this.partyId, this.partyRole); + } + + // 重新订阅事件流 + if (this.eventStreamSubscribed && this.partyId) { + console.log('[gRPC] Re-subscribing to session events'); + this.subscribeSessionEvents(this.partyId); + } + + // 重新订阅消息流 + if (this.activeMessageSubscription) { + console.log(`[gRPC] Re-subscribing to messages for session: ${this.activeMessageSubscription.sessionId}`); + this.subscribeMessages(this.activeMessageSubscription.sessionId, this.activeMessageSubscription.partyId); + } + + this.isReconnecting = false; + this.emit('reconnected'); + } catch (err) { + console.error(`[gRPC] Reconnect attempt ${this.reconnectAttempts} failed:`, (err as Error).message); + this.isReconnecting = false; + // 继续尝试重连 + this.triggerReconnect('Previous reconnect attempt failed'); + } + }, delay); } /** @@ -236,6 +378,7 @@ export class GrpcClient extends EventEmitter { reject(new Error('Registration failed')); } else { this.partyId = partyId; + this.partyRole = role; this.startHeartbeat(); resolve(); } @@ -245,13 +388,15 @@ export class GrpcClient extends EventEmitter { } /** - * 开始心跳 + * 开始心跳(带重连逻辑) */ private startHeartbeat(): void { if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); } + this.heartbeatFailCount = 0; + this.heartbeatInterval = setInterval(() => { if (this.client && this.partyId) { (this.client as grpc.Client & { heartbeat: (req: unknown, callback: (err: Error | null) => void) => void }) @@ -259,8 +404,17 @@ export class GrpcClient extends EventEmitter { { party_id: this.partyId }, (err: Error | null) => { if (err) { - console.error('Heartbeat failed:', err.message); + this.heartbeatFailCount++; + console.error(`[gRPC] Heartbeat failed (${this.heartbeatFailCount}/${this.MAX_HEARTBEAT_FAILS}):`, err.message); this.emit('connectionError', err); + + // 连续失败多次后触发重连 + if (this.heartbeatFailCount >= this.MAX_HEARTBEAT_FAILS) { + this.triggerReconnect('Heartbeat failed'); + } + } else { + // 心跳成功,重置失败计数 + this.heartbeatFailCount = 0; } } ); @@ -296,13 +450,25 @@ export class GrpcClient extends EventEmitter { } /** - * 订阅会话事件 + * 订阅会话事件(带自动重连) */ subscribeSessionEvents(partyId: string): void { if (!this.client) { throw new Error('Not connected'); } + // 标记已订阅(用于重连后恢复) + this.eventStreamSubscribed = true; + + // 取消现有流 + if (this.eventStream) { + try { + this.eventStream.cancel(); + } catch (e) { + // 忽略 + } + } + this.eventStream = (this.client as grpc.Client & { subscribeSessionEvents: (req: unknown) => grpc.ClientReadableStream }) .subscribeSessionEvents({ party_id: partyId }); @@ -311,24 +477,61 @@ export class GrpcClient extends EventEmitter { }); this.eventStream.on('error', (err: Error) => { - console.error('Session event stream error:', err.message); + console.error('[gRPC] Session event stream error:', err.message); this.emit('streamError', err); + + // 非主动取消的错误触发重连 + if (!err.message.includes('CANCELLED') && this.shouldReconnect) { + this.triggerReconnect('Event stream error'); + } }); this.eventStream.on('end', () => { - console.log('Session event stream ended'); + console.log('[gRPC] Session event stream ended'); this.emit('streamEnd'); + + // 流结束也触发重连 + if (this.shouldReconnect && this.eventStreamSubscribed) { + this.triggerReconnect('Event stream ended'); + } }); } /** - * 订阅 MPC 消息 + * 取消订阅会话事件 + */ + unsubscribeSessionEvents(): void { + this.eventStreamSubscribed = false; + if (this.eventStream) { + try { + this.eventStream.cancel(); + } catch (e) { + // 忽略 + } + this.eventStream = null; + } + } + + /** + * 订阅 MPC 消息(带自动重连) */ subscribeMessages(sessionId: string, partyId: string): void { if (!this.client) { throw new Error('Not connected'); } + // 保存订阅状态(用于重连后恢复) + this.activeMessageSubscription = { sessionId, partyId }; + + // 取消现有流 + if (this.messageStream) { + try { + this.messageStream.cancel(); + } catch (e) { + // 忽略 + } + } + this.messageStream = (this.client as grpc.Client & { subscribeMessages: (req: unknown) => grpc.ClientReadableStream }) .subscribeMessages({ session_id: sessionId, @@ -340,16 +543,41 @@ export class GrpcClient extends EventEmitter { }); this.messageStream.on('error', (err: Error) => { - console.error('Message stream error:', err.message); + console.error('[gRPC] Message stream error:', err.message); this.emit('messageStreamError', err); + + // 非主动取消的错误触发重连 + if (!err.message.includes('CANCELLED') && this.shouldReconnect && this.activeMessageSubscription) { + this.triggerReconnect('Message stream error'); + } }); this.messageStream.on('end', () => { - console.log('Message stream ended'); + console.log('[gRPC] Message stream ended'); this.emit('messageStreamEnd'); + + // 流结束也触发重连 + if (this.shouldReconnect && this.activeMessageSubscription) { + this.triggerReconnect('Message stream ended'); + } }); } + /** + * 取消订阅 MPC 消息 + */ + unsubscribeMessages(): void { + this.activeMessageSubscription = null; + if (this.messageStream) { + try { + this.messageStream.cancel(); + } catch (e) { + // 忽略 + } + this.messageStream = null; + } + } + /** * 发送 MPC 消息 */ diff --git a/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts b/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts index d670c2c9..32f02a8c 100644 --- a/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts +++ b/backend/mpc-system/services/service-party-app/electron/modules/tss-handler.ts @@ -3,6 +3,7 @@ import * as path from 'path'; import * as fs from 'fs'; import { EventEmitter } from 'events'; import { GrpcClient } from './grpc-client'; +import { DatabaseManager } from './database'; /** * TSS 协议处理结果 @@ -54,6 +55,7 @@ interface ParticipantInfo { export class TSSHandler extends EventEmitter { private tssProcess: ChildProcess | null = null; private grpcClient: GrpcClient; + private database: DatabaseManager | null = null; private sessionId: string | null = null; private partyId: string | null = null; private partyIndex: number = -1; @@ -61,9 +63,26 @@ export class TSSHandler extends EventEmitter { private partyIndexMap: Map = new Map(); private isRunning = false; - constructor(grpcClient: GrpcClient) { + // 消息缓冲:在 TSS 进程启动前缓冲收到的消息 + private messageBuffer: Array<{ + messageId: string; + fromParty: string; + isBroadcast: boolean; + payload: Buffer; + }> = []; + private isProcessReady = false; + + constructor(grpcClient: GrpcClient, database?: DatabaseManager) { super(); this.grpcClient = grpcClient; + this.database = database || null; + } + + /** + * 设置数据库管理器(用于消息去重) + */ + setDatabase(database: DatabaseManager): void { + this.database = database; } /** @@ -114,6 +133,8 @@ export class TSSHandler extends EventEmitter { this.partyIndex = partyIndex; this.participants = participants; this.isRunning = true; + this.isProcessReady = false; + this.messageBuffer = []; // 清空消息缓冲 // 构建 party index map this.partyIndexMap.clear(); @@ -142,10 +163,21 @@ export class TSSHandler extends EventEmitter { let resultData = ''; + // 先订阅消息(可能在进程就绪前就收到消息,会被缓冲) + this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this)); + this.grpcClient.subscribeMessages(sessionId, partyId); + // 处理标准输出 (JSON 消息) this.tssProcess.stdout?.on('data', (data: Buffer) => { const lines = data.toString().split('\n').filter(line => line.trim()); + // 收到第一条输出时,标记进程就绪并发送缓冲的消息 + if (!this.isProcessReady && this.tssProcess?.stdin) { + this.isProcessReady = true; + console.log(`[TSS] Process ready, flushing ${this.messageBuffer.length} buffered messages`); + this.flushMessageBuffer(); + } + for (const line of lines) { try { const message: TSSMessage = JSON.parse(line); @@ -168,13 +200,22 @@ export class TSSHandler extends EventEmitter { // 处理进程退出 this.tssProcess.on('close', (code) => { + const completedSessionId = this.sessionId; this.isRunning = false; + this.isProcessReady = false; + this.messageBuffer = []; this.tssProcess = null; + // 清理消息监听器,防止下次 keygen 时重复注册 + this.grpcClient.removeAllListeners('mpcMessage'); if (code === 0 && resultData) { try { const result: TSSMessage = JSON.parse(resultData); if (result.publicKey && result.encryptedShare) { + // 成功完成后清理该会话的已处理消息记录 + if (this.database && completedSessionId) { + this.database.clearProcessedMessages(completedSessionId); + } resolve({ success: true, publicKey: Buffer.from(result.publicKey, 'base64'), @@ -195,14 +236,14 @@ export class TSSHandler extends EventEmitter { // 处理进程错误 this.tssProcess.on('error', (err) => { this.isRunning = false; + this.isProcessReady = false; + this.messageBuffer = []; this.tssProcess = null; + // 清理消息监听器 + this.grpcClient.removeAllListeners('mpcMessage'); reject(err); }); - // 订阅 MPC 消息并转发给 TSS 进程 - this.grpcClient.on('mpcMessage', this.handleIncomingMessage.bind(this)); - this.grpcClient.subscribeMessages(sessionId, partyId); - } catch (err) { this.isRunning = false; reject(err); @@ -254,14 +295,44 @@ export class TSSHandler extends EventEmitter { } /** - * 处理从 gRPC 接收的 MPC 消息 + * 处理从 gRPC 接收的 MPC 消息(带去重) */ private handleIncomingMessage(message: { + messageId: string; fromParty: string; isBroadcast: boolean; payload: Buffer; }): void { - if (!this.tssProcess || !this.tssProcess.stdin) { + // 消息去重检查 + if (this.database && message.messageId) { + if (this.database.isMessageProcessed(message.messageId)) { + console.log(`[TSS] Skipping duplicate message: ${message.messageId.substring(0, 8)}...`); + return; + } + } + + // 如果进程未就绪,缓冲消息 + if (!this.isProcessReady || !this.tssProcess || !this.tssProcess.stdin) { + if (this.isRunning) { + console.log(`[TSS] Buffering message from ${message.fromParty.substring(0, 8)}... (process not ready)`); + this.messageBuffer.push(message); + } + return; + } + + this.sendMessageToProcess(message); + } + + /** + * 发送消息给 TSS 进程(并标记为已处理) + */ + private sendMessageToProcess(message: { + messageId: string; + fromParty: string; + isBroadcast: boolean; + payload: Buffer; + }): void { + if (!this.tssProcess?.stdin) { return; } @@ -280,6 +351,26 @@ export class TSSHandler extends EventEmitter { }); this.tssProcess.stdin.write(inputMessage + '\n'); + + // 标记消息为已处理(防止重连后重复处理) + if (this.database && message.messageId && this.sessionId) { + this.database.markMessageProcessed(message.messageId, this.sessionId); + } + } + + /** + * 发送缓冲的消息 + */ + private flushMessageBuffer(): void { + if (this.messageBuffer.length === 0) { + return; + } + + console.log(`[TSS] Flushing ${this.messageBuffer.length} buffered messages`); + for (const msg of this.messageBuffer) { + this.sendMessageToProcess(msg); + } + this.messageBuffer = []; } /** @@ -291,6 +382,8 @@ export class TSSHandler extends EventEmitter { this.tssProcess = null; } this.isRunning = false; + this.isProcessReady = false; + this.messageBuffer = []; this.grpcClient.removeAllListeners('mpcMessage'); }