fix(server-party): session 事件订阅断开后自动重连

Message Router 重启后,server-party 的 gRPC stream 会断开,
之前的实现会直接退出 goroutine 导致无法收到新的 session 事件。

修改内容:
- 添加自动重连逻辑,stream 断开时会尝试重新订阅
- 使用指数退避策略,从 1 秒到最大 30 秒
- 重连成功后重置退避时间

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2025-12-29 01:25:30 -08:00
parent 40a257e55c
commit be94a6ab18
1 changed files with 54 additions and 17 deletions

View File

@ -385,7 +385,7 @@ func (c *MessageRouterClient) UpdateNotificationChannels(
return c.RegisterPartyWithNotification(ctx, partyID, partyRole, version, notification) return c.RegisterPartyWithNotification(ctx, partyID, partyRole, version, notification)
} }
// SubscribeSessionEvents subscribes to session lifecycle events // SubscribeSessionEvents subscribes to session lifecycle events with auto-reconnect
func (c *MessageRouterClient) SubscribeSessionEvents( func (c *MessageRouterClient) SubscribeSessionEvents(
ctx context.Context, ctx context.Context,
partyID string, partyID string,
@ -396,7 +396,7 @@ func (c *MessageRouterClient) SubscribeSessionEvents(
EventTypes: []string{}, // Subscribe to all event types EventTypes: []string{}, // Subscribe to all event types
} }
// Create a streaming connection // Create initial streaming connection
stream, err := c.createSessionEventStream(ctx, req) stream, err := c.createSessionEventStream(ctx, req)
if err != nil { if err != nil {
logger.Error("Failed to subscribe to session events", logger.Error("Failed to subscribe to session events",
@ -408,8 +408,12 @@ func (c *MessageRouterClient) SubscribeSessionEvents(
logger.Info("Subscribed to session events", logger.Info("Subscribed to session events",
zap.String("party_id", partyID)) zap.String("party_id", partyID))
// Start goroutine to receive events // Start goroutine to receive events with auto-reconnect
go func() { go func() {
currentStream := stream
reconnectBackoff := time.Second // Start with 1 second backoff
maxBackoff := 30 * time.Second
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -418,27 +422,60 @@ func (c *MessageRouterClient) SubscribeSessionEvents(
return return
default: default:
event := &router.SessionEvent{} event := &router.SessionEvent{}
err := stream.RecvMsg(event) err := currentStream.RecvMsg(event)
if err == io.EOF { if err == io.EOF {
logger.Info("Session event stream ended", logger.Warn("Session event stream ended, reconnecting...",
zap.String("party_id", partyID)) zap.String("party_id", partyID))
return } else if err != nil {
} logger.Warn("Error receiving session event, reconnecting...",
if err != nil {
logger.Error("Error receiving session event",
zap.Error(err), zap.Error(err),
zap.String("party_id", partyID)) zap.String("party_id", partyID))
return } else {
// Successfully received event, reset backoff
reconnectBackoff = time.Second
logger.Info("Received session event",
zap.String("event_type", event.EventType),
zap.String("session_id", event.SessionId),
zap.String("party_id", partyID))
// Call event handler
if eventHandler != nil {
eventHandler(event)
}
continue
} }
logger.Info("Received session event", // Reconnect with exponential backoff
zap.String("event_type", event.EventType), for {
zap.String("session_id", event.SessionId), select {
zap.String("party_id", partyID)) case <-ctx.Done():
return
case <-time.After(reconnectBackoff):
logger.Info("Attempting to reconnect session event stream",
zap.String("party_id", partyID),
zap.Duration("backoff", reconnectBackoff))
// Call event handler newStream, err := c.createSessionEventStream(ctx, req)
if eventHandler != nil { if err != nil {
eventHandler(event) logger.Error("Failed to reconnect session event stream",
zap.Error(err),
zap.String("party_id", partyID))
// Increase backoff for next attempt
reconnectBackoff = reconnectBackoff * 2
if reconnectBackoff > maxBackoff {
reconnectBackoff = maxBackoff
}
continue
}
logger.Info("Successfully reconnected to session events",
zap.String("party_id", partyID))
currentStream = newStream
reconnectBackoff = time.Second // Reset backoff on success
break
}
break
} }
} }
} }