feat(voice): long-lived agent session with proper hangup termination

Replace the per-turn POST /tasks approach for voice calls with a
long-lived agent run loop tied to the call lifecycle:

agent-service:
- Add AsyncQueue<T> utility for blocking message relay
- Add VoiceSessionManager: spawns one background run loop per voice call,
  accepts injected messages, terminates cleanly on hangup
- Add VoiceSessionController with 3 endpoints:
    POST   /api/v1/agent/sessions/voice/start  (call start)
    POST   /api/v1/agent/sessions/:id/voice/inject  (each speech turn)
    DELETE /api/v1/agent/sessions/:id/voice    (user hung up)
- Register VoiceSessionManager + VoiceSessionController in agent.module.ts

voice-agent:
- AgentServiceLLM: add start_voice_session(), terminate_voice_session(),
  inject_text_message() (voice/inject-aware), _do_inject_voice()
- AgentServiceLLMStream._run(): use voice/inject path when voice session
  is active; fall back to per-task POST for text-chat / non-SDK engines
- entrypoint(): call start_voice_session() after session.start();
  register _on_room_disconnect that calls terminate_voice_session()
  so the agent is always killed when the user hangs up

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-04 04:01:02 -08:00
parent 6ca8aab243
commit 635cca18fa
6 changed files with 938 additions and 117 deletions

View File

