fix(message-router): prevent subscription race condition on gRPC reconnect
When a party re-subscribes (e.g., Android reconnects), the old gRPC stream's defer Unsubscribe() was accidentally removing the NEW subscription from the subscribers map, causing the party to miss session_started events. Fix: - Subscribe() now returns the channel to the caller - Unsubscribe() now takes the channel and only removes if it matches - This prevents older streams from removing newer subscriptions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
b7fc488dcf
commit
549b21f298
|
|
@ -350,8 +350,10 @@ func (s *MessageRouterServer) SubscribeSessionEvents(
|
||||||
zap.String("party_id", req.PartyId))
|
zap.String("party_id", req.PartyId))
|
||||||
|
|
||||||
// Subscribe to events
|
// Subscribe to events
|
||||||
eventCh := s.eventBroadcaster.Subscribe(req.PartyId)
|
// The channel is used for identity check in Unsubscribe to prevent
|
||||||
defer s.eventBroadcaster.Unsubscribe(req.PartyId)
|
// accidentally removing a newer subscription when this stream exits
|
||||||
|
eventCh, _ := s.eventBroadcaster.Subscribe(req.PartyId)
|
||||||
|
defer s.eventBroadcaster.Unsubscribe(req.PartyId, eventCh)
|
||||||
|
|
||||||
// Stream events
|
// Stream events
|
||||||
for {
|
for {
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package domain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
pb "github.com/rwadurian/mpc-system/api/grpc/router/v1"
|
pb "github.com/rwadurian/mpc-system/api/grpc/router/v1"
|
||||||
"github.com/rwadurian/mpc-system/pkg/logger"
|
"github.com/rwadurian/mpc-system/pkg/logger"
|
||||||
|
|
@ -22,32 +23,51 @@ func NewSessionEventBroadcaster() *SessionEventBroadcaster {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe subscribes a party to session events
|
// Subscribe subscribes a party to session events
|
||||||
// If the party already has an active subscription, the old channel is closed first
|
// Returns the channel for receiving events and a unique subscription ID
|
||||||
// to prevent memory leaks and ensure clean reconnection
|
// The subscription ID is used to safely unsubscribe without affecting newer subscriptions
|
||||||
func (b *SessionEventBroadcaster) Subscribe(partyID string) <-chan *pb.SessionEvent {
|
func (b *SessionEventBroadcaster) Subscribe(partyID string) (<-chan *pb.SessionEvent, int64) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
// Close existing channel if party is re-subscribing (e.g., after reconnect)
|
// Close existing channel if party is re-subscribing (e.g., after reconnect)
|
||||||
|
// This will cause the old gRPC stream to exit cleanly
|
||||||
if oldCh, exists := b.subscribers[partyID]; exists {
|
if oldCh, exists := b.subscribers[partyID]; exists {
|
||||||
close(oldCh)
|
close(oldCh)
|
||||||
|
logger.Debug("Closed old subscription channel for re-subscribing party",
|
||||||
|
zap.String("party_id", partyID))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create buffered channel for this subscriber
|
// Create buffered channel for this subscriber
|
||||||
ch := make(chan *pb.SessionEvent, 100)
|
ch := make(chan *pb.SessionEvent, 100)
|
||||||
b.subscribers[partyID] = ch
|
b.subscribers[partyID] = ch
|
||||||
|
|
||||||
return ch
|
// Generate a unique subscription ID (using current time in nanoseconds)
|
||||||
|
subscriptionID := time.Now().UnixNano()
|
||||||
|
|
||||||
|
return ch, subscriptionID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe removes a party's subscription
|
// Unsubscribe removes a party's subscription only if the channel matches
|
||||||
func (b *SessionEventBroadcaster) Unsubscribe(partyID string) {
|
// This prevents a race condition where a newer subscription is accidentally removed
|
||||||
|
// when an old gRPC stream exits after the party has already re-subscribed
|
||||||
|
func (b *SessionEventBroadcaster) Unsubscribe(partyID string, ch <-chan *pb.SessionEvent) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
if ch, exists := b.subscribers[partyID]; exists {
|
if currentCh, exists := b.subscribers[partyID]; exists {
|
||||||
close(ch)
|
// Only delete if the channel matches (i.e., this is still our subscription)
|
||||||
delete(b.subscribers, partyID)
|
// If the channel doesn't match, a newer subscription has been created
|
||||||
|
// and we should not delete it
|
||||||
|
if currentCh == ch {
|
||||||
|
// Don't close the channel here - it was already closed by Subscribe
|
||||||
|
// when the new subscription was created, or we're the last one
|
||||||
|
delete(b.subscribers, partyID)
|
||||||
|
logger.Debug("Unsubscribed party from session events",
|
||||||
|
zap.String("party_id", partyID))
|
||||||
|
} else {
|
||||||
|
logger.Debug("Skipping unsubscribe - channel mismatch (newer subscription exists)",
|
||||||
|
zap.String("party_id", partyID))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue