From 635cca18fa20676bcb53c357384f39c74914d8c1 Mon Sep 17 00:00:00 2001 From: hailin Date: Wed, 4 Mar 2026 04:01:02 -0800 Subject: [PATCH] feat(voice): long-lived agent session with proper hangup termination Replace the per-turn POST /tasks approach for voice calls with a long-lived agent run loop tied to the call lifecycle: agent-service: - Add AsyncQueue utility for blocking message relay - Add VoiceSessionManager: spawns one background run loop per voice call, accepts injected messages, terminates cleanly on hangup - Add VoiceSessionController with 3 endpoints: POST /api/v1/agent/sessions/voice/start (call start) POST /api/v1/agent/sessions/:id/voice/inject (each speech turn) DELETE /api/v1/agent/sessions/:id/voice (user hung up) - Register VoiceSessionManager + VoiceSessionController in agent.module.ts voice-agent: - AgentServiceLLM: add start_voice_session(), terminate_voice_session(), inject_text_message() (voice/inject-aware), _do_inject_voice() - AgentServiceLLMStream._run(): use voice/inject path when voice session is active; fall back to per-task POST for text-chat / non-SDK engines - entrypoint(): call start_voice_session() after session.start(); register _on_room_disconnect that calls terminate_voice_session() so the agent is always killed when the user hangs up Co-Authored-By: Claude Sonnet 4.6 --- .../agent-service/src/agent.module.ts | 6 +- .../services/voice-session-manager.service.ts | 333 ++++++++++++ .../src/infrastructure/voice/async-queue.ts | 39 ++ .../controllers/voice-session.controller.ts | 148 ++++++ packages/services/voice-agent/src/agent.py | 48 +- .../voice-agent/src/plugins/agent_llm.py | 481 ++++++++++++++---- 6 files changed, 938 insertions(+), 117 deletions(-) create mode 100644 packages/services/agent-service/src/domain/services/voice-session-manager.service.ts create mode 100644 packages/services/agent-service/src/infrastructure/voice/async-queue.ts create mode 100644 
packages/services/agent-service/src/interfaces/rest/controllers/voice-session.controller.ts diff --git a/packages/services/agent-service/src/agent.module.ts b/packages/services/agent-service/src/agent.module.ts index 7d2caaf..b93ef1e 100644 --- a/packages/services/agent-service/src/agent.module.ts +++ b/packages/services/agent-service/src/agent.module.ts @@ -43,7 +43,9 @@ import { VoiceConfigRepository } from './infrastructure/repositories/voice-confi import { UsageRecordRepository } from './infrastructure/repositories/usage-record.repository'; import { VoiceConfigService } from './infrastructure/services/voice-config.service'; import { VoiceConfigController } from './interfaces/rest/controllers/voice-config.controller'; +import { VoiceSessionController } from './interfaces/rest/controllers/voice-session.controller'; import { ConversationContextService } from './domain/services/conversation-context.service'; +import { VoiceSessionManager } from './domain/services/voice-session-manager.service'; import { EventPublisherService } from './infrastructure/messaging/event-publisher.service'; @Module({ @@ -58,7 +60,8 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe ], controllers: [ AgentController, SessionController, RiskRulesController, - TenantAgentConfigController, AgentConfigController, VoiceConfigController, SkillsController, HooksController, + TenantAgentConfigController, AgentConfigController, VoiceConfigController, + VoiceSessionController, SkillsController, HooksController, ], providers: [ AgentStreamGateway, @@ -72,6 +75,7 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe StandingOrderExtractorService, AllowedToolsResolverService, ConversationContextService, + VoiceSessionManager, SessionRepository, TaskRepository, MessageRepository, diff --git a/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts 
b/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts new file mode 100644 index 0000000..528dec9 --- /dev/null +++ b/packages/services/agent-service/src/domain/services/voice-session-manager.service.ts @@ -0,0 +1,333 @@ +/** + * VoiceSessionManager + * + * Manages long-lived agent run loops for voice calls. + * + * Lifecycle: + * startSession(sessionId) → spawn background run loop, ready for messages + * injectMessage(sessionId) → enqueue speech turn; loop processes sequentially + * terminateSession(sessionId) → send poison-pill + abort; loop exits cleanly + * + * Run-loop design (per voice session): + * while not terminated: + * message ← queue.dequeue() ← blocks between speech turns + * executeTask(message, resume) ← one SDK turn, streams events via gateway + * capture new sdkSessionId ← for next turn's native resume + * + * This replaces the per-turn "POST /tasks" model used by text chat. + * The SDK session is kept alive across turns via the `resume` option, + * and the run loop is explicitly terminated when the user hangs up. 
+ */ +import { Injectable, Logger } from '@nestjs/common'; +import { AsyncQueue } from '../../infrastructure/voice/async-queue'; +import { EngineRegistry } from '../../infrastructure/engines/engine-registry'; +import { ClaudeAgentSdkEngine } from '../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine'; +import { AgentStreamGateway } from '../../interfaces/ws/agent-stream.gateway'; +import { SessionRepository } from '../../infrastructure/repositories/session.repository'; +import { TaskRepository } from '../../infrastructure/repositories/task.repository'; +import { ConversationContextService } from './conversation-context.service'; +import { AgentEngineType } from '../value-objects/agent-engine-type.vo'; +import { TaskStatus } from '../value-objects/task-status.vo'; +import { AgentTask } from '../entities/agent-task.entity'; +import { TenantContextService } from '@it0/common'; +import * as crypto from 'crypto'; + +/** Sentinel value enqueued to signal the run loop to exit. */ +const TERMINATE: null = null; + +/** In-memory handle for a running voice session. */ +interface VoiceSessionHandle { + /** Message queue: string = user speech turn; null = terminate signal. */ + queue: AsyncQueue; + /** Allows aborting the currently-running SDK executeTask call. */ + abortController: AbortController; + /** Tenant who owns this voice session. */ + tenantId: string; + /** Background run-loop promise (resolved when loop exits). */ + runLoop: Promise; +} + +@Injectable() +export class VoiceSessionManager { + private readonly logger = new Logger(VoiceSessionManager.name); + + /** Map from agentSessionId → handle. 
*/
+  private readonly sessions = new Map<string, VoiceSessionHandle>();
+
+  constructor(
+    private readonly engineRegistry: EngineRegistry,
+    private readonly gateway: AgentStreamGateway,
+    private readonly sessionRepository: SessionRepository,
+    private readonly taskRepository: TaskRepository,
+    private readonly contextService: ConversationContextService,
+  ) {}
+
+  // ---------------------------------------------------------------------------
+  // Public API
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Start a long-lived voice agent session.
+   * Safe to call multiple times for the same sessionId (idempotent):
+   * a second call while a handle is registered only logs a warning.
+   *
+   * @param sessionId agent session the run loop is bound to
+   * @param tenantId  tenant stored on the handle and passed to the run loop
+   */
+  async startSession(sessionId: string, tenantId: string): Promise<void> {
+    if (this.sessions.has(sessionId)) {
+      this.logger.warn(`[VoiceSession ${sessionId}] Already active — ignoring duplicate start`);
+      return;
+    }
+
+    const queue = new AsyncQueue<string | null>();
+    const abortController = new AbortController();
+
+    const handle: VoiceSessionHandle = {
+      queue,
+      abortController,
+      tenantId,
+      runLoop: this.runLoop(sessionId, tenantId, queue, abortController),
+    };
+
+    this.sessions.set(sessionId, handle);
+
+    // runLoop() looks up the engine before entering its try-block, so the
+    // returned promise can reject. Without a handler that is an unhandled
+    // rejection AND a dead map entry: isActive() keeps returning true while
+    // injected messages pile up in a queue nobody reads.
+    void handle.runLoop.catch((err: unknown) => {
+      const reason = err instanceof Error ? err.message : String(err);
+      this.logger.error(`[VoiceSession ${sessionId}] Run loop rejected: ${reason}`);
+      if (this.sessions.get(sessionId) === handle) {
+        this.sessions.delete(sessionId);
+      }
+    });
+
+    this.logger.log(`[VoiceSession ${sessionId}] Started for tenant=${tenantId}`);
+  }
+
+  /**
+   * Inject a new speech-turn message into the running voice session.
+   * Returns true if injected, false if session not found.
+   */
+  async injectMessage(sessionId: string, message: string): Promise<boolean> {
+    const handle = this.sessions.get(sessionId);
+    if (!handle) return false;
+    handle.queue.enqueue(message);
+    this.logger.log(`[VoiceSession ${sessionId}] Injected: "${message.slice(0, 80)}"`);
+    return true;
+  }
+
+  /**
+   * Terminate the voice session (user hung up or explicitly ended).
+   * Enqueues the poison-pill, aborts any in-flight SDK call,
+   * and waits for the run loop to exit. 
+   */
+  async terminateSession(sessionId: string): Promise<void> {
+    const handle = this.sessions.get(sessionId);
+    if (!handle) {
+      this.logger.debug(`[VoiceSession ${sessionId}] Already gone — ignoring terminate`);
+      return;
+    }
+
+    this.logger.log(`[VoiceSession ${sessionId}] Terminating…`);
+
+    // Abort any in-flight executeTask call
+    handle.abortController.abort();
+
+    // Poison-pill: drain() wakes a loop that is parked in dequeue(); the extra
+    // enqueue() covers a loop that is mid-turn and has no waiter registered yet.
+    handle.queue.drain(TERMINATE);
+    handle.queue.enqueue(TERMINATE);
+
+    // Wait for loop to exit (max 5 s to avoid hanging the HTTP request)
+    let timer: ReturnType<typeof setTimeout> | undefined;
+    try {
+      await Promise.race([
+        handle.runLoop.catch(() => {}),
+        new Promise<void>((resolve) => {
+          timer = setTimeout(resolve, 5_000);
+        }),
+      ]);
+    } finally {
+      // The loop usually exits immediately; don't leave the 5 s timer pending
+      // after every hangup.
+      if (timer !== undefined) clearTimeout(timer);
+    }
+
+    this.sessions.delete(sessionId);
+    this.logger.log(`[VoiceSession ${sessionId}] Terminated`);
+  }
+
+  /** Returns true if a run loop is active for this session. */
+  isActive(sessionId: string): boolean {
+    return this.sessions.has(sessionId);
+  }
+
+  // ---------------------------------------------------------------------------
+  // Private: run loop
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Background run loop — one instance per voice session.
+   * Blocks on queue.dequeue() between speech turns,
+   * then executes one SDK turn and streams events to the gateway. 
+   */
+  private async runLoop(
+    sessionId: string,
+    tenantId: string,
+    queue: AsyncQueue<string | null>,
+    abortController: AbortController,
+  ): Promise<void> {
+    // Always get the SDK engine for voice
+    // NOTE(review): this runs outside the try-block, so a throw here rejects the
+    // returned promise without reaching the cleanup in `finally` — confirm
+    // switchEngine() cannot throw, or that callers handle the rejection.
+    const engine = this.engineRegistry.switchEngine(AgentEngineType.CLAUDE_AGENT_SDK) as ClaudeAgentSdkEngine;
+
+    try {
+      while (!abortController.signal.aborted) {
+        // Block until next speech turn (or terminate signal)
+        const message = await queue.dequeue();
+        if (message === null) break;
+
+        // Execute one turn within the tenant's AsyncLocalStorage context
+        await TenantContextService.run(
+          {
+            tenantId,
+            tenantName: tenantId,
+            plan: 'enterprise',
+            schemaName: `it0_t_${tenantId}`,
+            maxServers: -1,
+            maxUsers: -1,
+            maxStandingOrders: -1,
+            maxAgentTokensPerMonth: -1,
+          },
+          async () => {
+            await this.executeTurn(engine, sessionId, tenantId, message, abortController);
+          },
+        );
+      }
+    } catch (err: any) {
+      if (err?.name !== 'AbortError') {
+        this.logger.error(`[VoiceSession ${sessionId}] Run loop error: ${err?.message}`);
+      }
+    } finally {
+      // Remove the map entry only if it still belongs to THIS loop.
+      // terminateSession() stops waiting after 5 s and drops the entry, after
+      // which voice/inject may auto-start a fresh loop under the same id; an
+      // unconditional delete here would evict that newer handle when this
+      // older loop finally unwinds.
+      if (this.sessions.get(sessionId)?.queue === queue) {
+        this.sessions.delete(sessionId);
+      }
+      this.logger.log(`[VoiceSession ${sessionId}] Run loop exited`);
+    }
+  }
+
+  /**
+   * Execute a single speech turn:
+   *   1. Create a task record
+   *   2. Save user message to conversation history
+   *   3. Load context + SDK resume session ID
+   *   4. Run engine.executeTask() and stream events to gateway
+   *   5. 
Save assistant response + capture new SDK session ID for next resume + */ + private async executeTurn( + engine: ClaudeAgentSdkEngine, + sessionId: string, + tenantId: string, + message: string, + abortController: AbortController, + ): Promise { + const session = await this.sessionRepository.findById(sessionId); + if (!session) { + this.logger.error(`[VoiceSession ${sessionId}] Session not found in DB — cannot execute turn`); + return; + } + + // Create task record for tracking + const task = new AgentTask(); + task.id = crypto.randomUUID(); + task.tenantId = tenantId; + task.sessionId = sessionId; + task.prompt = message; + task.status = TaskStatus.RUNNING; + task.startedAt = new Date(); + task.createdAt = new Date(); + await this.taskRepository.save(task); + + // Notify WS subscribers about the new task so they can track it + this.gateway.emitStreamEvent(sessionId, { type: 'task_info', taskId: task.id }); + + // Save user message to conversation history + await this.contextService.saveUserMessage(sessionId, message); + + // Load context (max 20 messages) — strip the latest user message since + // the engine adds it as the explicit prompt parameter + const history = await this.contextService.loadContext(sessionId, 20); + const historyForEngine = history.slice(0, -1); + + // SDK resume: continue from previous turn if available + const resumeSessionId = (session.metadata as any)?.sdkSessionId as string | undefined; + if (resumeSessionId) { + this.logger.log(`[VoiceSession ${sessionId}] Resuming SDK session: ${resumeSessionId}`); + } + + // Voice-mode system prompt: concise oral Chinese, no markdown / tool details + const voiceSystemPrompt = + '你正在通过语音与用户实时对话。请严格遵守以下规则:\n' + + '1. 只输出用户关注的最终答案,不要输出工具调用过程、中间步骤或技术细节\n' + + '2. 用简洁自然的口语中文回答,像面对面对话一样\n' + + '3. 回复要简短精炼,适合语音播报,通常1-3句话即可\n' + + '4. 
不要使用markdown格式、代码块、列表符号等文本格式'; + + // Events to suppress in voice mode (only text/completed/error reach TTS) + const voiceFilteredTypes = new Set(['thinking', 'tool_use', 'tool_result']); + + const textParts: string[] = []; + let finished = false; + + try { + const stream = engine.executeTask({ + sessionId, + prompt: message, + systemPrompt: voiceSystemPrompt, + allowedTools: [], + maxTurns: 10, + conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, + resumeSessionId, + }); + + for await (const event of stream) { + // Exit early if the voice session was terminated mid-turn + if (abortController.signal.aborted) break; + + if (!voiceFilteredTypes.has(event.type)) { + this.gateway.emitStreamEvent(sessionId, event); + } + + if (event.type === 'text') { + textParts.push(event.content); + } + + if (event.type === 'completed' && !finished) { + finished = true; + task.status = TaskStatus.COMPLETED; + task.result = event.summary; + task.tokensUsed = event.tokensUsed; + task.completedAt = new Date(); + await this.taskRepository.save(task); + + // Persist assistant response to conversation history + const assistantText = textParts.join('') || event.summary; + if (assistantText) { + await this.contextService.saveAssistantMessage(sessionId, assistantText); + } + + // Capture the new SDK session ID so the next turn can resume from here + const sdkSessionId = engine.getSdkSessionId(sessionId); + session.metadata = { + ...session.metadata as Record, + sdkSessionId: sdkSessionId ?? 
(session.metadata as any)?.sdkSessionId, + }; + session.status = 'active'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + } + + if (event.type === 'error' && !finished) { + finished = true; + task.status = TaskStatus.FAILED; + task.result = event.message; + task.completedAt = new Date(); + await this.taskRepository.save(task); + } + } + } catch (err: any) { + if (!finished && err?.name !== 'AbortError') { + task.status = TaskStatus.FAILED; + task.result = err?.message ?? 'Voice session turn error'; + task.completedAt = new Date(); + await this.taskRepository.save(task); + + this.gateway.emitStreamEvent(sessionId, { + type: 'error', + message: task.result ?? 'Voice session turn error', + code: 'VOICE_TURN_ERROR', + }); + } + } finally { + // If aborted mid-turn, save any partial text accumulated before the abort + if (!finished && textParts.length > 0) { + await this.contextService + .saveAssistantMessage(sessionId, textParts.join('') + '\n[中断]') + .catch(() => {}); + } + } + } +} diff --git a/packages/services/agent-service/src/infrastructure/voice/async-queue.ts b/packages/services/agent-service/src/infrastructure/voice/async-queue.ts new file mode 100644 index 0000000..eb3b660 --- /dev/null +++ b/packages/services/agent-service/src/infrastructure/voice/async-queue.ts @@ -0,0 +1,39 @@ +/** + * AsyncQueue — simple async message queue with blocking dequeue. + * + * Used by VoiceSessionManager to relay speech-turn messages to the + * long-lived agent run loop. The run loop calls dequeue() which + * suspends until the next message is enqueued (or a sentinel null + * is sent to signal termination). 
+ */ +export class AsyncQueue { + private readonly items: T[] = []; + private readonly waiters: Array<(item: T) => void> = []; + + enqueue(item: T): void { + const waiter = this.waiters.shift(); + if (waiter) { + waiter(item); + } else { + this.items.push(item); + } + } + + dequeue(): Promise { + return new Promise((resolve) => { + if (this.items.length > 0) { + resolve(this.items.shift()!); + } else { + this.waiters.push(resolve); + } + }); + } + + /** Resolve all pending dequeue waiters with the given sentinel value (e.g. on shutdown). */ + drain(sentinel: T): void { + for (const waiter of this.waiters) { + waiter(sentinel); + } + this.waiters.length = 0; + } +} diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/voice-session.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/voice-session.controller.ts new file mode 100644 index 0000000..dd8a295 --- /dev/null +++ b/packages/services/agent-service/src/interfaces/rest/controllers/voice-session.controller.ts @@ -0,0 +1,148 @@ +/** + * VoiceSessionController + * + * HTTP endpoints for the long-lived voice agent session lifecycle: + * + * POST /api/v1/agent/sessions/voice/start — called when voice call starts + * POST /api/v1/agent/sessions/:sessionId/voice/inject — called per speech turn + * DELETE /api/v1/agent/sessions/:sessionId/voice — called when user hangs up + * + * These endpoints replace the per-turn "POST /api/v1/agent/tasks" flow used + * by text chat. The voice-agent calls voice/start once, injects messages as + * the user speaks, and calls voice/terminate on room disconnect. 
+ */ +import { + Controller, Post, Delete, Param, Body, + NotFoundException, BadRequestException, Logger, +} from '@nestjs/common'; +import { TenantId } from '@it0/common'; +import { VoiceSessionManager } from '../../../domain/services/voice-session-manager.service'; +import { SessionRepository } from '../../../infrastructure/repositories/session.repository'; +import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo'; +import { AgentSession } from '../../../domain/entities/agent-session.entity'; +import * as crypto from 'crypto'; + +@Controller('api/v1/agent/sessions') +export class VoiceSessionController { + private readonly logger = new Logger(VoiceSessionController.name); + + constructor( + private readonly voiceSessionManager: VoiceSessionManager, + private readonly sessionRepository: SessionRepository, + ) {} + + /** + * Start a long-lived voice agent session. + * + * Called ONCE when the user opens a voice call. + * Creates (or reuses) an AgentSession and starts the background run loop. + * Idempotent: safe to call multiple times for the same sessionId. 
+ */ + @Post('voice/start') + async startVoiceSession( + @TenantId() tenantId: string, + @Body() body: { sessionId?: string; systemPrompt?: string }, + ) { + let session: AgentSession | null = null; + + if (body.sessionId) { + session = await this.sessionRepository.findById(body.sessionId); + // Reject cross-tenant access + if (session && session.tenantId !== tenantId) { + session = null; + } + } + + if (!session) { + // Create a fresh session pre-marked as voice mode + session = new AgentSession(); + session.id = crypto.randomUUID(); + session.tenantId = tenantId; + session.engineType = AgentEngineType.CLAUDE_AGENT_SDK; + session.status = 'active'; + session.systemPrompt = body.systemPrompt; + session.metadata = { voiceMode: true, title: '', titleSet: true }; + session.createdAt = new Date(); + session.updatedAt = new Date(); + } else { + // Reuse existing session; mark as voice mode + session.metadata = { + ...session.metadata as Record, + voiceMode: true, + }; + session.status = 'active'; + session.updatedAt = new Date(); + } + + await this.sessionRepository.save(session); + + // Start (or no-op if already running) the long-lived run loop + await this.voiceSessionManager.startSession(session.id, tenantId); + + this.logger.log(`Voice session started: ${session.id} tenant=${tenantId}`); + return { sessionId: session.id }; + } + + /** + * Inject a new speech-turn message into the running voice session. + * + * Called each time STT produces a user utterance. + * The background run loop picks up the message and executes one SDK turn, + * streaming the response back via the WebSocket gateway. 
+ */ + @Post(':sessionId/voice/inject') + async injectVoiceMessage( + @TenantId() tenantId: string, + @Param('sessionId') sessionId: string, + @Body() body: { message: string }, + ) { + if (!body.message?.trim()) { + throw new BadRequestException('message is required'); + } + + // Verify session ownership + const session = await this.sessionRepository.findById(sessionId); + if (!session || session.tenantId !== tenantId) { + throw new NotFoundException(`Voice session ${sessionId} not found`); + } + + // Auto-start the run loop if the voice-agent reconnected (e.g. container restart) + if (!this.voiceSessionManager.isActive(sessionId)) { + this.logger.warn(`[VoiceSession ${sessionId}] Run loop not active — auto-starting`); + await this.voiceSessionManager.startSession(sessionId, tenantId); + } + + const injected = await this.voiceSessionManager.injectMessage(sessionId, body.message); + if (!injected) { + throw new NotFoundException(`Voice session ${sessionId} is not active`); + } + + return { sessionId, injected: true }; + } + + /** + * Terminate the voice session (user hung up). + * + * Sends a poison-pill to the run loop, aborts any in-flight SDK call, + * and marks the AgentSession as completed. + * Idempotent: safe to call even if the session is already gone. 
+ */ + @Delete(':sessionId/voice') + async terminateVoiceSession( + @TenantId() tenantId: string, + @Param('sessionId') sessionId: string, + ) { + const session = await this.sessionRepository.findById(sessionId); + + if (session && session.tenantId === tenantId) { + await this.voiceSessionManager.terminateSession(sessionId); + + session.status = 'completed'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + } + + this.logger.log(`Voice session terminated: ${sessionId}`); + return { sessionId, terminated: true }; + } +} diff --git a/packages/services/voice-agent/src/agent.py b/packages/services/voice-agent/src/agent.py index 3ac2f73..27ae001 100644 --- a/packages/services/voice-agent/src/agent.py +++ b/packages/services/voice-agent/src/agent.py @@ -200,18 +200,15 @@ async def entrypoint(ctx: JobContext) -> None: # httpx clients to close when the room disconnects _http_clients: list = [] - async def _on_room_disconnect() -> None: - """Clean up httpx clients when the room disconnects.""" - for client in _http_clients: - try: - await client.aclose() - except Exception: - pass - logger.info("Cleaned up %d httpx client(s) for room %s", - len(_http_clients), ctx.room.name) + # _on_room_disconnect is defined AFTER llm is built (it calls terminate_voice_session). + # We register it via a mutable reference cell so the event listener can find it + # even before the function is defined. 
+ _cleanup_ref: list = [] - # Register cleanup before anything else so it fires even on errors - ctx.room.on("disconnected", lambda *_: asyncio.ensure_future(_on_room_disconnect())) + ctx.room.on( + "disconnected", + lambda *_: asyncio.ensure_future(_cleanup_ref[0]()) if _cleanup_ref else None, + ) try: # Extract auth header from job metadata @@ -331,6 +328,35 @@ async def entrypoint(ctx: JobContext) -> None: room_output_options=room_io.RoomOutputOptions(), ) + # ── Voice session lifecycle ─────────────────────────────────────────── + # For Agent SDK engine: start the long-lived voice session in agent-service. + # This spawns a persistent run loop that accepts injected messages for the + # duration of this call, replacing the per-turn POST /tasks approach. + if engine_type == "claude_agent_sdk": + started_session_id = await llm.start_voice_session() + if started_session_id: + logger.info("Long-lived voice session ready: %s", started_session_id) + else: + logger.warning("start_voice_session failed — falling back to per-task mode") + + # Register the disconnect handler now that llm is ready. + # This is the ONLY disconnect handler; it terminates the voice session + # (signals the agent to stop) AND closes any httpx clients. + async def _on_room_disconnect() -> None: + logger.info("Room disconnected: %s — terminating voice session", ctx.room.name) + # 1. Terminate the long-lived agent run loop (user hung up) + await llm.terminate_voice_session() + # 2. Close httpx clients + for client in _http_clients: + try: + await client.aclose() + except Exception: + pass + logger.info("Cleaned up %d httpx client(s) for room %s", + len(_http_clients), ctx.room.name) + + _cleanup_ref.append(_on_room_disconnect) + # --- Thinking state audio feedback --- # BackgroundAudioPlayer listens for AgentStateChangedEvent from the # session. 
When state transitions to "thinking" (STT done, waiting for diff --git a/packages/services/voice-agent/src/plugins/agent_llm.py b/packages/services/voice-agent/src/plugins/agent_llm.py index cc9eb32..e011460 100644 --- a/packages/services/voice-agent/src/plugins/agent_llm.py +++ b/packages/services/voice-agent/src/plugins/agent_llm.py @@ -6,8 +6,22 @@ Instead of calling Claude directly, this plugin: 2. Subscribes to the agent-service WebSocket /ws/agent for streaming text events 3. Emits ChatChunk objects into the LiveKit pipeline -In Agent SDK mode, the prompt is wrapped with voice-conversation instructions -so the agent outputs concise spoken Chinese without tool-call details. +Voice Session Mode (long-lived agent) +-------------------------------------- +When engine_type == "claude_agent_sdk", the plugin uses a long-lived voice session: + + Voice call starts → start_voice_session() + → POST /api/v1/agent/sessions/voice/start + → agent-service spawns a persistent run loop for this session + + User speaks (each turn) → AgentServiceLLMStream._run() + → POST /api/v1/agent/sessions/{sessionId}/voice/inject + → run loop receives the message and executes one SDK turn + → streams events back via WebSocket → ChatChunks → TTS + + User hangs up → terminate_voice_session() + → DELETE /api/v1/agent/sessions/{sessionId}/voice + → agent-service kills the run loop and marks session completed This preserves all agent-service capabilities: Tool Use, conversation history, tenant isolation, and session management. @@ -48,7 +62,10 @@ class AgentServiceLLM(llm.LLM): self._auth_header = auth_header self._engine_type = engine_type self._agent_session_id: str | None = None - self._injecting = False # guard: don't clear session during inject + + # Voice session mode: long-lived agent process tied to the call duration. + # True once start_voice_session() completes successfully. 
+ self._voice_session_started: bool = False @property def model(self) -> str: @@ -75,79 +92,179 @@ class AgentServiceLLM(llm.LLM): conn_options=conn_options, ) + # ------------------------------------------------------------------------- + # Voice session lifecycle + # ------------------------------------------------------------------------- + + async def start_voice_session(self, session_id: str | None = None) -> str: + """ + Start a long-lived voice agent session in agent-service. + + Called ONCE when the LiveKit room is connected. The agent-service + spawns a background run loop that stays alive for the entire call, + accepting injected messages via voice/inject. + + Returns the session ID to use for all subsequent inject calls. + """ + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._auth_header: + headers["Authorization"] = self._auth_header + + body: dict[str, Any] = {} + if session_id: + body["sessionId"] = session_id + + try: + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10), + ) as client: + resp = await client.post( + f"{self._agent_service_url}/api/v1/agent/sessions/voice/start", + json=body, + headers=headers, + ) + + if resp.status_code not in (200, 201): + logger.error( + "start_voice_session failed: %d %s", + resp.status_code, resp.text[:200], + ) + return "" + + data = resp.json() + new_session_id: str = data.get("sessionId", "") + self._agent_session_id = new_session_id + self._voice_session_started = True + logger.info("Voice session started: %s", new_session_id) + return new_session_id + + except Exception as exc: + logger.error("start_voice_session error: %s: %s", type(exc).__name__, exc) + return "" + async def inject_text_message( self, *, text: str = "", attachments: list[dict] | None = None, ) -> str: - """Inject a text message (with optional attachments) into the agent session. + """ + Inject a text message (sent via data channel, not speech) into the agent. 
+ + In voice session mode, uses voice/inject and collects the streamed response. + In text mode, uses POST /tasks directly. Returns the complete response text for TTS playback via session.say(). - Uses the same session ID so conversation context is preserved. """ if not text and not attachments: return "" - self._injecting = True try: - return await self._do_inject(text, attachments) + if self._voice_session_started and self._agent_session_id: + return await self._do_inject_voice(text) + else: + return await self._do_inject(text, attachments) except Exception as exc: logger.error("inject_text_message error: %s: %s", type(exc).__name__, exc) return "" - finally: - self._injecting = False + + async def _do_inject_voice(self, text: str) -> str: + """Inject via voice/inject endpoint and collect response text from WS.""" + import time + + agent_url = self._agent_service_url + session_id = self._agent_session_id + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._auth_header: + headers["Authorization"] = self._auth_header + + ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url}/ws/agent" + + timeout_secs = 120 + collected_text = "" + + async with websockets.connect( + ws_url, + open_timeout=10, + close_timeout=5, + ping_interval=20, + ping_timeout=10, + ) as ws: + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": session_id}, + })) + + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10), + ) as client: + resp = await client.post( + f"{agent_url}/api/v1/agent/sessions/{session_id}/voice/inject", + json={"message": text}, + headers=headers, + ) + if resp.status_code not in (200, 201): + logger.error("_do_inject_voice failed: %d", resp.status_code) + return "" + + deadline = time.time() + timeout_secs + while time.time() < deadline: + remaining = deadline - time.time() + try: + raw = await asyncio.wait_for(ws.recv(), 
timeout=min(30.0, remaining)) + except asyncio.TimeoutError: + continue + except websockets.exceptions.ConnectionClosed: + break + + try: + msg = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + + if msg.get("event") != "stream_event": + continue + + evt_data = msg.get("data", {}) + evt_type = evt_data.get("type", "") + + if evt_type == "text": + collected_text += evt_data.get("content", "") + elif evt_type in ("completed", "error"): + break + + return collected_text async def _do_inject( self, text: str, attachments: list[dict] | None, ) -> str: - """Execute inject: WS+HTTP stream, collect full response text.""" + """Text-mode inject: POST /tasks and collect streamed response text.""" import time agent_url = self._agent_service_url - auth_header = self._auth_header - headers: dict[str, str] = {"Content-Type": "application/json"} - if auth_header: - headers["Authorization"] = auth_header + if self._auth_header: + headers["Authorization"] = self._auth_header ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") ws_url = f"{ws_url}/ws/agent" timeout_secs = 120 engine_type = self._engine_type - voice_mode = engine_type == "claude_agent_sdk" body: dict[str, Any] = { "prompt": text if text else "(see attachments)", "engineType": engine_type, - "voiceMode": voice_mode, + "voiceMode": False, } - - if voice_mode: - body["systemPrompt"] = ( - "你正在通过语音与用户实时对话。请严格遵守以下规则:\n" - "1. 只输出用户关注的最终答案,不要输出工具调用过程、中间步骤或技术细节\n" - "2. 用简洁自然的口语中文回答,像面对面对话一样\n" - "3. 回复要简短精炼,适合语音播报,通常1-3句话即可\n" - "4. 
不要使用markdown格式、代码块、列表符号等文本格式" - ) - if self._agent_session_id: body["sessionId"] = self._agent_session_id - if attachments: body["attachments"] = attachments - logger.info( - "inject POST /tasks engine=%s text=%s attachments=%d", - engine_type, - text[:80] if text else "(empty)", - len(attachments) if attachments else 0, - ) - collected_text = "" async with websockets.connect( @@ -157,14 +274,12 @@ class AgentServiceLLM(llm.LLM): ping_interval=20, ping_timeout=10, ) as ws: - # Pre-subscribe if self._agent_session_id: await ws.send(json.dumps({ "event": "subscribe_session", "data": {"sessionId": self._agent_session_id}, })) - # Create task async with httpx.AsyncClient( timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10), ) as client: @@ -173,44 +288,28 @@ class AgentServiceLLM(llm.LLM): json=body, headers=headers, ) + if resp.status_code not in (200, 201): + logger.error("_do_inject task failed: %d", resp.status_code) + return "" - if resp.status_code not in (200, 201): - logger.error( - "inject task creation failed: %d %s", - resp.status_code, resp.text[:200], - ) - return "" + data = resp.json() + session_id = data.get("sessionId", "") + task_id = data.get("taskId", "") + self._agent_session_id = session_id - data = resp.json() - session_id = data.get("sessionId", "") - task_id = data.get("taskId", "") - self._agent_session_id = session_id - logger.info( - "inject task created: session=%s, task=%s", - session_id, task_id, - ) - - # Subscribe with actual IDs await ws.send(json.dumps({ "event": "subscribe_session", "data": {"sessionId": session_id, "taskId": task_id}, })) - # Stream events → collect text deadline = time.time() + timeout_secs - while time.time() < deadline: remaining = deadline - time.time() try: - raw = await asyncio.wait_for( - ws.recv(), timeout=min(30.0, remaining) - ) + raw = await asyncio.wait_for(ws.recv(), timeout=min(30.0, remaining)) except asyncio.TimeoutError: - if time.time() >= deadline: - logger.warning("inject stream timeout 
after %ds", timeout_secs) continue except websockets.exceptions.ConnectionClosed: - logger.warning("inject WS connection closed") break try: @@ -218,33 +317,54 @@ class AgentServiceLLM(llm.LLM): except (json.JSONDecodeError, TypeError): continue - event_type = msg.get("event", "") + if msg.get("event") != "stream_event": + continue - if event_type == "stream_event": - evt_data = msg.get("data", {}) - evt_type = evt_data.get("type", "") + evt_data = msg.get("data", {}) + evt_type = evt_data.get("type", "") - if evt_type == "text": - content = evt_data.get("content", "") - if content: - collected_text += content - - elif evt_type == "completed": - logger.info( - "inject stream completed, text length=%d", - len(collected_text), - ) - return collected_text - - elif evt_type == "error": - err_msg = evt_data.get("message", "Unknown error") - logger.error("inject error: %s", err_msg) - if "aborted" in err_msg.lower() or "exited" in err_msg.lower(): - self._agent_session_id = None - return collected_text if collected_text else "" + if evt_type == "text": + collected_text += evt_data.get("content", "") + elif evt_type == "completed": + return collected_text + elif evt_type == "error": + break return collected_text + async def terminate_voice_session(self) -> None: + """ + Terminate the long-lived voice agent session. + + Called when the user hangs up (LiveKit room disconnects). + Signals agent-service to abort any running SDK task and clean up. 
+ """ + if not self._voice_session_started or not self._agent_session_id: + return + + session_id = self._agent_session_id + headers: dict[str, str] = {"Content-Type": "application/json"} + if self._auth_header: + headers["Authorization"] = self._auth_header + + try: + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5), + ) as client: + resp = await client.delete( + f"{self._agent_service_url}/api/v1/agent/sessions/{session_id}/voice", + headers=headers, + ) + logger.info( + "Voice session terminated: %s (status=%d)", + session_id, resp.status_code, + ) + except Exception as exc: + logger.error("terminate_voice_session error: %s: %s", type(exc).__name__, exc) + finally: + self._voice_session_started = False + self._agent_session_id = None + class AgentServiceLLMStream(llm.LLMStream): """Streams text from agent-service via WebSocket.""" @@ -265,13 +385,12 @@ class AgentServiceLLMStream(llm.LLMStream): ) self._llm_instance = llm_instance - # Retry configuration + # Retry configuration (used in fallback / non-voice mode only) _MAX_RETRIES = 2 _RETRY_DELAYS = [1.0, 3.0] # seconds between retries async def _run(self) -> None: # Extract the latest user message from ChatContext - # items can contain ChatMessage and AgentConfigUpdate; filter by type user_text = "" for item in reversed(self._chat_ctx.items): if getattr(item, "type", None) != "message": @@ -295,6 +414,32 @@ class AgentServiceLLMStream(llm.LLMStream): return request_id = f"agent-{uuid.uuid4().hex[:12]}" + + # ----------------------------------------------------------------- + # Voice session mode: inject into the long-lived run loop + # ----------------------------------------------------------------- + if self._llm_instance._voice_session_started: + try: + await self._do_stream_voice(user_text, request_id) + except Exception as exc: + logger.error( + "Voice inject stream error: %s: %s", + type(exc).__name__, exc, exc_info=True, + ) + self._event_ch.send_nowait( + 
llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,语音服务暂时不可用。", + ), + ) + ) + return + + # ----------------------------------------------------------------- + # Fallback / text mode: per-turn POST /tasks with retry + # ----------------------------------------------------------------- last_error: Exception | None = None for attempt in range(self._MAX_RETRIES + 1): @@ -308,7 +453,6 @@ class AgentServiceLLMStream(llm.LLMStream): return # success except (httpx.ConnectError, httpx.ConnectTimeout, OSError) as exc: - # Network-level errors — retryable last_error = exc logger.warning( "Agent stream attempt %d failed (network): %s: %s", @@ -321,7 +465,6 @@ class AgentServiceLLMStream(llm.LLMStream): attempt + 1, getattr(exc, "status_code", "?"), ) except Exception as exc: - # Non-retryable errors — fail immediately logger.error("Agent stream error: %s: %s", type(exc).__name__, exc) self._event_ch.send_nowait( llm.ChatChunk( @@ -334,7 +477,6 @@ class AgentServiceLLMStream(llm.LLMStream): ) return - # All retries exhausted logger.error( "Agent stream failed after %d attempts: %s", self._MAX_RETRIES + 1, last_error, @@ -349,8 +491,150 @@ class AgentServiceLLMStream(llm.LLMStream): ) ) + # ------------------------------------------------------------------------- + # Voice session stream: inject + subscribe + # ------------------------------------------------------------------------- + + async def _do_stream_voice(self, user_text: str, request_id: str) -> None: + """ + Voice session mode: inject the user utterance into the long-lived + agent run loop via POST voice/inject, then subscribe to the WebSocket + to receive the streaming response as ChatChunks. + + Unlike the per-task approach (_do_stream), no new task is created here — + the VoiceSessionManager's run loop handles task creation internally. 
+ """ + import time + + agent_url = self._llm_instance._agent_service_url + auth_header = self._llm_instance._auth_header + session_id = self._llm_instance._agent_session_id + + if not session_id: + logger.error("_do_stream_voice: no session_id — falling back to per-task mode") + await self._do_stream(user_text, request_id) + return + + headers: dict[str, str] = {"Content-Type": "application/json"} + if auth_header: + headers["Authorization"] = auth_header + + ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url}/ws/agent" + + timeout_secs = 120 + + logger.info( + "Voice inject: session=%s text=%s", + session_id, user_text[:80], + ) + + async with websockets.connect( + ws_url, + open_timeout=10, + close_timeout=5, + ping_interval=20, + ping_timeout=10, + ) as ws: + # 1. Pre-subscribe to receive events as soon as the task starts + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": session_id}, + })) + + # 2. Inject the message (enqueues to the run loop; async return) + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10), + ) as client: + resp = await client.post( + f"{agent_url}/api/v1/agent/sessions/{session_id}/voice/inject", + json={"message": user_text}, + headers=headers, + ) + + if resp.status_code not in (200, 201): + logger.error( + "Voice inject failed: %d %s", + resp.status_code, resp.text[:200], + ) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,语音指令注入失败。", + ), + ) + ) + return + + # 3. Emit initial role delta (LiveKit convention) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(role="assistant"), + ) + ) + + # 4. 
Stream events → ChatChunks until completed/error + deadline = time.time() + timeout_secs + + while time.time() < deadline: + remaining = deadline - time.time() + try: + raw = await asyncio.wait_for( + ws.recv(), timeout=min(30.0, remaining) + ) + except asyncio.TimeoutError: + if time.time() >= deadline: + logger.warning("Voice inject stream timeout after %ds", timeout_secs) + continue + except websockets.exceptions.ConnectionClosed: + logger.warning("Voice inject WS connection closed") + break + + try: + msg = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + + if msg.get("event") != "stream_event": + continue + + evt_data = msg.get("data", {}) + evt_type = evt_data.get("type", "") + + if evt_type == "text": + content = evt_data.get("content", "") + if content: + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(content=content), + ) + ) + + elif evt_type == "completed": + logger.info("Voice inject stream completed") + return + + elif evt_type == "error": + err_msg = evt_data.get("message", "Unknown error") + logger.error("Voice inject agent error: %s", err_msg) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(content=f"Agent 错误: {err_msg}"), + ) + ) + return + + # ------------------------------------------------------------------------- + # Legacy per-task stream (text chat / fallback) + # ------------------------------------------------------------------------- + async def _do_stream(self, user_text: str, request_id: str) -> None: - """Execute a single WS+HTTP streaming attempt.""" + """Execute a single WS+HTTP streaming attempt (text-chat / non-voice mode).""" import time agent_url = self._llm_instance._agent_service_url @@ -382,19 +666,14 @@ class AgentServiceLLMStream(llm.LLMStream): # 2. 
Create agent task (with timeout) engine_type = self._llm_instance._engine_type - - # Voice mode flag: tell agent-service to filter intermediate events - # (tool_use, tool_result, thinking) — only stream text + completed + error voice_mode = engine_type == "claude_agent_sdk" body: dict[str, Any] = { - "prompt": user_text, # always send clean user text (no wrapping) + "prompt": user_text, "engineType": engine_type, "voiceMode": voice_mode, } - # Agent SDK mode: set systemPrompt once (not per-message) so - # conversation history stays clean — identical to text chat pattern if voice_mode: body["systemPrompt"] = ( "你正在通过语音与用户实时对话。请严格遵守以下规则:\n" @@ -409,9 +688,7 @@ class AgentServiceLLMStream(llm.LLMStream): logger.info( "POST /tasks engine=%s voiceMode=%s user_text=%s", - engine_type, - voice_mode, - user_text[:80], + engine_type, voice_mode, user_text[:80], ) async with httpx.AsyncClient( timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10), @@ -505,15 +782,9 @@ class AgentServiceLLMStream(llm.LLMStream): elif evt_type == "error": err_msg = evt_data.get("message", "Unknown error") logger.error("Agent error: %s", err_msg) - # Clear session so next task starts fresh - # (don't try to resume a dead/aborted session) - # But skip if inject is in progress — it owns the session if "aborted" in err_msg.lower() or "exited" in err_msg.lower(): - if not self._llm_instance._injecting: - logger.info("Clearing agent session after abort/exit") - self._llm_instance._agent_session_id = None - else: - logger.info("Skipping session clear — inject in progress") + logger.info("Clearing agent session after abort/exit") + self._llm_instance._agent_session_id = None self._event_ch.send_nowait( llm.ChatChunk( id=request_id,