diff --git a/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts b/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts index 528dec9..31bea9d 100644 --- a/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts +++ b/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts @@ -1,22 +1,68 @@ /** * VoiceSessionManager * - * Manages long-lived agent run loops for voice calls. + * Manages long-lived Claude Agent SDK run loops for voice calls. * - * Lifecycle: - * startSession(sessionId) → spawn background run loop, ready for messages - * injectMessage(sessionId) → enqueue speech turn; loop processes sequentially - * terminateSession(sessionId) → send poison-pill + abort; loop exits cleanly + * Architecture overview + * --------------------- + * Text chat uses a stateless per-turn model: each user message becomes an + * independent POST /tasks request that starts a new SDK process. Voice calls + * need a different model because: + * 1. Calls can last minutes; spawning a new process per utterance is too slow. + * 2. The SDK supports native session resume (sdkSessionId), letting it carry + * tool state and conversation context across turns without re-sending history. + * 3. The agent must be explicitly shut down when the user hangs up, not just + * left running until it times out. * - * Run-loop design (per voice session): - * while not terminated: - * message ← queue.dequeue() ← blocks between speech turns - * executeTask(message, resume) ← one SDK turn, streams events via gateway - * capture new sdkSessionId ← for next turn's native resume + * This service implements the alternative model: + * • One background run loop per active voice call (Node.js async, not a thread). + * • An AsyncQueue bridges HTTP inject requests to the loop. + * • The loop blocks on queue.dequeue() between turns (zero CPU while idle). 
+ * • On each turn it calls ClaudeAgentSdkEngine.executeTask() and streams the + * resulting events to the WebSocket gateway (picked up by voice-agent → TTS). + * • The SDK session ID returned after each turn is saved to AgentSession.metadata + * so the NEXT turn's executeTask() call can resume from where the previous + * turn left off (native SDK resume, no re-sending of conversation history). * - * This replaces the per-turn "POST /tasks" model used by text chat. - * The SDK session is kept alive across turns via the `resume` option, - * and the run loop is explicitly terminated when the user hangs up. + * Lifecycle + * --------- + * startSession(sessionId) — create queue + AbortController, start loop + * injectMessage(sessionId, message) — enqueue speech turn; if a turn is running, + * abort it first (per-turn interrupt support) + * terminateSession(sessionId) — abort session + drain queue + enqueue null + * sentinel; wait ≤5 s for loop to exit + * + * Interruption model (per-turn AbortController) + * --------------------------------------------- + * Two levels of abort exist: + * + * sessionAbortController — session-scoped; fired only on terminateSession(). + * Propagates into each turn via an event listener. + * + * turnAbort (per turn) — created fresh for each executeTurn() call. + * Stored as handle.currentTurnAbort so that + * injectMessage() can abort the RUNNING turn before + * enqueuing the new message. + * + * When the user interrupts (speaks while the agent is responding): + * 1. voice-agent LiveKit framework stops TTS playback immediately. + * 2. voice-agent calls POST /:sessionId/voice/inject with new utterance. + * 3. injectMessage() sees handle.currentTurnAbort !== null → aborts it. + * 4. The for-await loop in executeTurn() checks turnAbort.signal.aborted + * on the NEXT received SDK event → breaks silently (no error emitted). + * 5. The new message is enqueued; the loop dequeues it and starts a fresh turn. 
+ * + * Because `break` does not throw, the catch block's error-event emission is + * never triggered by an interrupt — the user hears no "agent error" message. + * Any partial assistant text accumulated before the break is saved to + * conversation history with a "[中断]" suffix for context continuity. + * + * Tenant context + * -------------- + * The run loop is a background Promise, outside any HTTP request context. + * TenantContextService.run() wraps each executeTurn() call to inject the + * tenant's AsyncLocalStorage context (schema name, quotas, etc.) — the same + * pattern used by the standing-order executor in ops-service. */ import { Injectable, Logger } from '@nestjs/common'; import { AsyncQueue } from '../../infrastructure/voice/async-queue'; @@ -39,8 +85,10 @@ const TERMINATE: null = null; interface VoiceSessionHandle { /** Message queue: string = user speech turn; null = terminate signal. */ queue: AsyncQueue; - /** Allows aborting the currently-running SDK executeTask call. */ + /** Aborts the entire run loop (used on session terminate). */ abortController: AbortController; + /** Aborts the currently-executing SDK turn only (replaced each turn). */ + currentTurnAbort: AbortController | null; /** Tenant who owns this voice session. */ tenantId: string; /** Background run-loop promise (resolved when loop exits). */ @@ -82,6 +130,7 @@ export class VoiceSessionManager { const handle: VoiceSessionHandle = { queue, abortController, + currentTurnAbort: null, tenantId, runLoop: this.runLoop(sessionId, tenantId, queue, abortController), }; @@ -97,6 +146,15 @@ export class VoiceSessionManager { async injectMessage(sessionId: string, message: string): Promise { const handle = this.sessions.get(sessionId); if (!handle) return false; + + // If a turn is currently running, abort it immediately so the new message + // can be processed without waiting for the old SDK call to finish. 
+ if (handle.currentTurnAbort) { + this.logger.log(`[VoiceSession ${sessionId}] Interrupting current turn for new message`); + handle.currentTurnAbort.abort(); + handle.currentTurnAbort = null; + } + handle.queue.enqueue(message); this.logger.log(`[VoiceSession ${sessionId}] Injected: "${message.slice(0, 80)}"`); return true; @@ -202,8 +260,17 @@ export class VoiceSessionManager { sessionId: string, tenantId: string, message: string, - abortController: AbortController, + sessionAbortController: AbortController, ): Promise { + // Create a per-turn abort controller so this turn can be interrupted + // independently when the user speaks again mid-response. + const turnAbort = new AbortController(); + const handle = this.sessions.get(sessionId); + if (handle) handle.currentTurnAbort = turnAbort; + + // Combine session-level abort with turn-level abort: if either fires, abort the turn. + const onSessionAbort = () => turnAbort.abort(); + sessionAbortController.signal.addEventListener('abort', onSessionAbort, { once: true }); const session = await this.sessionRepository.findById(sessionId); if (!session) { this.logger.error(`[VoiceSession ${sessionId}] Session not found in DB — cannot execute turn`); @@ -264,8 +331,8 @@ export class VoiceSessionManager { }); for await (const event of stream) { - // Exit early if the voice session was terminated mid-turn - if (abortController.signal.aborted) break; + // Exit early if this turn was interrupted (user spoke again) or session terminated + if (turnAbort.signal.aborted) break; if (!voiceFilteredTypes.has(event.type)) { this.gateway.emitStreamEvent(sessionId, event); @@ -322,6 +389,14 @@ export class VoiceSessionManager { }); } } finally { + // Remove the session-abort listener to avoid memory leaks + sessionAbortController.signal.removeEventListener('abort', onSessionAbort); + + // Clear the per-turn abort ref on the handle (if it still points to this turn) + if (handle && handle.currentTurnAbort === turnAbort) { + 
handle.currentTurnAbort = null; + } + // If aborted mid-turn, save any partial text accumulated before the abort if (!finished && textParts.length > 0) { await this.contextService diff --git a/packages/services/voice-agent/src/agent.py b/packages/services/voice-agent/src/agent.py index 27ae001..9581aac 100644 --- a/packages/services/voice-agent/src/agent.py +++ b/packages/services/voice-agent/src/agent.py @@ -4,6 +4,34 @@ IT0 Voice Agent — LiveKit Agents v1.x entry point. Uses the official AgentServer + @server.rtc_session() pattern. Pipeline: VAD → STT → LLM (via agent-service) → TTS. +Voice Session Lifecycle (long-lived agent run loop) +---------------------------------------------------- +Each voice call maps to ONE long-lived agent session in agent-service, +instead of spawning a new process for every speech turn. + + Call starts → POST /api/v1/agent/sessions/voice/start + agent-service creates an AgentSession, starts a background + run loop, and returns a sessionId. + + User speaks → LiveKit STT → AgentServiceLLM._run() + → POST /:sessionId/voice/inject + agent-service enqueues the utterance; run loop picks it up, + calls Claude Agent SDK, streams events back via WebSocket. + + User hangs up → room "disconnected" event → _on_room_disconnect() + → DELETE /:sessionId/voice + agent-service aborts the run loop and marks session completed. + +Interruption (mid-turn abort) +------------------------------ +When the user speaks while the agent is still responding: + 1. LiveKit framework stops TTS playback immediately (client-side). + 2. STT produces the new utterance → voice/inject is called. + 3. agent-service detects a turn is already running → aborts it (per-turn + AbortController) → enqueues the new message. + 4. The SDK loop breaks silently; no error message is emitted to TTS. + 5. The new turn starts, producing the response to the interrupting utterance. 
+ Agent State & Thinking Indicator --------------------------------- LiveKit AgentSession (v1.4.3+) automatically publishes the participant