debug: add more logging to message broker for broadcast diagnostics
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
c257ad1639
commit
98731cc133
|
|
@ -66,13 +66,31 @@ func (a *MessageBrokerAdapter) PublishToSession(
|
||||||
defer a.mu.RUnlock()
|
defer a.mu.RUnlock()
|
||||||
|
|
||||||
prefix := sessionID + ":"
|
prefix := sessionID + ":"
|
||||||
|
|
||||||
|
// Debug: log all session channels
|
||||||
|
var allKeys []string
|
||||||
|
for key := range a.sessionChannels {
|
||||||
|
allKeys = append(allKeys, key)
|
||||||
|
}
|
||||||
|
logger.Debug("PublishToSession looking for channels",
|
||||||
|
zap.String("session_id", sessionID),
|
||||||
|
zap.String("exclude_party", excludeParty),
|
||||||
|
zap.String("prefix", prefix),
|
||||||
|
zap.Int("total_session_channels", len(a.sessionChannels)),
|
||||||
|
zap.Strings("all_keys", allKeys))
|
||||||
|
|
||||||
|
foundCount := 0
|
||||||
for key, ch := range a.sessionChannels {
|
for key, ch := range a.sessionChannels {
|
||||||
if len(key) > len(prefix) && key[:len(prefix)] == prefix {
|
if len(key) > len(prefix) && key[:len(prefix)] == prefix {
|
||||||
partyID := key[len(prefix):]
|
partyID := key[len(prefix):]
|
||||||
if partyID == excludeParty {
|
if partyID == excludeParty {
|
||||||
|
logger.Debug("skipping sender party",
|
||||||
|
zap.String("session_id", sessionID),
|
||||||
|
zap.String("party_id", partyID))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
foundCount++
|
||||||
select {
|
select {
|
||||||
case ch <- message:
|
case ch <- message:
|
||||||
logger.Debug("broadcast message to party",
|
logger.Debug("broadcast message to party",
|
||||||
|
|
@ -88,6 +106,12 @@ func (a *MessageBrokerAdapter) PublishToSession(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if foundCount == 0 {
|
||||||
|
logger.Warn("No parties found for broadcast",
|
||||||
|
zap.String("session_id", sessionID),
|
||||||
|
zap.String("exclude_party", excludeParty))
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -141,9 +165,17 @@ func (a *MessageBrokerAdapter) SubscribeToSessionMessages(
|
||||||
|
|
||||||
key := sessionID + ":" + partyID
|
key := sessionID + ":" + partyID
|
||||||
|
|
||||||
|
logger.Info("SubscribeToSessionMessages called",
|
||||||
|
zap.String("session_id", sessionID),
|
||||||
|
zap.String("party_id", partyID),
|
||||||
|
zap.String("key", key),
|
||||||
|
zap.Int("current_channel_count", len(a.sessionChannels)))
|
||||||
|
|
||||||
// Create channel if not exists
|
// Create channel if not exists
|
||||||
if _, exists := a.sessionChannels[key]; !exists {
|
if _, exists := a.sessionChannels[key]; !exists {
|
||||||
a.sessionChannels[key] = make(chan *entities.MessageDTO, 100)
|
a.sessionChannels[key] = make(chan *entities.MessageDTO, 100)
|
||||||
|
logger.Info("Created new session channel",
|
||||||
|
zap.String("key", key))
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := a.sessionChannels[key]
|
ch := a.sessionChannels[key]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue