From 98731cc133fa0716ed0ddc146540dbe3e6064aef Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 6 Dec 2025 09:57:34 -0800 Subject: [PATCH] debug: add more logging to message broker for broadcast diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../adapters/output/memory/message_broker.go | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go index dbe26e48..078d1d1f 100644 --- a/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go +++ b/backend/mpc-system/services/message-router/adapters/output/memory/message_broker.go @@ -66,13 +66,31 @@ func (a *MessageBrokerAdapter) PublishToSession( defer a.mu.RUnlock() prefix := sessionID + ":" + + // Debug: log all session channels + var allKeys []string + for key := range a.sessionChannels { + allKeys = append(allKeys, key) + } + logger.Debug("PublishToSession looking for channels", + zap.String("session_id", sessionID), + zap.String("exclude_party", excludeParty), + zap.String("prefix", prefix), + zap.Int("total_session_channels", len(a.sessionChannels)), + zap.Strings("all_keys", allKeys)) + + foundCount := 0 for key, ch := range a.sessionChannels { if len(key) > len(prefix) && key[:len(prefix)] == prefix { partyID := key[len(prefix):] if partyID == excludeParty { + logger.Debug("skipping sender party", + zap.String("session_id", sessionID), + zap.String("party_id", partyID)) continue } + foundCount++ select { case ch <- message: logger.Debug("broadcast message to party", @@ -88,6 +106,12 @@ func (a *MessageBrokerAdapter) PublishToSession( } } + if foundCount == 0 { + logger.Warn("No parties found for broadcast", + zap.String("session_id", sessionID), + zap.String("exclude_party", excludeParty)) + } + return nil } @@ -141,9 +165,17 @@ func (a *MessageBrokerAdapter) SubscribeToSessionMessages( key := sessionID + ":" + partyID + logger.Info("SubscribeToSessionMessages called", + zap.String("session_id", sessionID), + zap.String("party_id", partyID), + zap.String("key", key), + zap.Int("current_channel_count", len(a.sessionChannels))) + // Create channel if not exists if _, exists := a.sessionChannels[key]; !exists { a.sessionChannels[key] = make(chan *entities.MessageDTO, 100) + logger.Info("Created new session channel", + zap.String("key", key)) } ch := a.sessionChannels[key]