fix(grpc-client): add connection check and better error handling in subscribeMessages
Additional safeguards to prevent "CANCELLED: Cancelled on client" error: 1. Add `this.connected` check at the start of subscribeMessages() 2. Set messageStream to null after canceling old stream 3. Wrap new stream creation in try-catch to handle creation errors 4. Add logging for ignored cancel errors These changes ensure that: - subscribeMessages won't proceed if connection is lost - Old stream is fully cleaned up before creating new one - Errors during stream creation are properly caught and logged 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
66a718ea72
commit
ff995a827b
|
|
@ -550,6 +550,10 @@ export class GrpcClient extends EventEmitter {
|
|||
throw new Error('Not connected');
|
||||
}
|
||||
|
||||
if (!this.connected) {
|
||||
throw new Error('gRPC client not connected');
|
||||
}
|
||||
|
||||
// 保存订阅状态(用于重连后恢复)
|
||||
this.activeMessageSubscription = { sessionId, partyId };
|
||||
|
||||
|
|
@ -560,15 +564,22 @@ export class GrpcClient extends EventEmitter {
|
|||
try {
|
||||
oldStream.cancel();
|
||||
} catch (e) {
|
||||
// 忽略
|
||||
console.log('[gRPC] Ignored error while canceling old message stream:', (e as Error).message);
|
||||
}
|
||||
this.messageStream = null;
|
||||
}
|
||||
|
||||
this.messageStream = (this.client as grpc.Client & { subscribeMessages: (req: unknown) => grpc.ClientReadableStream<MPCMessage> })
|
||||
.subscribeMessages({
|
||||
session_id: sessionId,
|
||||
party_id: partyId,
|
||||
});
|
||||
try {
|
||||
this.messageStream = (this.client as grpc.Client & { subscribeMessages: (req: unknown) => grpc.ClientReadableStream<MPCMessage> })
|
||||
.subscribeMessages({
|
||||
session_id: sessionId,
|
||||
party_id: partyId,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error('[gRPC] Failed to create message stream:', (e as Error).message);
|
||||
this.activeMessageSubscription = null;
|
||||
throw e;
|
||||
}
|
||||
|
||||
// 保存当前流的引用,用于在事件处理器中检查是否是当前活跃的流
|
||||
const currentStream = this.messageStream;
|
||||
|
|
|
|||
Loading…
Reference in New Issue