diff --git a/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go b/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go index 116123a2..1d7a724d 100644 --- a/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go +++ b/backend/mpc-system/services/server-party/adapters/output/grpc/message_router_client.go @@ -385,7 +385,7 @@ func (c *MessageRouterClient) UpdateNotificationChannels( return c.RegisterPartyWithNotification(ctx, partyID, partyRole, version, notification) } -// SubscribeSessionEvents subscribes to session lifecycle events +// SubscribeSessionEvents subscribes to session lifecycle events with auto-reconnect func (c *MessageRouterClient) SubscribeSessionEvents( ctx context.Context, partyID string, @@ -396,7 +396,7 @@ func (c *MessageRouterClient) SubscribeSessionEvents( EventTypes: []string{}, // Subscribe to all event types } - // Create a streaming connection + // Create initial streaming connection stream, err := c.createSessionEventStream(ctx, req) if err != nil { logger.Error("Failed to subscribe to session events", @@ -408,8 +408,12 @@ func (c *MessageRouterClient) SubscribeSessionEvents( logger.Info("Subscribed to session events", zap.String("party_id", partyID)) - // Start goroutine to receive events + // Start goroutine to receive events with auto-reconnect go func() { + currentStream := stream + reconnectBackoff := time.Second // Start with 1 second backoff + maxBackoff := 30 * time.Second + for { select { case <-ctx.Done(): @@ -418,27 +422,60 @@ func (c *MessageRouterClient) SubscribeSessionEvents( return default: event := &router.SessionEvent{} - err := stream.RecvMsg(event) + err := currentStream.RecvMsg(event) if err == io.EOF { - logger.Info("Session event stream ended", + logger.Warn("Session event stream ended, reconnecting...", zap.String("party_id", partyID)) - return - } - if err != nil { - logger.Error("Error receiving session event", + } else if err != nil { + logger.Warn("Error receiving session event, reconnecting...", zap.Error(err), zap.String("party_id", partyID)) - return + } else { + // Successfully received event, reset backoff + reconnectBackoff = time.Second + + logger.Info("Received session event", + zap.String("event_type", event.EventType), + zap.String("session_id", event.SessionId), + zap.String("party_id", partyID)) + + // Call event handler + if eventHandler != nil { + eventHandler(event) + } + continue } - logger.Info("Received session event", - zap.String("event_type", event.EventType), - zap.String("session_id", event.SessionId), - zap.String("party_id", partyID)) + // Reconnect with exponential backoff + for { + select { + case <-ctx.Done(): + return + case <-time.After(reconnectBackoff): + logger.Info("Attempting to reconnect session event stream", + zap.String("party_id", partyID), + zap.Duration("backoff", reconnectBackoff)) - // Call event handler - if eventHandler != nil { - eventHandler(event) + newStream, err := c.createSessionEventStream(ctx, req) + if err != nil { + logger.Error("Failed to reconnect session event stream", + zap.Error(err), + zap.String("party_id", partyID)) + // Increase backoff for next attempt + reconnectBackoff = reconnectBackoff * 2 + if reconnectBackoff > maxBackoff { + reconnectBackoff = maxBackoff + } + continue + } + + logger.Info("Successfully reconnected to session events", + zap.String("party_id", partyID)) + currentStream = newStream + reconnectBackoff = time.Second // Reset backoff on success + break + } + break } } }