feat: add event buffering to agent WS gateway for late subscribers
Buffer stream events when no WS clients are subscribed yet, then replay them when a client subscribes. This eliminates the race condition where events are lost between task creation and WS subscription. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
370e32599f
commit
1d5c834dfe
|
|
@ -31,6 +31,12 @@ export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconn
|
|||
private readonly sessionClients = new Map<string, Set<WebSocket>>();
|
||||
/** client → Set of sessionIds it subscribed to (for cleanup on disconnect) */
|
||||
private readonly clientSessions = new Map<WebSocket, Set<string>>();
|
||||
/** sessionId → buffered events (replayed on late subscription) */
|
||||
private readonly sessionBuffer = new Map<string, any[]>();
|
||||
/** Max events to buffer per session before discarding oldest */
|
||||
private readonly MAX_BUFFER_SIZE = 500;
|
||||
/** TTL for orphan buffers (ms) — cleaned up if no subscriber after this time */
|
||||
private readonly BUFFER_TTL_MS = 120_000;
|
||||
|
||||
handleConnection(client: WebSocket) {
|
||||
this.logger.log('Client connected');
|
||||
|
|
@ -84,21 +90,59 @@ export class AgentStreamGateway implements OnGatewayConnection, OnGatewayDisconn
|
|||
data: { sessionId, taskId: payload.taskId },
|
||||
});
|
||||
|
||||
// Replay any buffered events that arrived before this client subscribed
|
||||
const buffered = this.sessionBuffer.get(sessionId);
|
||||
if (buffered && buffered.length > 0) {
|
||||
this.logger.log(`Replaying ${buffered.length} buffered events for session ${sessionId}`);
|
||||
for (const evt of buffered) {
|
||||
this.sendTo(client, { event: 'stream_event', data: evt });
|
||||
}
|
||||
// Clear buffer after replay (all events delivered)
|
||||
this.sessionBuffer.delete(sessionId);
|
||||
}
|
||||
|
||||
this.logger.log(`Client subscribed to session ${sessionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a stream event to all WebSocket clients subscribed to the session.
|
||||
* If no clients are subscribed yet, the event is buffered for late subscribers.
|
||||
*/
|
||||
emitStreamEvent(sessionId: string, event: any) {
|
||||
const clients = this.sessionClients.get(sessionId);
|
||||
if (!clients || clients.size === 0) return;
|
||||
const hasSubscribers = clients && clients.size > 0;
|
||||
|
||||
const message = JSON.stringify({ event: 'stream_event', data: event });
|
||||
for (const client of clients) {
|
||||
if (client.readyState === 1 /* WebSocket.OPEN */) {
|
||||
client.send(message);
|
||||
if (hasSubscribers) {
|
||||
const message = JSON.stringify({ event: 'stream_event', data: event });
|
||||
for (const client of clients) {
|
||||
if (client.readyState === 1 /* WebSocket.OPEN */) {
|
||||
client.send(message);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No subscribers yet — buffer the event for late joiners
|
||||
if (!this.sessionBuffer.has(sessionId)) {
|
||||
this.sessionBuffer.set(sessionId, []);
|
||||
// Schedule cleanup in case no one ever subscribes
|
||||
setTimeout(() => {
|
||||
if (this.sessionBuffer.has(sessionId)) {
|
||||
this.sessionBuffer.delete(sessionId);
|
||||
}
|
||||
}, this.BUFFER_TTL_MS);
|
||||
}
|
||||
const buffer = this.sessionBuffer.get(sessionId)!;
|
||||
buffer.push(event);
|
||||
if (buffer.length > this.MAX_BUFFER_SIZE) {
|
||||
buffer.shift(); // Drop oldest event
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up buffer on terminal events
|
||||
if (event.type === 'completed' || event.type === 'error') {
|
||||
// Give late subscribers a small window to connect and receive replay
|
||||
setTimeout(() => {
|
||||
this.sessionBuffer.delete(sessionId);
|
||||
}, 10_000);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue