fix(grpc): prevent stream race condition from triggering reconnection
When switching message/event streams, the old stream's 'end' or 'error' events could fire after the new stream was created. Since activeMessageSubscription was already updated to the new session, the old stream's events would incorrectly trigger reconnection, causing TSS message routing to fail. Fix: - Remove event listeners from old stream before canceling - Use closure to capture current stream reference - Check if event is from current active stream before triggering reconnect This fixes the "Not connected" error during co-sign TSS message routing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
1708a03aaf
commit
b231667aba
|
|
@ -473,10 +473,12 @@ export class GrpcClient extends EventEmitter {
|
|||
// 标记已订阅(用于重连后恢复)
|
||||
this.eventStreamSubscribed = true;
|
||||
|
||||
// 取消现有流
|
||||
// 取消现有流 - 先移除事件监听器再取消,防止旧流的 end 事件触发重连
|
||||
if (this.eventStream) {
|
||||
const oldStream = this.eventStream;
|
||||
oldStream.removeAllListeners();
|
||||
try {
|
||||
this.eventStream.cancel();
|
||||
oldStream.cancel();
|
||||
} catch (e) {
|
||||
// 忽略
|
||||
}
|
||||
|
|
@ -485,6 +487,9 @@ export class GrpcClient extends EventEmitter {
|
|||
this.eventStream = (this.client as grpc.Client & { subscribeSessionEvents: (req: unknown) => grpc.ClientReadableStream<SessionEvent> })
|
||||
.subscribeSessionEvents({ party_id: partyId });
|
||||
|
||||
// 保存当前流的引用,用于在事件处理器中检查是否是当前活跃的流
|
||||
const currentStream = this.eventStream;
|
||||
|
||||
this.eventStream.on('data', (event: SessionEvent) => {
|
||||
this.emit('sessionEvent', event);
|
||||
});
|
||||
|
|
@ -493,6 +498,12 @@ export class GrpcClient extends EventEmitter {
|
|||
console.error('[gRPC] Session event stream error:', err.message);
|
||||
this.emit('streamError', err);
|
||||
|
||||
// 只有当前活跃的流才触发重连,防止旧流的事件干扰
|
||||
if (currentStream !== this.eventStream) {
|
||||
console.log('[gRPC] Ignoring error from old event stream');
|
||||
return;
|
||||
}
|
||||
|
||||
// 非主动取消的错误触发重连
|
||||
if (!err.message.includes('CANCELLED') && this.shouldReconnect) {
|
||||
this.triggerReconnect('Event stream error');
|
||||
|
|
@ -503,6 +514,12 @@ export class GrpcClient extends EventEmitter {
|
|||
console.log('[gRPC] Session event stream ended');
|
||||
this.emit('streamEnd');
|
||||
|
||||
// 只有当前活跃的流才触发重连,防止旧流的事件干扰
|
||||
if (currentStream !== this.eventStream) {
|
||||
console.log('[gRPC] Ignoring end from old event stream');
|
||||
return;
|
||||
}
|
||||
|
||||
// 流结束也触发重连
|
||||
if (this.shouldReconnect && this.eventStreamSubscribed) {
|
||||
this.triggerReconnect('Event stream ended');
|
||||
|
|
@ -536,10 +553,12 @@ export class GrpcClient extends EventEmitter {
|
|||
// 保存订阅状态(用于重连后恢复)
|
||||
this.activeMessageSubscription = { sessionId, partyId };
|
||||
|
||||
// 取消现有流
|
||||
// 取消现有流 - 先移除事件监听器再取消,防止旧流的 end 事件触发重连
|
||||
if (this.messageStream) {
|
||||
const oldStream = this.messageStream;
|
||||
oldStream.removeAllListeners();
|
||||
try {
|
||||
this.messageStream.cancel();
|
||||
oldStream.cancel();
|
||||
} catch (e) {
|
||||
// 忽略
|
||||
}
|
||||
|
|
@ -551,6 +570,9 @@ export class GrpcClient extends EventEmitter {
|
|||
party_id: partyId,
|
||||
});
|
||||
|
||||
// 保存当前流的引用,用于在事件处理器中检查是否是当前活跃的流
|
||||
const currentStream = this.messageStream;
|
||||
|
||||
this.messageStream.on('data', (message: MPCMessage) => {
|
||||
this.emit('mpcMessage', message);
|
||||
});
|
||||
|
|
@ -559,6 +581,12 @@ export class GrpcClient extends EventEmitter {
|
|||
console.error('[gRPC] Message stream error:', err.message);
|
||||
this.emit('messageStreamError', err);
|
||||
|
||||
// 只有当前活跃的流才触发重连,防止旧流的事件干扰
|
||||
if (currentStream !== this.messageStream) {
|
||||
console.log('[gRPC] Ignoring error from old message stream');
|
||||
return;
|
||||
}
|
||||
|
||||
// 非主动取消的错误触发重连
|
||||
if (!err.message.includes('CANCELLED') && this.shouldReconnect && this.activeMessageSubscription) {
|
||||
this.triggerReconnect('Message stream error');
|
||||
|
|
@ -569,6 +597,12 @@ export class GrpcClient extends EventEmitter {
|
|||
console.log('[gRPC] Message stream ended');
|
||||
this.emit('messageStreamEnd');
|
||||
|
||||
// 只有当前活跃的流才触发重连,防止旧流的事件干扰
|
||||
if (currentStream !== this.messageStream) {
|
||||
console.log('[gRPC] Ignoring end from old message stream');
|
||||
return;
|
||||
}
|
||||
|
||||
// 流结束也触发重连
|
||||
if (this.shouldReconnect && this.activeMessageSubscription) {
|
||||
this.triggerReconnect('Message stream ended');
|
||||
|
|
|
|||
Loading…
Reference in New Issue