From 1d5c834dfedfc843cb9295ef4031b5b6a61f98f3 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 24 Feb 2026 02:41:38 -0800 Subject: [PATCH] feat: add event buffering to agent WS gateway for late subscribers Buffer stream events when no WS clients are subscribed yet, then replay them when a client subscribes. This eliminates the race condition where events are lost between task creation and WS subscription. Co-Authored-By: Claude Opus 4.6 --- .../src/interfaces/ws/agent-stream.gateway.ts | 54 +++++++++++++++++-- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts b/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts index c85c4b3..33d954e 100644 --- a/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts +++ b/packages/services/agent-service/src/interfaces/ws/agent-stream.gateway.ts @@ -31,6 +31,12 @@ export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconn private readonly sessionClients = new Map>(); /** client → Set of sessionIds it subscribed to (for cleanup on disconnect) */ private readonly clientSessions = new Map>(); + /** sessionId → buffered events (replayed on late subscription) */ + private readonly sessionBuffer = new Map(); + /** Max events to buffer per session before discarding oldest */ + private readonly MAX_BUFFER_SIZE = 500; + /** TTL for orphan buffers (ms) — cleaned up if no subscriber after this time */ + private readonly BUFFER_TTL_MS = 120_000; handleConnection(client: WebSocket) { this.logger.log('Client connected'); @@ -84,21 +90,59 @@ export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconn data: { sessionId, taskId: payload.taskId }, }); + // Replay any buffered events that arrived before this client subscribed + const buffered = this.sessionBuffer.get(sessionId); + if (buffered && buffered.length > 0) { + this.logger.log(`Replaying ${buffered.length} buffered events for session ${sessionId}`); + for (const evt of buffered) { + this.sendTo(client, { event: 'stream_event', data: evt }); + } + // Clear buffer after replay (all events delivered) + this.sessionBuffer.delete(sessionId); + } + this.logger.log(`Client subscribed to session ${sessionId}`); } /** * Emits a stream event to all WebSocket clients subscribed to the session. + * If no clients are subscribed yet, the event is buffered for late subscribers. */ emitStreamEvent(sessionId: string, event: any) { const clients = this.sessionClients.get(sessionId); - if (!clients || clients.size === 0) return; + const hasSubscribers = clients && clients.size > 0; - const message = JSON.stringify({ event: 'stream_event', data: event }); - for (const client of clients) { - if (client.readyState === 1 /* WebSocket.OPEN */) { - client.send(message); + if (hasSubscribers) { + const message = JSON.stringify({ event: 'stream_event', data: event }); + for (const client of clients) { + if (client.readyState === 1 /* WebSocket.OPEN */) { + client.send(message); + } } + } else { + // No subscribers yet — buffer the event for late joiners + if (!this.sessionBuffer.has(sessionId)) { + this.sessionBuffer.set(sessionId, []); + // Schedule cleanup in case no one ever subscribes + setTimeout(() => { + if (this.sessionBuffer.has(sessionId)) { + this.sessionBuffer.delete(sessionId); + } + }, this.BUFFER_TTL_MS); + } + const buffer = this.sessionBuffer.get(sessionId)!; + buffer.push(event); + if (buffer.length > this.MAX_BUFFER_SIZE) { + buffer.shift(); // Drop oldest event + } + } + + // Clean up buffer on terminal events + if (event.type === 'completed' || event.type === 'error') { + // Give late subscribers a small window to connect and receive replay + setTimeout(() => { + this.sessionBuffer.delete(sessionId); + }, 10_000); } }