From 549b21f298ec5fcec919f5cad75b4e542a83024d Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 1 Jan 2026 10:04:11 -0800 Subject: [PATCH] fix(message-router): prevent subscription race condition on gRPC reconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a party re-subscribes (e.g., Android reconnects), the old gRPC stream's deferred Unsubscribe() was accidentally removing the NEW subscription from the subscribers map, causing the party to miss session_started events. Fix: - Subscribe() now returns a subscription ID alongside the channel it already returned - Unsubscribe() now takes the caller's channel and only removes the map entry if it matches the current subscription - This prevents older streams from removing newer subscriptions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../input/grpc/message_grpc_handler.go | 6 ++- .../domain/session_event_broadcaster.go | 38 ++++++++++++++----- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go b/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go index a1d0294b..f06c5b36 100644 --- a/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go +++ b/backend/mpc-system/services/message-router/adapters/input/grpc/message_grpc_handler.go @@ -350,8 +350,10 @@ func (s *MessageRouterServer) SubscribeSessionEvents( zap.String("party_id", req.PartyId)) // Subscribe to events - eventCh := s.eventBroadcaster.Subscribe(req.PartyId) - defer s.eventBroadcaster.Unsubscribe(req.PartyId) + // The channel is used for identity check in Unsubscribe to prevent + // accidentally removing a newer subscription when this stream exits + eventCh, _ := s.eventBroadcaster.Subscribe(req.PartyId) + defer s.eventBroadcaster.Unsubscribe(req.PartyId, eventCh) // Stream events for { diff --git a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go 
b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go index 165e729a..7ffc2e58 100644 --- a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go +++ b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go @@ -2,6 +2,7 @@ package domain import ( "sync" + "time" pb "github.com/rwadurian/mpc-system/api/grpc/router/v1" "github.com/rwadurian/mpc-system/pkg/logger" @@ -22,32 +23,51 @@ func NewSessionEventBroadcaster() *SessionEventBroadcaster { } // Subscribe subscribes a party to session events -// If the party already has an active subscription, the old channel is closed first -// to prevent memory leaks and ensure clean reconnection -func (b *SessionEventBroadcaster) Subscribe(partyID string) <-chan *pb.SessionEvent { +// Returns the channel for receiving events and a timestamp-based subscription ID +// Unsubscribe matches on the returned channel, not on the ID; the ID is not stored by the broadcaster +func (b *SessionEventBroadcaster) Subscribe(partyID string) (<-chan *pb.SessionEvent, int64) { b.mu.Lock() defer b.mu.Unlock() // Close existing channel if party is re-subscribing (e.g., after reconnect) + // This will cause the old gRPC stream to exit cleanly if oldCh, exists := b.subscribers[partyID]; exists { close(oldCh) + logger.Debug("Closed old subscription channel for re-subscribing party", + zap.String("party_id", partyID)) } // Create buffered channel for this subscriber ch := make(chan *pb.SessionEvent, 100) b.subscribers[partyID] = ch - return ch + // Generate a subscription ID from the current time in nanoseconds (informational; uniqueness is not enforced) + subscriptionID := time.Now().UnixNano() + + return ch, subscriptionID } -// Unsubscribe removes a party's subscription -func (b *SessionEventBroadcaster) Unsubscribe(partyID string) { +// Unsubscribe removes a party's subscription only if the channel matches +// This prevents a race condition where a newer subscription is accidentally removed +// when an old gRPC stream exits after 
the party has already re-subscribed +func (b *SessionEventBroadcaster) Unsubscribe(partyID string, ch <-chan *pb.SessionEvent) { b.mu.Lock() defer b.mu.Unlock() - if ch, exists := b.subscribers[partyID]; exists { - close(ch) - delete(b.subscribers, partyID) + if currentCh, exists := b.subscribers[partyID]; exists { + // Only delete if the channel matches (i.e., this is still our subscription) + // If the channel doesn't match, a newer subscription has been created + // and we should not delete it + if currentCh == ch { + // Don't close the channel here - in this branch it is still the current subscription, + // so Subscribe has not closed it; only the map entry is removed + delete(b.subscribers, partyID) + logger.Debug("Unsubscribed party from session events", + zap.String("party_id", partyID)) + } else { + logger.Debug("Skipping unsubscribe - channel mismatch (newer subscription exists)", + zap.String("party_id", partyID)) + } } }