diff --git a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go index 76b118b8..165e729a 100644 --- a/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go +++ b/backend/mpc-system/services/message-router/domain/session_event_broadcaster.go @@ -4,6 +4,8 @@ import ( "sync" pb "github.com/rwadurian/mpc-system/api/grpc/router/v1" + "github.com/rwadurian/mpc-system/pkg/logger" + "go.uber.org/zap" ) // SessionEventBroadcaster manages session event subscriptions and broadcasting @@ -69,16 +71,34 @@ func (b *SessionEventBroadcaster) BroadcastToParties(event *pb.SessionEvent, par b.mu.RLock() defer b.mu.RUnlock() + sentCount := 0 + missedParties := []string{} + for _, partyID := range partyIDs { if ch, exists := b.subscribers[partyID]; exists { // Non-blocking send select { case ch <- event: + sentCount++ default: // Channel full, skip this subscriber + missedParties = append(missedParties, partyID+" (channel full)") } + } else { + // Party not subscribed - this is a problem for session_started events! + missedParties = append(missedParties, partyID+" (not subscribed)") } } + + // Log if any parties were missed (helps debug event delivery issues) + if len(missedParties) > 0 { + logger.Warn("Some parties missed session event broadcast", + zap.String("event_type", event.EventType), + zap.String("session_id", event.SessionId), + zap.Int("sent_count", sentCount), + zap.Int("missed_count", len(missedParties)), + zap.Strings("missed_parties", missedParties)) + } } // SubscriberCount returns the number of active subscribers diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt index daa75d21..bb2427c5 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/remote/GrpcClient.kt @@ -854,9 +854,19 @@ class GrpcClient @Inject constructor() { } } - asyncStub?.subscribeSessionEvents(request, observer) + val currentAsyncStub = asyncStub + if (currentAsyncStub == null) { + Log.e(TAG, "subscribeSessionEvents: asyncStub is null! Cannot subscribe.") + close(Exception("gRPC not connected - asyncStub is null")) + return@callbackFlow + } + + Log.d(TAG, "subscribeSessionEvents: Starting subscription for partyId=$partyId") + currentAsyncStub.subscribeSessionEvents(request, observer) + Log.d(TAG, "subscribeSessionEvents: Subscription request sent") awaitClose { + Log.d(TAG, "subscribeSessionEvents: Flow closed for partyId=$partyId") eventStreamSubscribed.set(false) eventStreamPartyId = null } diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index 2ebf2e17..79b2e0ec 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -50,6 +50,10 @@ class TssRepository @Inject constructor( // This allows session_started events to be matched even if _currentSession is not yet set private var pendingSessionId: String? = null + // Fallback polling job for session status (handles gRPC stream disconnection on Android) + // Android gRPC streams can disconnect when app goes to background, so we poll as backup + private var sessionStatusPollingJob: Job? = null + /** * Get the current party ID */