feat(voice): add per-turn interrupt support to VoiceSessionManager

Implements a two-level abort controller design to support real-time
interruption when the user speaks while the agent is still responding:

sessionAbortController (session-scoped)
- Created once when startSession() is called
- Fired only by terminateSession() (user hangs up)
- Propagated into each turn via addEventListener

turnAbort (per-turn, stored as handle.currentTurnAbort)
- Created fresh at the start of each executeTurn() call
- Stored on the VoiceSessionHandle so injectMessage() can abort it
- When a new inject arrives while a turn is running, injectMessage()
  calls turnAbort.abort() BEFORE enqueuing the new message
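
The two-level hierarchy can be illustrated in isolation. The following is a minimal stand-alone sketch, not the code from this commit: `linkTurnToSession` and the `TurnLink` shape are hypothetical names, and the early check for an already-aborted session is an added assumption (the commit only registers the listener).

```typescript
// Minimal sketch of a two-level abort hierarchy.
// `linkTurnToSession` and `TurnLink` are hypothetical, for illustration only.
interface TurnLink {
  turnAbort: AbortController;
  unlink: () => void; // call from a finally block to drop the session listener
}

function linkTurnToSession(sessionAbort: AbortController): TurnLink {
  const turnAbort = new AbortController();
  const onSessionAbort = (): void => turnAbort.abort();

  if (sessionAbort.signal.aborted) {
    // Assumption: a turn started after hang-up should begin already aborted.
    turnAbort.abort();
  } else {
    sessionAbort.signal.addEventListener('abort', onSessionAbort, { once: true });
  }

  return {
    turnAbort,
    unlink: () => sessionAbort.signal.removeEventListener('abort', onSessionAbort),
  };
}

// Aborting a single turn leaves the session untouched ...
const callSession = new AbortController();
const turnA = linkTurnToSession(callSession);
turnA.turnAbort.abort();
turnA.unlink();
console.log(callSession.signal.aborted); // false

// ... while aborting the session cascades into the running turn.
const turnB = linkTurnToSession(callSession);
callSession.abort();
console.log(turnB.turnAbort.signal.aborted); // true
```

The one-directional link is the point of the design: an interrupt only cancels the current turn, whereas a hang-up cancels whatever turn happens to be running.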

Interruption flow:
1. User speaks mid-response → LiveKit stops TTS playback (client-side)
2. STT utterance → POST voice/inject → injectMessage() fires
3. handle.currentTurnAbort.abort() called → sets aborted flag
4. for-await loop checks turnAbort.signal.aborted on next SDK event → break
5. catch block NOT reached (break ≠ exception) → no error event emitted
6. finally block saves partial text with "[中断]" ("interrupted") suffix to history
7. New message dequeued → fresh executeTurn() starts immediately
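
Steps 3 to 6 can be reproduced with a fake event stream. This is a hedged sketch under stated assumptions: `fakeSdkStream`, `runTurn`, `TurnResult`, and `demoInterrupt` are hypothetical names, the real SDK stream and history service are replaced by in-memory arrays, and `finished` is simply set when the loop drains without an abort.

```typescript
// Sketch of the interrupt path with a fake stream (all names are hypothetical).
type FakeEvent = { type: 'text'; text: string };

async function* fakeSdkStream(chunks: string[]): AsyncGenerator<FakeEvent> {
  for (const text of chunks) {
    await Promise.resolve(); // let other tasks run between events
    yield { type: 'text', text };
  }
}

interface TurnResult {
  history: string[]; // stand-in for persisted conversation history
  errors: string[];  // stand-in for error events that would reach TTS
}

async function runTurn(
  stream: AsyncIterable<FakeEvent>,
  turnAbort: AbortController,
  onEvent: (index: number) => void = () => {},
): Promise<TurnResult> {
  const result: TurnResult = { history: [], errors: [] };
  const textParts: string[] = [];
  let finished = false;
  let index = 0;
  try {
    for await (const event of stream) {
      if (turnAbort.signal.aborted) break; // step 4: checked on the next event
      textParts.push(event.text);
      onEvent(index++);
    }
    if (!turnAbort.signal.aborted) {
      finished = true;
      result.history.push(textParts.join(''));
    }
  } catch (err) {
    result.errors.push(String(err)); // step 5: never reached by a plain break
  } finally {
    if (!finished && textParts.length > 0) {
      result.history.push(textParts.join('') + '[中断]'); // step 6
    }
  }
  return result;
}

// Simulate injectMessage() aborting the turn right after the second event.
async function demoInterrupt(): Promise<TurnResult> {
  const turnAbort = new AbortController();
  return runTurn(fakeSdkStream(['Hel', 'lo ', 'world']), turnAbort, (i) => {
    if (i === 1) turnAbort.abort();
  });
}

demoInterrupt().then((r) => console.log(r.history, r.errors));
// prints [ 'Hello [中断]' ] []
```

Note that the abort only takes effect when the next event arrives, so in this model a stream that stalls would keep the turn alive until it produces something.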

Why no "Agent error" message plays to the user:
- break exits the for-await loop silently, not via exception
- The catch block's error-event emission is guarded by err?.name !== 'AbortError'
  AND requires an actual exception; a plain break never enters catch
- Empty or partial responses are filtered by `if response:` in agent.py
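
The guard described in the second bullet can be sketched on its own. The helper names `shouldEmitErrorEvent` and `classifyTurnOutcome` are hypothetical and the outcome labels are invented for illustration; only the `err?.name !== 'AbortError'` comparison comes from the text above.

```typescript
// Hypothetical helpers illustrating the catch-block guard; not the commit's code.
function shouldEmitErrorEvent(err: unknown): boolean {
  const name = (err as { name?: string } | null | undefined)?.name;
  return name !== 'AbortError';
}

type TurnOutcome = 'completed' | 'silent-abort' | 'error-event';

function classifyTurnOutcome(run: () => void): TurnOutcome {
  try {
    run();
    return 'completed'; // also covers loops that exit via a plain `break`
  } catch (err) {
    return shouldEmitErrorEvent(err) ? 'error-event' : 'silent-abort';
  }
}

const abortError = new Error('The operation was aborted');
abortError.name = 'AbortError';

// A loop that breaks early never enters catch at all.
console.log(classifyTurnOutcome(() => {
  for (const n of [1, 2, 3]) {
    if (n > 1) break;
  }
})); // "completed"

// An AbortError thrown by the underlying call is swallowed by the guard.
console.log(classifyTurnOutcome(() => { throw abortError; })); // "silent-abort"

// Any other exception still produces an error event.
console.log(classifyTurnOutcome(() => { throw new Error('SDK crashed'); })); // "error-event"
```

Both silent paths matter: the `break` covers interrupts noticed between events, and the name check covers the case where the aborted call itself rejects.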

Also update module-level JSDoc with full architecture explanation covering
the long-lived run loop design, two-level abort hierarchy, tenant context
injection pattern, and SDK session resume across turns.

Update agent.py module docstring to document voice session lifecycle and
interruption flow for future maintainers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 635cca18fa
commit d097c64c81
@@ -1,22 +1,68 @@
 /**
  * VoiceSessionManager
  *
- * Manages long-lived agent run loops for voice calls.
+ * Manages long-lived Claude Agent SDK run loops for voice calls.
  *
- * Lifecycle:
- *   startSession(sessionId) → spawn background run loop, ready for messages
- *   injectMessage(sessionId) → enqueue speech turn; loop processes sequentially
- *   terminateSession(sessionId) → send poison-pill + abort; loop exits cleanly
+ * Architecture overview
+ * ---------------------
+ * Text chat uses a stateless per-turn model: each user message becomes an
+ * independent POST /tasks request that starts a new SDK process. Voice calls
+ * need a different model because:
+ *   1. Calls can last minutes; spawning a new process per utterance is too slow.
+ *   2. The SDK supports native session resume (sdkSessionId), letting it carry
+ *      tool state and conversation context across turns without re-sending history.
+ *   3. The agent must be explicitly shut down when the user hangs up, not just
+ *      left running until it times out.
  *
- * Run-loop design (per voice session):
- *   while not terminated:
- *     message ← queue.dequeue() ← blocks between speech turns
- *     executeTask(message, resume) ← one SDK turn, streams events via gateway
- *     capture new sdkSessionId ← for next turn's native resume
+ * This service implements the alternative model:
+ *   • One background run loop per active voice call (Node.js async, not a thread).
+ *   • An AsyncQueue<string | null> bridges HTTP inject requests to the loop.
+ *   • The loop blocks on queue.dequeue() between turns (zero CPU while idle).
+ *   • On each turn it calls ClaudeAgentSdkEngine.executeTask() and streams the
+ *     resulting events to the WebSocket gateway (picked up by voice-agent → TTS).
+ *   • The SDK session ID returned after each turn is saved to AgentSession.metadata
+ *     so the NEXT turn's executeTask() call can resume from where the previous
+ *     turn left off (native SDK resume, no re-sending of conversation history).
  *
- * This replaces the per-turn "POST /tasks" model used by text chat.
- * The SDK session is kept alive across turns via the `resume` option,
- * and the run loop is explicitly terminated when the user hangs up.
+ * Lifecycle
+ * ---------
+ *   startSession(sessionId) — create queue + AbortController, start loop
+ *   injectMessage(sessionId) — enqueue speech turn; if a turn is running,
+ *     abort it first (per-turn interrupt support)
+ *   terminateSession(sessionId) — abort session + drain queue + enqueue null
+ *     sentinel; wait ≤5 s for loop to exit
+ *
+ * Interruption model (per-turn AbortController)
+ * ---------------------------------------------
+ * Two levels of abort exist:
+ *
+ *   sessionAbortController — session-scoped; fired only on terminateSession().
+ *     Propagates into each turn via an event listener.
+ *
+ *   turnAbort (per turn) — created fresh for each executeTurn() call.
+ *     Stored as handle.currentTurnAbort so that
+ *     injectMessage() can abort the RUNNING turn before
+ *     enqueuing the new message.
+ *
+ * When the user interrupts (speaks while the agent is responding):
+ *   1. voice-agent LiveKit framework stops TTS playback immediately.
+ *   2. voice-agent calls POST /:sessionId/voice/inject with new utterance.
+ *   3. injectMessage() sees handle.currentTurnAbort !== null → aborts it.
+ *   4. The for-await loop in executeTurn() checks turnAbort.signal.aborted
+ *      on the NEXT received SDK event → breaks silently (no error emitted).
+ *   5. The new message is enqueued; the loop dequeues it and starts a fresh turn.
+ *
+ * Because `break` does not throw, the catch block's error-event emission is
+ * never triggered by an interrupt — the user hears no "agent error" message.
+ * Any partial assistant text accumulated before the break is saved to
+ * conversation history with a "[中断]" suffix for context continuity.
+ *
+ * Tenant context
+ * --------------
+ * The run loop is a background Promise, outside any HTTP request context.
+ * TenantContextService.run() wraps each executeTurn() call to inject the
+ * tenant's AsyncLocalStorage context (schema name, quotas, etc.) — the same
+ * pattern used by the standing-order executor in ops-service.
  */
 import { Injectable, Logger } from '@nestjs/common';
 import { AsyncQueue } from '../../infrastructure/voice/async-queue';
@@ -39,8 +85,10 @@ const TERMINATE: null = null;
 interface VoiceSessionHandle {
   /** Message queue: string = user speech turn; null = terminate signal. */
   queue: AsyncQueue<string | null>;
-  /** Allows aborting the currently-running SDK executeTask call. */
+  /** Aborts the entire run loop (used on session terminate). */
   abortController: AbortController;
+  /** Aborts the currently-executing SDK turn only (replaced each turn). */
+  currentTurnAbort: AbortController | null;
   /** Tenant who owns this voice session. */
   tenantId: string;
   /** Background run-loop promise (resolved when loop exits). */
@@ -82,6 +130,7 @@ export class VoiceSessionManager {
     const handle: VoiceSessionHandle = {
       queue,
       abortController,
+      currentTurnAbort: null,
       tenantId,
       runLoop: this.runLoop(sessionId, tenantId, queue, abortController),
     };
@@ -97,6 +146,15 @@ export class VoiceSessionManager {
   async injectMessage(sessionId: string, message: string): Promise<boolean> {
     const handle = this.sessions.get(sessionId);
     if (!handle) return false;
+
+    // If a turn is currently running, abort it immediately so the new message
+    // can be processed without waiting for the old SDK call to finish.
+    if (handle.currentTurnAbort) {
+      this.logger.log(`[VoiceSession ${sessionId}] Interrupting current turn for new message`);
+      handle.currentTurnAbort.abort();
+      handle.currentTurnAbort = null;
+    }
+
     handle.queue.enqueue(message);
     this.logger.log(`[VoiceSession ${sessionId}] Injected: "${message.slice(0, 80)}"`);
     return true;
@@ -202,8 +260,17 @@ export class VoiceSessionManager {
     sessionId: string,
     tenantId: string,
     message: string,
-    abortController: AbortController,
+    sessionAbortController: AbortController,
   ): Promise<void> {
+    // Create a per-turn abort controller so this turn can be interrupted
+    // independently when the user speaks again mid-response.
+    const turnAbort = new AbortController();
+    const handle = this.sessions.get(sessionId);
+    if (handle) handle.currentTurnAbort = turnAbort;
+
+    // Combine session-level abort with turn-level abort: if either fires, abort the turn.
+    const onSessionAbort = () => turnAbort.abort();
+    sessionAbortController.signal.addEventListener('abort', onSessionAbort, { once: true });
     const session = await this.sessionRepository.findById(sessionId);
     if (!session) {
       this.logger.error(`[VoiceSession ${sessionId}] Session not found in DB — cannot execute turn`);
@@ -264,8 +331,8 @@ export class VoiceSessionManager {
       });
 
       for await (const event of stream) {
-        // Exit early if the voice session was terminated mid-turn
-        if (abortController.signal.aborted) break;
+        // Exit early if this turn was interrupted (user spoke again) or session terminated
+        if (turnAbort.signal.aborted) break;
 
         if (!voiceFilteredTypes.has(event.type)) {
           this.gateway.emitStreamEvent(sessionId, event);
@@ -322,6 +389,14 @@ export class VoiceSessionManager {
         });
       }
     } finally {
+      // Remove the session-abort listener to avoid memory leaks
+      sessionAbortController.signal.removeEventListener('abort', onSessionAbort);
+
+      // Clear the per-turn abort ref on the handle (if it still points to this turn)
+      if (handle && handle.currentTurnAbort === turnAbort) {
+        handle.currentTurnAbort = null;
+      }
+
       // If aborted mid-turn, save any partial text accumulated before the abort
       if (!finished && textParts.length > 0) {
         await this.contextService
@@ -4,6 +4,34 @@ IT0 Voice Agent — LiveKit Agents v1.x entry point.
 Uses the official AgentServer + @server.rtc_session() pattern.
 Pipeline: VAD → STT → LLM (via agent-service) → TTS.
 
+Voice Session Lifecycle (long-lived agent run loop)
+----------------------------------------------------
+Each voice call maps to ONE long-lived agent session in agent-service,
+instead of spawning a new process for every speech turn.
+
+  Call starts → POST /api/v1/agent/sessions/voice/start
+    agent-service creates an AgentSession, starts a background
+    run loop, and returns a sessionId.
+
+  User speaks → LiveKit STT → AgentServiceLLM._run()
+    → POST /:sessionId/voice/inject
+    agent-service enqueues the utterance; run loop picks it up,
+    calls Claude Agent SDK, streams events back via WebSocket.
+
+  User hangs up → room "disconnected" event → _on_room_disconnect()
+    → DELETE /:sessionId/voice
+    agent-service aborts the run loop and marks session completed.
+
+Interruption (mid-turn abort)
+------------------------------
+When the user speaks while the agent is still responding:
+  1. LiveKit framework stops TTS playback immediately (client-side).
+  2. STT produces the new utterance → voice/inject is called.
+  3. agent-service detects a turn is already running → aborts it (per-turn
+     AbortController) → enqueues the new message.
+  4. The SDK loop breaks silently; no error message is emitted to TTS.
+  5. The new turn starts, producing the response to the interrupting utterance.
+
 Agent State & Thinking Indicator
 ---------------------------------
 LiveKit AgentSession (v1.4.3+) automatically publishes the participant