feat(session): broadcast participant_joined event via gRPC for real-time UI updates

Backend changes (session-coordinator):
- Add PublishParticipantJoined method to JoinSessionMessageRouterClient interface
- Implement PublishParticipantJoined in MessageRouterClient to broadcast events
- Call PublishParticipantJoined in join_session.go after participant joins
- Add detailed logging for debugging event broadcast

Android changes (service-party-android):
- Add detailed logging in TssRepository for session event handling
- Add detailed logging in MainViewModel for participant_joined processing
- Log activeSession state, event matching, and participant updates

This enables the initiator's waiting screen to receive real-time updates
when participants join the session, matching the expected behavior.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-01 08:34:47 -08:00
parent 13d1e58b84
commit f305a8cd97
4 changed files with 113 additions and 8 deletions

View File

@ -148,27 +148,44 @@ class TssRepository @Inject constructor(
*/
private fun startSessionEventSubscription() {
sessionEventJob?.cancel()
android.util.Log.d("TssRepository", "Starting session event subscription for partyId: $partyId")
sessionEventJob = CoroutineScope(Dispatchers.IO).launch {
grpcClient.subscribeSessionEvents(partyId).collect { event ->
android.util.Log.d("TssRepository", "Session event received: ${event.eventType} for session ${event.sessionId}")
android.util.Log.d("TssRepository", "=== Session event received ===")
android.util.Log.d("TssRepository", " eventType: ${event.eventType}")
android.util.Log.d("TssRepository", " sessionId: ${event.sessionId}")
android.util.Log.d("TssRepository", " selectedParties: ${event.selectedParties}")
// Check if this event is for our active session
val activeSession = _currentSession.value
android.util.Log.d("TssRepository", " activeSession: ${activeSession?.sessionId ?: "null"}")
if (activeSession != null && event.sessionId == activeSession.sessionId) {
android.util.Log.d("TssRepository", " → Event matches active session!")
when (event.eventType) {
"session_started" -> {
android.util.Log.d("TssRepository", "Session started event for our session, triggering keygen")
android.util.Log.d("TssRepository", " → Processing session_started event")
// Notify callback
sessionEventCallback?.invoke(event)
}
"party_joined", "participant_joined" -> {
android.util.Log.d("TssRepository", "Party joined our session")
android.util.Log.d("TssRepository", " → Processing participant_joined event")
sessionEventCallback?.invoke(event)
}
"all_joined" -> {
android.util.Log.d("TssRepository", "All parties joined our session")
android.util.Log.d("TssRepository", " → Processing all_joined event")
sessionEventCallback?.invoke(event)
}
else -> {
android.util.Log.d("TssRepository", " → Unknown event type: ${event.eventType}")
}
}
} else {
android.util.Log.d("TssRepository", " → Event does NOT match active session (ignored)")
if (activeSession == null) {
android.util.Log.d("TssRepository", " Reason: activeSession is null")
} else {
android.util.Log.d("TssRepository", " Reason: sessionId mismatch (event: ${event.sessionId}, active: ${activeSession.sessionId})")
}
}
}

View File

@ -262,8 +262,14 @@ class MainViewModel @Inject constructor(
* - CoSign joiner (参与签名)
*/
private fun setupSessionEventCallback() {
android.util.Log.d("MainViewModel", "Setting up session event callback")
repository.setSessionEventCallback { event ->
android.util.Log.d("MainViewModel", "Session event: ${event.eventType} for session ${event.sessionId}")
android.util.Log.d("MainViewModel", "=== MainViewModel received session event ===")
android.util.Log.d("MainViewModel", " eventType: ${event.eventType}")
android.util.Log.d("MainViewModel", " sessionId: ${event.sessionId}")
android.util.Log.d("MainViewModel", " _currentSessionId: ${_currentSessionId.value}")
android.util.Log.d("MainViewModel", " pendingJoinKeygenInfo?.sessionId: ${pendingJoinKeygenInfo?.sessionId}")
android.util.Log.d("MainViewModel", " pendingJoinSignInfo?.sessionId: ${pendingJoinSignInfo?.sessionId}")
when (event.eventType) {
"session_started" -> {
@ -296,18 +302,25 @@ class MainViewModel @Inject constructor(
}
}
"party_joined", "participant_joined" -> {
android.util.Log.d("MainViewModel", "Processing participant_joined event...")
// Update participant count for initiator's CreateWallet screen
val currentSessionId = _currentSessionId.value
android.util.Log.d("MainViewModel", " Checking for initiator: currentSessionId=$currentSessionId, eventSessionId=${event.sessionId}")
if (currentSessionId != null && event.sessionId == currentSessionId) {
android.util.Log.d("MainViewModel", " → Matched initiator session! Updating _sessionParticipants")
_sessionParticipants.update { current ->
val newParticipant = "参与方 ${current.size + 1}"
android.util.Log.d("MainViewModel", " → Adding participant: $newParticipant, total now: ${current.size + 1}")
current + newParticipant
}
}
// Update participant count for keygen joiner's JoinKeygen screen
val joinKeygenInfo = pendingJoinKeygenInfo
android.util.Log.d("MainViewModel", " Checking for joiner: joinKeygenInfo?.sessionId=${joinKeygenInfo?.sessionId}")
if (joinKeygenInfo != null && event.sessionId == joinKeygenInfo.sessionId) {
android.util.Log.d("MainViewModel", " → Matched joiner session! Updating _joinKeygenParticipants")
_joinKeygenParticipants.update { current ->
val newParticipant = "参与方 ${current.size + 1}"
current + newParticipant
@ -317,6 +330,7 @@ class MainViewModel @Inject constructor(
// Update participant count for sign joiner's CoSign screen
val joinSignInfo = pendingJoinSignInfo
if (joinSignInfo != null && event.sessionId == joinSignInfo.sessionId) {
android.util.Log.d("MainViewModel", " → Matched sign joiner session! Updating _coSignParticipants")
_coSignParticipants.update { current ->
val newParticipant = "参与方 ${current.size + 1}"
current + newParticipant

View File

@ -171,3 +171,32 @@ func (c *MessageRouterClient) PublishSessionStarted(
return c.PublishSessionEvent(ctx, event)
}
// PublishParticipantJoined publishes a participant_joined event to all parties in the session
// This notifies the initiator and other participants that a new party has joined
func (c *MessageRouterClient) PublishParticipantJoined(
ctx context.Context,
sessionID string,
partyID string,
selectedParties []string,
joinedAt int64,
) error {
logger.Info("Publishing participant_joined event to Message Router",
zap.String("session_id", sessionID),
zap.String("joined_party_id", partyID),
zap.Strings("notify_parties", selectedParties),
zap.Int64("joined_at", joinedAt))
event := &router.SessionEvent{
EventId: uuid.New().String(),
EventType: "participant_joined",
SessionId: sessionID,
SelectedParties: selectedParties,
CreatedAt: joinedAt,
// Note: We could add a custom field for the joined party ID, but for now
// the event itself indicates someone joined. The initiator can refresh
// their participant list via API if needed.
}
return c.PublishSessionEvent(ctx, event)
}

View File

@ -32,6 +32,16 @@ type JoinSessionMessageRouterClient interface {
joinTokens map[string]string,
startedAt int64,
) error
// PublishParticipantJoined broadcasts a participant_joined event to all parties in the session
// This allows the initiator's waiting screen to update in real-time when participants join
PublishParticipantJoined(
ctx context.Context,
sessionID string,
partyID string,
selectedParties []string,
joinedAt int64,
) error
}
// JoinSessionUseCase implements the join session use case
@ -271,19 +281,54 @@ func (uc *JoinSessionUseCase) executeWithRetry(
return nil, err
}
// 9. Publish participant joined event
// 9. Publish participant joined event to internal message broker
joinedAt := time.Now().UnixMilli()
event := output.ParticipantJoinedEvent{
SessionID: session.ID.String(),
PartyID: inputData.PartyID,
JoinedAt: time.Now().UnixMilli(),
JoinedAt: joinedAt,
}
if err := uc.eventPublisher.PublishEvent(ctx, output.TopicParticipantJoined, event); err != nil {
logger.Error("failed to publish participant joined event",
logger.Error("failed to publish participant joined event to internal broker",
zap.String("session_id", session.ID.String()),
zap.String("party_id", inputData.PartyID),
zap.Error(err))
}
// 9.1 Publish participant joined event via gRPC to message-router (for real-time UI updates)
// This notifies the initiator and other participants that a new party has joined
if uc.messageRouterClient != nil {
// Get all party IDs in the session to notify them
allPartyIDs := session.GetPartyIDs()
logger.Info("Broadcasting participant_joined event via gRPC",
zap.String("session_id", session.ID.String()),
zap.String("joined_party_id", inputData.PartyID),
zap.Strings("notify_parties", allPartyIDs),
zap.Int("total_participants", len(session.Participants)))
if err := uc.messageRouterClient.PublishParticipantJoined(
ctx,
session.ID.String(),
inputData.PartyID,
allPartyIDs,
joinedAt,
); err != nil {
logger.Error("failed to publish participant joined event to message router",
zap.String("session_id", session.ID.String()),
zap.String("party_id", inputData.PartyID),
zap.Error(err))
} else {
logger.Info("Successfully published participant_joined event to message router",
zap.String("session_id", session.ID.String()),
zap.String("joined_party_id", inputData.PartyID),
zap.Int("notify_count", len(allPartyIDs)))
}
} else {
logger.Warn("messageRouterClient is nil, cannot broadcast participant_joined event",
zap.String("session_id", session.ID.String()),
zap.String("party_id", inputData.PartyID))
}
// 10. Build response with other parties info
otherParties := session.GetOtherParties(partyID)
partyInfos := make([]input.PartyInfo, len(otherParties))