From f305a8cd9720fc2c93e3709251cbd778261d8620 Mon Sep 17 00:00:00 2001 From: hailin Date: Thu, 1 Jan 2026 08:34:47 -0800 Subject: [PATCH] feat(session): broadcast participant_joined event via gRPC for real-time UI updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend changes (session-coordinator): - Add PublishParticipantJoined method to JoinSessionMessageRouterClient interface - Implement PublishParticipantJoined in MessageRouterClient to broadcast events - Call PublishParticipantJoined in join_session.go after participant joins - Add detailed logging for debugging event broadcast Android changes (service-party-android): - Add detailed logging in TssRepository for session event handling - Add detailed logging in MainViewModel for participant_joined processing - Log activeSession state, event matching, and participant updates This enables the initiator's waiting screen to receive real-time updates when participants join the session, matching the expected behavior. πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../tssparty/data/repository/TssRepository.kt | 25 +++++++-- .../presentation/viewmodel/MainViewModel.kt | 16 +++++- .../output/grpc/message_router_client.go | 29 +++++++++++ .../application/use_cases/join_session.go | 51 +++++++++++++++++-- 4 files changed, 113 insertions(+), 8 deletions(-) diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt index e13d0796..47ab8fe7 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/data/repository/TssRepository.kt @@ -148,27 +148,44 @@ class TssRepository @Inject constructor( */ private fun startSessionEventSubscription() { sessionEventJob?.cancel() + android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $partyId") sessionEventJob = CoroutineScope(Dispatchers.IO).launch { grpcClient.subscribeSessionEvents(partyId).collect { event -> - android.util.Log.d("TssRepository", "Session event received: ${event.eventType} for session ${event.sessionId}") + android.util.Log.d("TssRepository", "=== Session event received ===") + android.util.Log.d("TssRepository", " eventType: ${event.eventType}") + android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}") + android.util.Log.d("TssRepository", " selectedParties: ${event.selectedParties}") // Check if this event is for our active session val activeSession = _currentSession.value + android.util.Log.d("TssRepository", " activeSession: ${activeSession?.sessionId ?: "null"}") + if (activeSession != null && event.sessionId == activeSession.sessionId) { + android.util.Log.d("TssRepository", " β†’ Event matches active session!") when (event.eventType) { "session_started" -> { - android.util.Log.d("TssRepository", "Session started event for our session, triggering keygen") + android.util.Log.d("TssRepository", " β†’ Processing session_started event") // Notify callback sessionEventCallback?.invoke(event) } "party_joined", "participant_joined" -> { - android.util.Log.d("TssRepository", "Party joined our session") + android.util.Log.d("TssRepository", " β†’ Processing participant_joined event") sessionEventCallback?.invoke(event) } "all_joined" -> { - android.util.Log.d("TssRepository", "All parties joined our session") + android.util.Log.d("TssRepository", " β†’ Processing all_joined event") sessionEventCallback?.invoke(event) } + else -> { + android.util.Log.d("TssRepository", " β†’ Unknown event type: ${event.eventType}") + } + } + } else { + android.util.Log.d("TssRepository", " β†’ Event does NOT match active session (ignored)") + if (activeSession == null) { + android.util.Log.d("TssRepository", " Reason: activeSession is null") + } else { + android.util.Log.d("TssRepository", " Reason: sessionId mismatch (event: ${event.sessionId}, active: ${activeSession.sessionId})") } } } diff --git a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt index de346562..d8beb014 100644 --- a/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt +++ b/backend/mpc-system/services/service-party-android/app/src/main/java/com/durian/tssparty/presentation/viewmodel/MainViewModel.kt @@ -262,8 +262,14 @@ class MainViewModel @Inject constructor( * - CoSign joiner (ε‚δΈŽη­Ύε) */ private fun setupSessionEventCallback() { + android.util.Log.d("MainViewModel", "Setting up session event callback") repository.setSessionEventCallback { event -> - android.util.Log.d("MainViewModel", "Session event: ${event.eventType} for session ${event.sessionId}") + android.util.Log.d("MainViewModel", "=== MainViewModel received session event ===") + android.util.Log.d("MainViewModel", " eventType: ${event.eventType}") + android.util.Log.d("MainViewModel", " sessionId: ${event.sessionId}") + android.util.Log.d("MainViewModel", " _currentSessionId: ${_currentSessionId.value}") + android.util.Log.d("MainViewModel", " pendingJoinKeygenInfo?.sessionId: ${pendingJoinKeygenInfo?.sessionId}") + android.util.Log.d("MainViewModel", " pendingJoinSignInfo?.sessionId: ${pendingJoinSignInfo?.sessionId}") when (event.eventType) { "session_started" -> { @@ -296,18 +302,25 @@ class MainViewModel @Inject constructor( } } "party_joined", "participant_joined" -> { + android.util.Log.d("MainViewModel", "Processing participant_joined event...") + // Update participant count for initiator's CreateWallet screen val currentSessionId = _currentSessionId.value + android.util.Log.d("MainViewModel", " Checking for initiator: currentSessionId=$currentSessionId, eventSessionId=${event.sessionId}") if (currentSessionId != null && event.sessionId == currentSessionId) { + android.util.Log.d("MainViewModel", " β†’ Matched initiator session! Updating _sessionParticipants") _sessionParticipants.update { current -> val newParticipant = "ε‚δΈŽζ–Ή ${current.size + 1}" + android.util.Log.d("MainViewModel", " β†’ Adding participant: $newParticipant, total now: ${current.size + 1}") current + newParticipant } } // Update participant count for keygen joiner's JoinKeygen screen val joinKeygenInfo = pendingJoinKeygenInfo + android.util.Log.d("MainViewModel", " Checking for joiner: joinKeygenInfo?.sessionId=${joinKeygenInfo?.sessionId}") if (joinKeygenInfo != null && event.sessionId == joinKeygenInfo.sessionId) { + android.util.Log.d("MainViewModel", " β†’ Matched joiner session! Updating _joinKeygenParticipants") _joinKeygenParticipants.update { current -> val newParticipant = "ε‚δΈŽζ–Ή ${current.size + 1}" current + newParticipant @@ -317,6 +330,7 @@ class MainViewModel @Inject constructor( // Update participant count for sign joiner's CoSign screen val joinSignInfo = pendingJoinSignInfo if (joinSignInfo != null && event.sessionId == joinSignInfo.sessionId) { + android.util.Log.d("MainViewModel", " β†’ Matched sign joiner session! Updating _coSignParticipants") _coSignParticipants.update { current -> val newParticipant = "ε‚δΈŽζ–Ή ${current.size + 1}" current + newParticipant diff --git a/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go b/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go index 5fad7e07..28c614bd 100644 --- a/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go +++ b/backend/mpc-system/services/session-coordinator/adapters/output/grpc/message_router_client.go @@ -171,3 +171,32 @@ func (c *MessageRouterClient) PublishSessionStarted( return c.PublishSessionEvent(ctx, event) } + +// PublishParticipantJoined publishes a participant_joined event to all parties in the session +// This notifies the initiator and other participants that a new party has joined +func (c *MessageRouterClient) PublishParticipantJoined( + ctx context.Context, + sessionID string, + partyID string, + selectedParties []string, + joinedAt int64, +) error { + logger.Info("Publishing participant_joined event to Message Router", + zap.String("session_id", sessionID), + zap.String("joined_party_id", partyID), + zap.Strings("notify_parties", selectedParties), + zap.Int64("joined_at", joinedAt)) + + event := &router.SessionEvent{ + EventId: uuid.New().String(), + EventType: "participant_joined", + SessionId: sessionID, + SelectedParties: selectedParties, + CreatedAt: joinedAt, + // Note: We could add a custom field for the joined party ID, but for now + // the event itself indicates someone joined. The initiator can refresh + // their participant list via API if needed. + } + + return c.PublishSessionEvent(ctx, event) +} diff --git a/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go b/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go index e1423953..96ee8420 100644 --- a/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go +++ b/backend/mpc-system/services/session-coordinator/application/use_cases/join_session.go @@ -32,6 +32,16 @@ type JoinSessionMessageRouterClient interface { joinTokens map[string]string, startedAt int64, ) error + + // PublishParticipantJoined broadcasts a participant_joined event to all parties in the session + // This allows the initiator's waiting screen to update in real-time when participants join + PublishParticipantJoined( + ctx context.Context, + sessionID string, + partyID string, + selectedParties []string, + joinedAt int64, + ) error } // JoinSessionUseCase implements the join session use case @@ -271,19 +281,54 @@ func (uc *JoinSessionUseCase) executeWithRetry( return nil, err } - // 9. Publish participant joined event + // 9. Publish participant joined event to internal message broker + joinedAt := time.Now().UnixMilli() event := output.ParticipantJoinedEvent{ SessionID: session.ID.String(), PartyID: inputData.PartyID, - JoinedAt: time.Now().UnixMilli(), + JoinedAt: joinedAt, } if err := uc.eventPublisher.PublishEvent(ctx, output.TopicParticipantJoined, event); err != nil { - logger.Error("failed to publish participant joined event", + logger.Error("failed to publish participant joined event to internal broker", zap.String("session_id", session.ID.String()), zap.String("party_id", inputData.PartyID), zap.Error(err)) } + // 9.1 Publish participant joined event via gRPC to message-router (for real-time UI updates) + // This notifies the initiator and other participants that a new party has joined + if uc.messageRouterClient != nil { + // Get all party IDs in the session to notify them + allPartyIDs := session.GetPartyIDs() + logger.Info("Broadcasting participant_joined event via gRPC", + zap.String("session_id", session.ID.String()), + zap.String("joined_party_id", inputData.PartyID), + zap.Strings("notify_parties", allPartyIDs), + zap.Int("total_participants", len(session.Participants))) + + if err := uc.messageRouterClient.PublishParticipantJoined( + ctx, + session.ID.String(), + inputData.PartyID, + allPartyIDs, + joinedAt, + ); err != nil { + logger.Error("failed to publish participant joined event to message router", + zap.String("session_id", session.ID.String()), + zap.String("party_id", inputData.PartyID), + zap.Error(err)) + } else { + logger.Info("Successfully published participant_joined event to message router", + zap.String("session_id", session.ID.String()), + zap.String("joined_party_id", inputData.PartyID), + zap.Int("notify_count", len(allPartyIDs))) + } + } else { + logger.Warn("messageRouterClient is nil, cannot broadcast participant_joined event", + zap.String("session_id", session.ID.String()), + zap.String("party_id", inputData.PartyID)) + } + // 10. Build response with other parties info otherParties := session.GetOtherParties(partyID) partyInfos := make([]input.PartyInfo, len(otherParties))