@ -43,7 +43,9 @@ import { VoiceConfigRepository } from './infrastructure/repositories/voice-confi
import { UsageRecordRepository } from './infrastructure/repositories/usage-record.repository';
import { VoiceConfigService } from './infrastructure/services/voice-config.service';
import { VoiceConfigController } from './interfaces/rest/controllers/voice-config.controller';
import { VoiceSessionController } from './interfaces/rest/controllers/voice-session.controller';
import { ConversationContextService } from './domain/services/conversation-context.service';
import { VoiceSessionManager } from './domain/services/voice-session-manager.service';
import { EventPublisherService } from './infrastructure/messaging/event-publisher.service';
@Module({
@ -58,7 +60,8 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe
],
controllers: [
AgentController, SessionController, RiskRulesController,
TenantAgentConfigController, AgentConfigController, VoiceConfigController, SkillsController, HooksController,
TenantAgentConfigController, AgentConfigController, VoiceConfigController,
VoiceSessionController, SkillsController, HooksController,
],
providers: [
AgentStreamGateway,
@ -72,6 +75,7 @@ import { EventPublisherService } from './infrastructure/messaging/event-publishe
StandingOrderExtractorService,
AllowedToolsResolverService,
ConversationContextService,
VoiceSessionManager,
SessionRepository,
TaskRepository,
MessageRepository,

View File

@ -0,0 +1,333 @@
/**
* VoiceSessionManager
*
* Manages long-lived agent run loops for voice calls.
*
* Lifecycle:
* startSession(sessionId) spawn background run loop, ready for messages
* injectMessage(sessionId) enqueue speech turn; loop processes sequentially
* terminateSession(sessionId) send poison-pill + abort; loop exits cleanly
*
* Run-loop design (per voice session):
* while not terminated:
* message queue.dequeue() blocks between speech turns
* executeTask(message, resume) one SDK turn, streams events via gateway
* capture new sdkSessionId for next turn's native resume
*
* This replaces the per-turn "POST /tasks" model used by text chat.
* The SDK session is kept alive across turns via the `resume` option,
* and the run loop is explicitly terminated when the user hangs up.
*/
import { Injectable, Logger } from '@nestjs/common';
import { AsyncQueue } from '../../infrastructure/voice/async-queue';
import { EngineRegistry } from '../../infrastructure/engines/engine-registry';
import { ClaudeAgentSdkEngine } from '../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine';
import { AgentStreamGateway } from '../../interfaces/ws/agent-stream.gateway';
import { SessionRepository } from '../../infrastructure/repositories/session.repository';
import { TaskRepository } from '../../infrastructure/repositories/task.repository';
import { ConversationContextService } from './conversation-context.service';
import { AgentEngineType } from '../value-objects/agent-engine-type.vo';
import { TaskStatus } from '../value-objects/task-status.vo';
import { AgentTask } from '../entities/agent-task.entity';
import { TenantContextService } from '@it0/common';
import * as crypto from 'crypto';
/** Sentinel value enqueued to signal the run loop to exit. */
const TERMINATE: null = null;
/** In-memory handle for a running voice session. */
interface VoiceSessionHandle {
/** Message queue: string = user speech turn; null = terminate signal. */
queue: AsyncQueue<string | null>;
/** Allows aborting the currently-running SDK executeTask call. */
abortController: AbortController;
/** Tenant who owns this voice session. */
tenantId: string;
/** Background run-loop promise (resolved when loop exits). */
runLoop: Promise<void>;
}
@Injectable()
export class VoiceSessionManager {
private readonly logger = new Logger(VoiceSessionManager.name);
/** Map from agentSessionId → handle. */
private readonly sessions = new Map<string, VoiceSessionHandle>();
constructor(
private readonly engineRegistry: EngineRegistry,
private readonly gateway: AgentStreamGateway,
private readonly sessionRepository: SessionRepository,
private readonly taskRepository: TaskRepository,
private readonly contextService: ConversationContextService,
) {}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
* Start a long-lived voice agent session.
* Safe to call multiple times for the same sessionId (idempotent).
*/
async startSession(sessionId: string, tenantId: string): Promise<void> {
if (this.sessions.has(sessionId)) {
this.logger.warn(`[VoiceSession ${sessionId}] Already active — ignoring duplicate start`);
return;
}
const queue = new AsyncQueue<string | null>();
const abortController = new AbortController();
const handle: VoiceSessionHandle = {
queue,
abortController,
tenantId,
runLoop: this.runLoop(sessionId, tenantId, queue, abortController),
};
this.sessions.set(sessionId, handle);
this.logger.log(`[VoiceSession ${sessionId}] Started for tenant=${tenantId}`);
}
/**
* Inject a new speech-turn message into the running voice session.
* Returns true if injected, false if session not found.
*/
async injectMessage(sessionId: string, message: string): Promise<boolean> {
const handle = this.sessions.get(sessionId);
if (!handle) return false;
handle.queue.enqueue(message);
this.logger.log(`[VoiceSession ${sessionId}] Injected: "${message.slice(0, 80)}"`);
return true;
}
/**
* Terminate the voice session (user hung up or explicitly ended).
* Enqueues the poison-pill, aborts any in-flight SDK call,
* and waits for the run loop to exit.
*/
async terminateSession(sessionId: string): Promise<void> {
const handle = this.sessions.get(sessionId);
if (!handle) {
this.logger.debug(`[VoiceSession ${sessionId}] Already gone — ignoring terminate`);
return;
}
this.logger.log(`[VoiceSession ${sessionId}] Terminating…`);
// Abort any in-flight executeTask call
handle.abortController.abort();
// Poison-pill: unblock the queue.dequeue() if loop is idle between turns
handle.queue.drain(TERMINATE);
handle.queue.enqueue(TERMINATE);
// Wait for loop to exit (max 5 s to avoid hanging the HTTP request)
await Promise.race([
handle.runLoop.catch(() => {}),
new Promise<void>((r) => setTimeout(r, 5_000)),
]);
this.sessions.delete(sessionId);
this.logger.log(`[VoiceSession ${sessionId}] Terminated`);
}
/** Returns true if a run loop is active for this session. */
isActive(sessionId: string): boolean {
return this.sessions.has(sessionId);
}
// ---------------------------------------------------------------------------
// Private: run loop
// ---------------------------------------------------------------------------
/**
* Background run loop one instance per voice session.
* Blocks on queue.dequeue() between speech turns,
* then executes one SDK turn and streams events to the gateway.
*/
private async runLoop(
sessionId: string,
tenantId: string,
queue: AsyncQueue<string | null>,
abortController: AbortController,
): Promise<void> {
// Always get the SDK engine for voice
const engine = this.engineRegistry.switchEngine(AgentEngineType.CLAUDE_AGENT_SDK) as ClaudeAgentSdkEngine;
try {
while (!abortController.signal.aborted) {
// Block until next speech turn (or terminate signal)
const message = await queue.dequeue();
if (message === null) break;
// Execute one turn within the tenant's AsyncLocalStorage context
await TenantContextService.run(
{
tenantId,
tenantName: tenantId,
plan: 'enterprise',
schemaName: `it0_t_${tenantId}`,
maxServers: -1,
maxUsers: -1,
maxStandingOrders: -1,
maxAgentTokensPerMonth: -1,
},
async () => {
await this.executeTurn(engine, sessionId, tenantId, message, abortController);
},
);
}
} catch (err: any) {
if (err?.name !== 'AbortError') {
this.logger.error(`[VoiceSession ${sessionId}] Run loop error: ${err?.message}`);
}
} finally {
this.sessions.delete(sessionId);
this.logger.log(`[VoiceSession ${sessionId}] Run loop exited`);
}
}
/**
* Execute a single speech turn:
* 1. Create a task record
* 2. Save user message to conversation history
* 3. Load context + SDK resume session ID
* 4. Run engine.executeTask() and stream events to gateway
* 5. Save assistant response + capture new SDK session ID for next resume
*/
private async executeTurn(
engine: ClaudeAgentSdkEngine,
sessionId: string,
tenantId: string,
message: string,
abortController: AbortController,
): Promise<void> {
const session = await this.sessionRepository.findById(sessionId);
if (!session) {
this.logger.error(`[VoiceSession ${sessionId}] Session not found in DB — cannot execute turn`);
return;
}
// Create task record for tracking
const task = new AgentTask();
task.id = crypto.randomUUID();
task.tenantId = tenantId;
task.sessionId = sessionId;
task.prompt = message;
task.status = TaskStatus.RUNNING;
task.startedAt = new Date();
task.createdAt = new Date();
await this.taskRepository.save(task);
// Notify WS subscribers about the new task so they can track it
this.gateway.emitStreamEvent(sessionId, { type: 'task_info', taskId: task.id });
// Save user message to conversation history
await this.contextService.saveUserMessage(sessionId, message);
// Load context (max 20 messages) — strip the latest user message since
// the engine adds it as the explicit prompt parameter
const history = await this.contextService.loadContext(sessionId, 20);
const historyForEngine = history.slice(0, -1);
// SDK resume: continue from previous turn if available
const resumeSessionId = (session.metadata as any)?.sdkSessionId as string | undefined;
if (resumeSessionId) {
this.logger.log(`[VoiceSession ${sessionId}] Resuming SDK session: ${resumeSessionId}`);
}
// Voice-mode system prompt: concise oral Chinese, no markdown / tool details
const voiceSystemPrompt =
'你正在通过语音与用户实时对话。请严格遵守以下规则:\n' +
'1. 只输出用户关注的最终答案,不要输出工具调用过程、中间步骤或技术细节\n' +
'2. 用简洁自然的口语中文回答,像面对面对话一样\n' +
'3. 回复要简短精炼适合语音播报通常1-3句话即可\n' +
'4. 不要使用markdown格式、代码块、列表符号等文本格式';
// Events to suppress in voice mode (only text/completed/error reach TTS)
const voiceFilteredTypes = new Set(['thinking', 'tool_use', 'tool_result']);
const textParts: string[] = [];
let finished = false;
try {
const stream = engine.executeTask({
sessionId,
prompt: message,
systemPrompt: voiceSystemPrompt,
allowedTools: [],
maxTurns: 10,
conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
resumeSessionId,
});
for await (const event of stream) {
// Exit early if the voice session was terminated mid-turn
if (abortController.signal.aborted) break;
if (!voiceFilteredTypes.has(event.type)) {
this.gateway.emitStreamEvent(sessionId, event);
}
if (event.type === 'text') {
textParts.push(event.content);
}
if (event.type === 'completed' && !finished) {
finished = true;
task.status = TaskStatus.COMPLETED;
task.result = event.summary;
task.tokensUsed = event.tokensUsed;
task.completedAt = new Date();
await this.taskRepository.save(task);
// Persist assistant response to conversation history
const assistantText = textParts.join('') || event.summary;
if (assistantText) {
await this.contextService.saveAssistantMessage(sessionId, assistantText);
}
// Capture the new SDK session ID so the next turn can resume from here
const sdkSessionId = engine.getSdkSessionId(sessionId);
session.metadata = {
...session.metadata as Record<string, unknown>,
sdkSessionId: sdkSessionId ?? (session.metadata as any)?.sdkSessionId,
};
session.status = 'active';
session.updatedAt = new Date();
await this.sessionRepository.save(session);
}
if (event.type === 'error' && !finished) {
finished = true;
task.status = TaskStatus.FAILED;
task.result = event.message;
task.completedAt = new Date();
await this.taskRepository.save(task);
}
}
} catch (err: any) {
if (!finished && err?.name !== 'AbortError') {
task.status = TaskStatus.FAILED;
task.result = err?.message ?? 'Voice session turn error';
task.completedAt = new Date();
await this.taskRepository.save(task);
this.gateway.emitStreamEvent(sessionId, {
type: 'error',
message: task.result ?? 'Voice session turn error',
code: 'VOICE_TURN_ERROR',
});
}
} finally {
// If aborted mid-turn, save any partial text accumulated before the abort
if (!finished && textParts.length > 0) {
await this.contextService
.saveAssistantMessage(sessionId, textParts.join('') + '\n[中断]')
.catch(() => {});
}
}
}
}

View File

@ -0,0 +1,39 @@
/**
* AsyncQueue simple async message queue with blocking dequeue.
*
* Used by VoiceSessionManager to relay speech-turn messages to the
* long-lived agent run loop. The run loop calls dequeue() which
* suspends until the next message is enqueued (or a sentinel null
* is sent to signal termination).
*/
export class AsyncQueue<T> {
private readonly items: T[] = [];
private readonly waiters: Array<(item: T) => void> = [];
enqueue(item: T): void {
const waiter = this.waiters.shift();
if (waiter) {
waiter(item);
} else {
this.items.push(item);
}
}
dequeue(): Promise<T> {
return new Promise<T>((resolve) => {
if (this.items.length > 0) {
resolve(this.items.shift()!);
} else {
this.waiters.push(resolve);
}
});
}
/** Resolve all pending dequeue waiters with the given sentinel value (e.g. on shutdown). */
drain(sentinel: T): void {
for (const waiter of this.waiters) {
waiter(sentinel);
}
this.waiters.length = 0;
}
}

View File

@ -0,0 +1,148 @@
/**
* VoiceSessionController
*
* HTTP endpoints for the long-lived voice agent session lifecycle:
*
* POST /api/v1/agent/sessions/voice/start called when voice call starts
* POST /api/v1/agent/sessions/:sessionId/voice/inject called per speech turn
* DELETE /api/v1/agent/sessions/:sessionId/voice called when user hangs up
*
* These endpoints replace the per-turn "POST /api/v1/agent/tasks" flow used
* by text chat. The voice-agent calls voice/start once, injects messages as
* the user speaks, and calls voice/terminate on room disconnect.
*/
import {
Controller, Post, Delete, Param, Body,
NotFoundException, BadRequestException, Logger,
} from '@nestjs/common';
import { TenantId } from '@it0/common';
import { VoiceSessionManager } from '../../../domain/services/voice-session-manager.service';
import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
import { AgentSession } from '../../../domain/entities/agent-session.entity';
import * as crypto from 'crypto';
@Controller('api/v1/agent/sessions')
export class VoiceSessionController {
private readonly logger = new Logger(VoiceSessionController.name);
constructor(
private readonly voiceSessionManager: VoiceSessionManager,
private readonly sessionRepository: SessionRepository,
) {}
/**
* Start a long-lived voice agent session.
*
* Called ONCE when the user opens a voice call.
* Creates (or reuses) an AgentSession and starts the background run loop.
* Idempotent: safe to call multiple times for the same sessionId.
*/
@Post('voice/start')
async startVoiceSession(
@TenantId() tenantId: string,
@Body() body: { sessionId?: string; systemPrompt?: string },
) {
let session: AgentSession | null = null;
if (body.sessionId) {
session = await this.sessionRepository.findById(body.sessionId);
// Reject cross-tenant access
if (session && session.tenantId !== tenantId) {
session = null;
}
}
if (!session) {
// Create a fresh session pre-marked as voice mode
session = new AgentSession();
session.id = crypto.randomUUID();
session.tenantId = tenantId;
session.engineType = AgentEngineType.CLAUDE_AGENT_SDK;
session.status = 'active';
session.systemPrompt = body.systemPrompt;
session.metadata = { voiceMode: true, title: '', titleSet: true };
session.createdAt = new Date();
session.updatedAt = new Date();
} else {
// Reuse existing session; mark as voice mode
session.metadata = {
...session.metadata as Record<string, unknown>,
voiceMode: true,
};
session.status = 'active';
session.updatedAt = new Date();
}
await this.sessionRepository.save(session);
// Start (or no-op if already running) the long-lived run loop
await this.voiceSessionManager.startSession(session.id, tenantId);
this.logger.log(`Voice session started: ${session.id} tenant=${tenantId}`);
return { sessionId: session.id };
}
/**
* Inject a new speech-turn message into the running voice session.
*
* Called each time STT produces a user utterance.
* The background run loop picks up the message and executes one SDK turn,
* streaming the response back via the WebSocket gateway.
*/
@Post(':sessionId/voice/inject')
async injectVoiceMessage(
@TenantId() tenantId: string,
@Param('sessionId') sessionId: string,
@Body() body: { message: string },
) {
if (!body.message?.trim()) {
throw new BadRequestException('message is required');
}
// Verify session ownership
const session = await this.sessionRepository.findById(sessionId);
if (!session || session.tenantId !== tenantId) {
throw new NotFoundException(`Voice session ${sessionId} not found`);
}
// Auto-start the run loop if the voice-agent reconnected (e.g. container restart)
if (!this.voiceSessionManager.isActive(sessionId)) {
this.logger.warn(`[VoiceSession ${sessionId}] Run loop not active — auto-starting`);
await this.voiceSessionManager.startSession(sessionId, tenantId);
}
const injected = await this.voiceSessionManager.injectMessage(sessionId, body.message);
if (!injected) {
throw new NotFoundException(`Voice session ${sessionId} is not active`);
}
return { sessionId, injected: true };
}
/**
* Terminate the voice session (user hung up).
*
* Sends a poison-pill to the run loop, aborts any in-flight SDK call,
* and marks the AgentSession as completed.
* Idempotent: safe to call even if the session is already gone.
*/
@Delete(':sessionId/voice')
async terminateVoiceSession(
@TenantId() tenantId: string,
@Param('sessionId') sessionId: string,
) {
const session = await this.sessionRepository.findById(sessionId);
if (session && session.tenantId === tenantId) {
await this.voiceSessionManager.terminateSession(sessionId);
session.status = 'completed';
session.updatedAt = new Date();
await this.sessionRepository.save(session);
}
this.logger.log(`Voice session terminated: ${sessionId}`);
return { sessionId, terminated: true };
}
}

View File

@ -200,18 +200,15 @@ async def entrypoint(ctx: JobContext) -> None:
# httpx clients to close when the room disconnects
_http_clients: list = []
async def _on_room_disconnect() -> None:
"""Clean up httpx clients when the room disconnects."""
for client in _http_clients:
try:
await client.aclose()
except Exception:
pass
logger.info("Cleaned up %d httpx client(s) for room %s",
len(_http_clients), ctx.room.name)
# _on_room_disconnect is defined AFTER llm is built (it calls terminate_voice_session).
# We register it via a mutable reference cell so the event listener can find it
# even before the function is defined.
_cleanup_ref: list = []
# Register cleanup before anything else so it fires even on errors
ctx.room.on("disconnected", lambda *_: asyncio.ensure_future(_on_room_disconnect()))
ctx.room.on(
"disconnected",
lambda *_: asyncio.ensure_future(_cleanup_ref[0]()) if _cleanup_ref else None,
)
try:
# Extract auth header from job metadata
@ -331,6 +328,35 @@ async def entrypoint(ctx: JobContext) -> None:
room_output_options=room_io.RoomOutputOptions(),
)
# ── Voice session lifecycle ───────────────────────────────────────────
# For Agent SDK engine: start the long-lived voice session in agent-service.
# This spawns a persistent run loop that accepts injected messages for the
# duration of this call, replacing the per-turn POST /tasks approach.
if engine_type == "claude_agent_sdk":
started_session_id = await llm.start_voice_session()
if started_session_id:
logger.info("Long-lived voice session ready: %s", started_session_id)
else:
logger.warning("start_voice_session failed — falling back to per-task mode")
# Register the disconnect handler now that llm is ready.
# This is the ONLY disconnect handler; it terminates the voice session
# (signals the agent to stop) AND closes any httpx clients.
async def _on_room_disconnect() -> None:
logger.info("Room disconnected: %s — terminating voice session", ctx.room.name)
# 1. Terminate the long-lived agent run loop (user hung up)
await llm.terminate_voice_session()
# 2. Close httpx clients
for client in _http_clients:
try:
await client.aclose()
except Exception:
pass
logger.info("Cleaned up %d httpx client(s) for room %s",
len(_http_clients), ctx.room.name)
_cleanup_ref.append(_on_room_disconnect)
# --- Thinking state audio feedback ---
# BackgroundAudioPlayer listens for AgentStateChangedEvent from the
# session. When state transitions to "thinking" (STT done, waiting for

View File

@ -6,8 +6,22 @@ Instead of calling Claude directly, this plugin:
2. Subscribes to the agent-service WebSocket /ws/agent for streaming text events
3. Emits ChatChunk objects into the LiveKit pipeline
In Agent SDK mode, the prompt is wrapped with voice-conversation instructions
so the agent outputs concise spoken Chinese without tool-call details.
Voice Session Mode (long-lived agent)
--------------------------------------
When engine_type == "claude_agent_sdk", the plugin uses a long-lived voice session:
Voice call starts start_voice_session()
POST /api/v1/agent/sessions/voice/start
agent-service spawns a persistent run loop for this session
User speaks (each turn) AgentServiceLLMStream._run()
POST /api/v1/agent/sessions/{sessionId}/voice/inject
run loop receives the message and executes one SDK turn
streams events back via WebSocket ChatChunks TTS
User hangs up terminate_voice_session()
DELETE /api/v1/agent/sessions/{sessionId}/voice
agent-service kills the run loop and marks session completed
This preserves all agent-service capabilities: Tool Use, conversation history,
tenant isolation, and session management.
@ -48,7 +62,10 @@ class AgentServiceLLM(llm.LLM):
self._auth_header = auth_header
self._engine_type = engine_type
self._agent_session_id: str | None = None
self._injecting = False # guard: don't clear session during inject
# Voice session mode: long-lived agent process tied to the call duration.
# True once start_voice_session() completes successfully.
self._voice_session_started: bool = False
@property
def model(self) -> str:
@ -75,79 +92,179 @@ class AgentServiceLLM(llm.LLM):
conn_options=conn_options,
)
# -------------------------------------------------------------------------
# Voice session lifecycle
# -------------------------------------------------------------------------
async def start_voice_session(self, session_id: str | None = None) -> str:
"""
Start a long-lived voice agent session in agent-service.
Called ONCE when the LiveKit room is connected. The agent-service
spawns a background run loop that stays alive for the entire call,
accepting injected messages via voice/inject.
Returns the session ID to use for all subsequent inject calls.
"""
headers: dict[str, str] = {"Content-Type": "application/json"}
if self._auth_header:
headers["Authorization"] = self._auth_header
body: dict[str, Any] = {}
if session_id:
body["sessionId"] = session_id
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10),
) as client:
resp = await client.post(
f"{self._agent_service_url}/api/v1/agent/sessions/voice/start",
json=body,
headers=headers,
)
if resp.status_code not in (200, 201):
logger.error(
"start_voice_session failed: %d %s",
resp.status_code, resp.text[:200],
)
return ""
data = resp.json()
new_session_id: str = data.get("sessionId", "")
self._agent_session_id = new_session_id
self._voice_session_started = True
logger.info("Voice session started: %s", new_session_id)
return new_session_id
except Exception as exc:
logger.error("start_voice_session error: %s: %s", type(exc).__name__, exc)
return ""
async def inject_text_message(
self,
*,
text: str = "",
attachments: list[dict] | None = None,
) -> str:
"""Inject a text message (with optional attachments) into the agent session.
"""
Inject a text message (sent via data channel, not speech) into the agent.
In voice session mode, uses voice/inject and collects the streamed response.
In text mode, uses POST /tasks directly.
Returns the complete response text for TTS playback via session.say().
Uses the same session ID so conversation context is preserved.
"""
if not text and not attachments:
return ""
self._injecting = True
try:
return await self._do_inject(text, attachments)
if self._voice_session_started and self._agent_session_id:
return await self._do_inject_voice(text)
else:
return await self._do_inject(text, attachments)
except Exception as exc:
logger.error("inject_text_message error: %s: %s", type(exc).__name__, exc)
return ""
finally:
self._injecting = False
async def _do_inject_voice(self, text: str) -> str:
"""Inject via voice/inject endpoint and collect response text from WS."""
import time
agent_url = self._agent_service_url
session_id = self._agent_session_id
headers: dict[str, str] = {"Content-Type": "application/json"}
if self._auth_header:
headers["Authorization"] = self._auth_header
ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = f"{ws_url}/ws/agent"
timeout_secs = 120
collected_text = ""
async with websockets.connect(
ws_url,
open_timeout=10,
close_timeout=5,
ping_interval=20,
ping_timeout=10,
) as ws:
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": session_id},
}))
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10),
) as client:
resp = await client.post(
f"{agent_url}/api/v1/agent/sessions/{session_id}/voice/inject",
json={"message": text},
headers=headers,
)
if resp.status_code not in (200, 201):
logger.error("_do_inject_voice failed: %d", resp.status_code)
return ""
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(ws.recv(), timeout=min(30.0, remaining))
except asyncio.TimeoutError:
continue
except websockets.exceptions.ConnectionClosed:
break
try:
msg = json.loads(raw)
except (json.JSONDecodeError, TypeError):
continue
if msg.get("event") != "stream_event":
continue
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
if evt_type == "text":
collected_text += evt_data.get("content", "")
elif evt_type in ("completed", "error"):
break
return collected_text
async def _do_inject(
self,
text: str,
attachments: list[dict] | None,
) -> str:
"""Execute inject: WS+HTTP stream, collect full response text."""
"""Text-mode inject: POST /tasks and collect streamed response text."""
import time
agent_url = self._agent_service_url
auth_header = self._auth_header
headers: dict[str, str] = {"Content-Type": "application/json"}
if auth_header:
headers["Authorization"] = auth_header
if self._auth_header:
headers["Authorization"] = self._auth_header
ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = f"{ws_url}/ws/agent"
timeout_secs = 120
engine_type = self._engine_type
voice_mode = engine_type == "claude_agent_sdk"
body: dict[str, Any] = {
"prompt": text if text else "(see attachments)",
"engineType": engine_type,
"voiceMode": voice_mode,
"voiceMode": False,
}
if voice_mode:
body["systemPrompt"] = (
"你正在通过语音与用户实时对话。请严格遵守以下规则:\n"
"1. 只输出用户关注的最终答案,不要输出工具调用过程、中间步骤或技术细节\n"
"2. 用简洁自然的口语中文回答,像面对面对话一样\n"
"3. 回复要简短精炼适合语音播报通常1-3句话即可\n"
"4. 不要使用markdown格式、代码块、列表符号等文本格式"
)
if self._agent_session_id:
body["sessionId"] = self._agent_session_id
if attachments:
body["attachments"] = attachments
logger.info(
"inject POST /tasks engine=%s text=%s attachments=%d",
engine_type,
text[:80] if text else "(empty)",
len(attachments) if attachments else 0,
)
collected_text = ""
async with websockets.connect(
@ -157,14 +274,12 @@ class AgentServiceLLM(llm.LLM):
ping_interval=20,
ping_timeout=10,
) as ws:
# Pre-subscribe
if self._agent_session_id:
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": self._agent_session_id},
}))
# Create task
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10),
) as client:
@ -173,44 +288,28 @@ class AgentServiceLLM(llm.LLM):
json=body,
headers=headers,
)
if resp.status_code not in (200, 201):
logger.error("_do_inject task failed: %d", resp.status_code)
return ""
if resp.status_code not in (200, 201):
logger.error(
"inject task creation failed: %d %s",
resp.status_code, resp.text[:200],
)
return ""
data = resp.json()
session_id = data.get("sessionId", "")
task_id = data.get("taskId", "")
self._agent_session_id = session_id
data = resp.json()
session_id = data.get("sessionId", "")
task_id = data.get("taskId", "")
self._agent_session_id = session_id
logger.info(
"inject task created: session=%s, task=%s",
session_id, task_id,
)
# Subscribe with actual IDs
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": session_id, "taskId": task_id},
}))
# Stream events → collect text
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(
ws.recv(), timeout=min(30.0, remaining)
)
raw = await asyncio.wait_for(ws.recv(), timeout=min(30.0, remaining))
except asyncio.TimeoutError:
if time.time() >= deadline:
logger.warning("inject stream timeout after %ds", timeout_secs)
continue
except websockets.exceptions.ConnectionClosed:
logger.warning("inject WS connection closed")
break
try:
@ -218,33 +317,54 @@ class AgentServiceLLM(llm.LLM):
except (json.JSONDecodeError, TypeError):
continue
event_type = msg.get("event", "")
if msg.get("event") != "stream_event":
continue
if event_type == "stream_event":
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
if evt_type == "text":
content = evt_data.get("content", "")
if content:
collected_text += content
elif evt_type == "completed":
logger.info(
"inject stream completed, text length=%d",
len(collected_text),
)
return collected_text
elif evt_type == "error":
err_msg = evt_data.get("message", "Unknown error")
logger.error("inject error: %s", err_msg)
if "aborted" in err_msg.lower() or "exited" in err_msg.lower():
self._agent_session_id = None
return collected_text if collected_text else ""
if evt_type == "text":
collected_text += evt_data.get("content", "")
elif evt_type == "completed":
return collected_text
elif evt_type == "error":
break
return collected_text
async def terminate_voice_session(self) -> None:
"""
Terminate the long-lived voice agent session.
Called when the user hangs up (LiveKit room disconnects).
Signals agent-service to abort any running SDK task and clean up.
"""
if not self._voice_session_started or not self._agent_session_id:
return
session_id = self._agent_session_id
headers: dict[str, str] = {"Content-Type": "application/json"}
if self._auth_header:
headers["Authorization"] = self._auth_header
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=5, read=10, write=5, pool=5),
) as client:
resp = await client.delete(
f"{self._agent_service_url}/api/v1/agent/sessions/{session_id}/voice",
headers=headers,
)
logger.info(
"Voice session terminated: %s (status=%d)",
session_id, resp.status_code,
)
except Exception as exc:
logger.error("terminate_voice_session error: %s: %s", type(exc).__name__, exc)
finally:
self._voice_session_started = False
self._agent_session_id = None
class AgentServiceLLMStream(llm.LLMStream):
"""Streams text from agent-service via WebSocket."""
@ -265,13 +385,12 @@ class AgentServiceLLMStream(llm.LLMStream):
)
self._llm_instance = llm_instance
# Retry configuration
# Retry configuration (used in fallback / non-voice mode only)
_MAX_RETRIES = 2
_RETRY_DELAYS = [1.0, 3.0] # seconds between retries
async def _run(self) -> None:
# Extract the latest user message from ChatContext
# items can contain ChatMessage and AgentConfigUpdate; filter by type
user_text = ""
for item in reversed(self._chat_ctx.items):
if getattr(item, "type", None) != "message":
@ -295,6 +414,32 @@ class AgentServiceLLMStream(llm.LLMStream):
return
request_id = f"agent-{uuid.uuid4().hex[:12]}"
# -----------------------------------------------------------------
# Voice session mode: inject into the long-lived run loop
# -----------------------------------------------------------------
if self._llm_instance._voice_session_started:
try:
await self._do_stream_voice(user_text, request_id)
except Exception as exc:
logger.error(
"Voice inject stream error: %s: %s",
type(exc).__name__, exc, exc_info=True,
)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content="抱歉,语音服务暂时不可用。",
),
)
)
return
# -----------------------------------------------------------------
# Fallback / text mode: per-turn POST /tasks with retry
# -----------------------------------------------------------------
last_error: Exception | None = None
for attempt in range(self._MAX_RETRIES + 1):
@ -308,7 +453,6 @@ class AgentServiceLLMStream(llm.LLMStream):
return # success
except (httpx.ConnectError, httpx.ConnectTimeout, OSError) as exc:
# Network-level errors — retryable
last_error = exc
logger.warning(
"Agent stream attempt %d failed (network): %s: %s",
@ -321,7 +465,6 @@ class AgentServiceLLMStream(llm.LLMStream):
attempt + 1, getattr(exc, "status_code", "?"),
)
except Exception as exc:
# Non-retryable errors — fail immediately
logger.error("Agent stream error: %s: %s", type(exc).__name__, exc)
self._event_ch.send_nowait(
llm.ChatChunk(
@ -334,7 +477,6 @@ class AgentServiceLLMStream(llm.LLMStream):
)
return
# All retries exhausted
logger.error(
"Agent stream failed after %d attempts: %s",
self._MAX_RETRIES + 1, last_error,
@ -349,8 +491,150 @@ class AgentServiceLLMStream(llm.LLMStream):
)
)
# -------------------------------------------------------------------------
# Voice session stream: inject + subscribe
# -------------------------------------------------------------------------
async def _do_stream_voice(self, user_text: str, request_id: str) -> None:
"""
Voice session mode: inject the user utterance into the long-lived
agent run loop via POST voice/inject, then subscribe to the WebSocket
to receive the streaming response as ChatChunks.
Unlike the per-task approach (_do_stream), no new task is created here
the VoiceSessionManager's run loop handles task creation internally.
"""
import time
agent_url = self._llm_instance._agent_service_url
auth_header = self._llm_instance._auth_header
session_id = self._llm_instance._agent_session_id
if not session_id:
logger.error("_do_stream_voice: no session_id — falling back to per-task mode")
await self._do_stream(user_text, request_id)
return
headers: dict[str, str] = {"Content-Type": "application/json"}
if auth_header:
headers["Authorization"] = auth_header
ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = f"{ws_url}/ws/agent"
timeout_secs = 120
logger.info(
"Voice inject: session=%s text=%s",
session_id, user_text[:80],
)
async with websockets.connect(
ws_url,
open_timeout=10,
close_timeout=5,
ping_interval=20,
ping_timeout=10,
) as ws:
# 1. Pre-subscribe to receive events as soon as the task starts
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": session_id},
}))
# 2. Inject the message (enqueues to the run loop; async return)
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10, read=15, write=10, pool=10),
) as client:
resp = await client.post(
f"{agent_url}/api/v1/agent/sessions/{session_id}/voice/inject",
json={"message": user_text},
headers=headers,
)
if resp.status_code not in (200, 201):
logger.error(
"Voice inject failed: %d %s",
resp.status_code, resp.text[:200],
)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content="抱歉,语音指令注入失败。",
),
)
)
return
# 3. Emit initial role delta (LiveKit convention)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(role="assistant"),
)
)
# 4. Stream events → ChatChunks until completed/error
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(
ws.recv(), timeout=min(30.0, remaining)
)
except asyncio.TimeoutError:
if time.time() >= deadline:
logger.warning("Voice inject stream timeout after %ds", timeout_secs)
continue
except websockets.exceptions.ConnectionClosed:
logger.warning("Voice inject WS connection closed")
break
try:
msg = json.loads(raw)
except (json.JSONDecodeError, TypeError):
continue
if msg.get("event") != "stream_event":
continue
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
if evt_type == "text":
content = evt_data.get("content", "")
if content:
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(content=content),
)
)
elif evt_type == "completed":
logger.info("Voice inject stream completed")
return
elif evt_type == "error":
err_msg = evt_data.get("message", "Unknown error")
logger.error("Voice inject agent error: %s", err_msg)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(content=f"Agent 错误: {err_msg}"),
)
)
return
# -------------------------------------------------------------------------
# Legacy per-task stream (text chat / fallback)
# -------------------------------------------------------------------------
async def _do_stream(self, user_text: str, request_id: str) -> None:
"""Execute a single WS+HTTP streaming attempt."""
"""Execute a single WS+HTTP streaming attempt (text-chat / non-voice mode)."""
import time
agent_url = self._llm_instance._agent_service_url
@ -382,19 +666,14 @@ class AgentServiceLLMStream(llm.LLMStream):
# 2. Create agent task (with timeout)
engine_type = self._llm_instance._engine_type
# Voice mode flag: tell agent-service to filter intermediate events
# (tool_use, tool_result, thinking) — only stream text + completed + error
voice_mode = engine_type == "claude_agent_sdk"
body: dict[str, Any] = {
"prompt": user_text, # always send clean user text (no wrapping)
"prompt": user_text,
"engineType": engine_type,
"voiceMode": voice_mode,
}
# Agent SDK mode: set systemPrompt once (not per-message) so
# conversation history stays clean — identical to text chat pattern
if voice_mode:
body["systemPrompt"] = (
"你正在通过语音与用户实时对话。请严格遵守以下规则:\n"
@ -409,9 +688,7 @@ class AgentServiceLLMStream(llm.LLMStream):
logger.info(
"POST /tasks engine=%s voiceMode=%s user_text=%s",
engine_type,
voice_mode,
user_text[:80],
engine_type, voice_mode, user_text[:80],
)
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10),
@ -505,15 +782,9 @@ class AgentServiceLLMStream(llm.LLMStream):
elif evt_type == "error":
err_msg = evt_data.get("message", "Unknown error")
logger.error("Agent error: %s", err_msg)
# Clear session so next task starts fresh
# (don't try to resume a dead/aborted session)
# But skip if inject is in progress — it owns the session
if "aborted" in err_msg.lower() or "exited" in err_msg.lower():
if not self._llm_instance._injecting:
logger.info("Clearing agent session after abort/exit")
self._llm_instance._agent_session_id = None
else:
logger.info("Skipping session clear — inject in progress")
logger.info("Clearing agent session after abort/exit")
self._llm_instance._agent_session_id = None
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,