diff --git a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go index dbe26e48..078d1d1f 100644 --- a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go +++ b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go @@ -66,13 +66,31 @@ func (a *MessageBrokerAdapter) PublishToSession( defer a.mu.RUnlock() prefix := sessionID + ":" + + // Debug: log all session channels + var allKeys []string + for key := range a.sessionChannels { + allKeys = append(allKeys, key) + } + logger.Debug("PublishToSession looking for channels", + zap.String("session_id", sessionID), + zap.String("exclude_party", excludeParty), + zap.String("prefix", prefix), + zap.Int("total_session_channels", len(a.sessionChannels)), + zap.Strings("all_keys", allKeys)) + + foundCount := 0 for key, ch := range a.sessionChannels { if len(key) > len(prefix) && key[:len(prefix)] == prefix { partyID := key[len(prefix):] if partyID == excludeParty { + logger.Debug("skipping sender party", + zap.String("session_id", sessionID), + zap.String("party_id", partyID)) continue } + foundCount++ select { case ch <- message: logger.Debug("broadcast message to party", @@ -88,6 +106,12 @@ func (a *MessageBrokerAdapter) PublishToSession( } } + if foundCount == 0 { + logger.Warn("No parties found for broadcast", + zap.String("session_id", sessionID), + zap.String("exclude_party", excludeParty)) + } + return nil } @@ -141,9 +165,17 @@ func (a *MessageBrokerAdapter) SubscribeToSessionMessages( key := sessionID + ":" + partyID + logger.Info("SubscribeToSessionMessages called", + zap.String("session_id", sessionID), + zap.String("party_id", partyID), + zap.String("key", key), + zap.Int("current_channel_count", len(a.sessionChannels))) + // Create channel if not exists if _, exists := a.sessionChannels[key]; !exists { a.sessionChannels[key] = make(chan *entities.MessageDTO, 100) + logger.Info("Created new session channel", + zap.String("key", key)) } ch := a.sessionChannels[key]