From be94a6ab1850358e560804e7b72d930c282e3031 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 29 Dec 2025 01:25:30 -0800 Subject: [PATCH] =?UTF-8?q?fix(server-party):=20session=20=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E8=AE=A2=E9=98=85=E6=96=AD=E5=BC=80=E5=90=8E=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Message Router 重启后,server-party 的 gRPC stream 会断开, 之前的实现会直接退出 goroutine 导致无法收到新的 session 事件。 修改内容: - 添加自动重连逻辑,stream 断开时会尝试重新订阅 - 使用指数退避策略,从 1 秒到最大 30 秒 - 重连成功后重置退避时间 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../output/grpc/message_router_client.go | 71 ++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go b/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go index 116123a2..1d7a724d 100644 --- a/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go +++ b/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go @@ -385,7 +385,7 @@ func (c *MessageRouterClient) UpdateNotificationChannels( return c.RegisterPartyWithNotification(ctx, partyID, partyRole, version, notification) } -// SubscribeSessionEvents subscribes to session lifecycle events +// SubscribeSessionEvents subscribes to session lifecycle events with auto-reconnect func (c *MessageRouterClient) SubscribeSessionEvents( ctx context.Context, partyID string, @@ -396,7 +396,7 @@ func (c *MessageRouterClient) SubscribeSessionEvents( EventTypes: []string{}, // Subscribe to all event types } - // Create a streaming connection + // Create initial streaming connection stream, err := c.createSessionEventStream(ctx, req) if err != nil { logger.Error("Failed to subscribe to session events", @@ -408,8 +408,12 @@ func (c *MessageRouterClient) SubscribeSessionEvents( logger.Info("Subscribed to session events", zap.String("party_id", partyID)) - // Start goroutine to receive events + // Start goroutine to receive events with auto-reconnect go func() { + currentStream := stream + reconnectBackoff := time.Second // Start with 1 second backoff + maxBackoff := 30 * time.Second + for { select { case <-ctx.Done(): @@ -418,27 +422,60 @@ func (c *MessageRouterClient) SubscribeSessionEvents( return default: event := &router.SessionEvent{} - err := stream.RecvMsg(event) + err := currentStream.RecvMsg(event) if err == io.EOF { - logger.Info("Session event stream ended", + logger.Warn("Session event stream ended, reconnecting...", zap.String("party_id", partyID)) - return - } - if err != nil { - logger.Error("Error receiving session event", + } else if err != nil { + logger.Warn("Error receiving session event, reconnecting...", zap.Error(err), zap.String("party_id", partyID)) - return + } else { + // Successfully received event, reset backoff + reconnectBackoff = time.Second + + logger.Info("Received session event", + zap.String("event_type", event.EventType), + zap.String("session_id", event.SessionId), + zap.String("party_id", partyID)) + + // Call event handler + if eventHandler != nil { + eventHandler(event) + } + continue } - logger.Info("Received session event", - zap.String("event_type", event.EventType), - zap.String("session_id", event.SessionId), - zap.String("party_id", partyID)) + // Reconnect with exponential backoff + for { + select { + case <-ctx.Done(): + return + case <-time.After(reconnectBackoff): + logger.Info("Attempting to reconnect session event stream", + zap.String("party_id", partyID), + zap.Duration("backoff", reconnectBackoff)) - // Call event handler - if eventHandler != nil { - eventHandler(event) + newStream, err := c.createSessionEventStream(ctx, req) + if err != nil { + logger.Error("Failed to reconnect session event stream", + zap.Error(err), + zap.String("party_id", partyID)) + // Increase backoff for next attempt + reconnectBackoff = reconnectBackoff * 2 + if reconnectBackoff > maxBackoff { + reconnectBackoff = maxBackoff + } + continue + } + + logger.Info("Successfully reconnected to session events", + zap.String("party_id", partyID)) + currentStream = newStream + reconnectBackoff = time.Second // Reset backoff on success + break + } + break } } }