diff --git a/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts b/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts index 81b58ca9..cbf80b79 100644 --- a/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts +++ b/backend/mpc-system/services/service-party-app/electron/modules/grpc-client.ts @@ -473,10 +473,12 @@ export class GrpcClient extends EventEmitter { // 标记已订阅(用于重连后恢复) this.eventStreamSubscribed = true; - // 取消现有流 + // 取消现有流 - 先移除事件监听器再取消,防止旧流的 end 事件触发重连 if (this.eventStream) { + const oldStream = this.eventStream; + oldStream.removeAllListeners(); try { - this.eventStream.cancel(); + oldStream.cancel(); } catch (e) { // 忽略 } @@ -485,6 +487,9 @@ export class GrpcClient extends EventEmitter { this.eventStream = (this.client as grpc.Client & { subscribeSessionEvents: (req: unknown) => grpc.ClientReadableStream }) .subscribeSessionEvents({ party_id: partyId }); + // 保存当前流的引用,用于在事件处理器中检查是否是当前活跃的流 + const currentStream = this.eventStream; + this.eventStream.on('data', (event: SessionEvent) => { this.emit('sessionEvent', event); }); @@ -493,6 +498,12 @@ export class GrpcClient extends EventEmitter { console.error('[gRPC] Session event stream error:', err.message); this.emit('streamError', err); + // 只有当前活跃的流才触发重连,防止旧流的事件干扰 + if (currentStream !== this.eventStream) { + console.log('[gRPC] Ignoring error from old event stream'); + return; + } + // 非主动取消的错误触发重连 if (!err.message.includes('CANCELLED') && this.shouldReconnect) { this.triggerReconnect('Event stream error'); @@ -503,6 +514,12 @@ export class GrpcClient extends EventEmitter { console.log('[gRPC] Session event stream ended'); this.emit('streamEnd'); + // 只有当前活跃的流才触发重连,防止旧流的事件干扰 + if (currentStream !== this.eventStream) { + console.log('[gRPC] Ignoring end from old event stream'); + return; + } + // 流结束也触发重连 if (this.shouldReconnect && this.eventStreamSubscribed) { this.triggerReconnect('Event stream ended'); @@ -536,10 +553,12 @@ export class GrpcClient extends EventEmitter { // 保存订阅状态(用于重连后恢复) this.activeMessageSubscription = { sessionId, partyId }; - // 取消现有流 + // 取消现有流 - 先移除事件监听器再取消,防止旧流的 end 事件触发重连 if (this.messageStream) { + const oldStream = this.messageStream; + oldStream.removeAllListeners(); try { - this.messageStream.cancel(); + oldStream.cancel(); } catch (e) { // 忽略 } @@ -551,6 +570,9 @@ export class GrpcClient extends EventEmitter { party_id: partyId, }); + // 保存当前流的引用,用于在事件处理器中检查是否是当前活跃的流 + const currentStream = this.messageStream; + this.messageStream.on('data', (message: MPCMessage) => { this.emit('mpcMessage', message); }); @@ -559,6 +581,12 @@ export class GrpcClient extends EventEmitter { console.error('[gRPC] Message stream error:', err.message); this.emit('messageStreamError', err); + // 只有当前活跃的流才触发重连,防止旧流的事件干扰 + if (currentStream !== this.messageStream) { + console.log('[gRPC] Ignoring error from old message stream'); + return; + } + // 非主动取消的错误触发重连 if (!err.message.includes('CANCELLED') && this.shouldReconnect && this.activeMessageSubscription) { this.triggerReconnect('Message stream error'); @@ -569,6 +597,12 @@ export class GrpcClient extends EventEmitter { console.log('[gRPC] Message stream ended'); this.emit('messageStreamEnd'); + // 只有当前活跃的流才触发重连,防止旧流的事件干扰 + if (currentStream !== this.messageStream) { + console.log('[gRPC] Ignoring end from old message stream'); + return; + } + // 流结束也触发重连 if (this.shouldReconnect && this.activeMessageSubscription) { this.triggerReconnect('Message stream ended');