fix: improve voice pipeline robustness for poor network conditions

Flutter (agent_call_page.dart):
- Add ConnectOptions with 15s timeouts for connection/peerConnection/iceRestart
- Add RoomReconnectingEvent/RoomAttemptReconnectEvent/RoomReconnectedEvent
  listeners with "网络重连中" UI indicator during reconnection
- Add TimeoutException detection in _friendlyError()

voice-agent (agent.py):
- Wrap entrypoint() in try-except with full traceback logging
- Register a room disconnect listener to close httpx clients (instead of
  a finally block, since session.start() returns while the session keeps
  running in the background)
- Add asyncio import for ensure_future-based cleanup

voice-agent LLM proxy (agent_llm.py):
- Add retry with exponential backoff (max 2 retries, 1s/3s delays) for
  network errors (ConnectError/ConnectTimeout/OSError) and WS InvalidStatusCode
- Extract _do_stream() method for single-attempt logic
- Add WebSocket connection params: open_timeout=10, ping_interval=20,
  ping_timeout=10 for keepalive and faster dead-connection detection
- Use granular httpx.Timeout(connect=10, read=30, write=10, pool=10)
- Increase WS recv timeout from 5s to 30s to reduce unnecessary receive-loop wake-ups

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-01 23:34:55 -08:00
parent 32922c6819
commit e66c187353
3 changed files with 351 additions and 208 deletions

View File

@ -42,6 +42,10 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
bool _isMuted = false; bool _isMuted = false;
bool _isSpeaker = true; bool _isSpeaker = true;
// Reconnection state
bool _isReconnecting = false;
int _reconnectAttempt = 0;
// Prevent double-actions // Prevent double-actions
bool _userEndedCall = false; bool _userEndedCall = false;
@ -95,16 +99,47 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
..on<TrackUnsubscribedEvent>((event) { ..on<TrackUnsubscribedEvent>((event) {
// Agent's audio track removed // Agent's audio track removed
}) })
..on<RoomReconnectingEvent>((event) {
if (mounted) {
setState(() {
_isReconnecting = true;
_reconnectAttempt = 0;
});
}
})
..on<RoomAttemptReconnectEvent>((event) {
if (mounted) {
setState(() {
_reconnectAttempt = event.attempt;
});
}
})
..on<RoomReconnectedEvent>((event) {
if (mounted) {
setState(() {
_isReconnecting = false;
_reconnectAttempt = 0;
});
}
})
..on<RoomDisconnectedEvent>((event) { ..on<RoomDisconnectedEvent>((event) {
if (_phase != _CallPhase.ended && !_userEndedCall) { if (_phase != _CallPhase.ended && !_userEndedCall) {
_onCallEnded(); _onCallEnded();
} }
}); });
// 4. Connect to LiveKit room // 4. Connect to LiveKit room (with timeout)
await _room!.connect( await _room!.connect(
livekitUrl, livekitUrl,
token, token,
connectOptions: const ConnectOptions(
timeouts: Timeouts(
connection: Duration(seconds: 15),
peerConnection: Duration(seconds: 15),
iceRestart: Duration(seconds: 15),
publish: Duration(seconds: 15),
),
),
roomOptions: const RoomOptions( roomOptions: const RoomOptions(
adaptiveStream: true, adaptiveStream: true,
dynacast: true, dynacast: true,
@ -284,6 +319,28 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
), ),
), ),
const SizedBox(height: 24), const SizedBox(height: 24),
if (_phase == _CallPhase.active && _isReconnecting)
Padding(
padding: const EdgeInsets.only(bottom: 12),
child: Row(
mainAxisAlignment: MainAxisAlignment.center,
children: [
const SizedBox(
width: 14,
height: 14,
child: CircularProgressIndicator(strokeWidth: 2),
),
const SizedBox(width: 8),
Text(
'网络重连中${_reconnectAttempt > 0 ? ' ($_reconnectAttempt)' : ''}...',
style: TextStyle(
color: AppColors.warning,
fontSize: 13,
),
),
],
),
),
if (_phase == _CallPhase.active) _buildWaveform(), if (_phase == _CallPhase.active) _buildWaveform(),
const Spacer(flex: 3), const Spacer(flex: 3),
_buildControls(), _buildControls(),
@ -344,6 +401,9 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
if (s.contains('SocketException') || s.contains('Connection refused')) { if (s.contains('SocketException') || s.contains('Connection refused')) {
return '无法连接到服务器'; return '无法连接到服务器';
} }
if (s.contains('TimeoutException') || s.contains('timed out')) {
return '连接超时,请检查网络';
}
if (s.length > 80) return '${s.substring(0, 80)}...'; if (s.length > 80) return '${s.substring(0, 80)}...';
return s; return s;
} }

View File

@ -8,6 +8,7 @@ Usage:
python -m src.agent start python -m src.agent start
""" """
import asyncio
import json import json
import logging import logging
import ssl import ssl
@ -167,101 +168,132 @@ server.setup_fnc = prewarm
@server.rtc_session(agent_name="voice-agent") @server.rtc_session(agent_name="voice-agent")
async def entrypoint(ctx: JobContext) -> None: async def entrypoint(ctx: JobContext) -> None:
"""Main entrypoint — called for each voice session.""" """Main entrypoint — called for each voice session.
NOTE: session.start() returns immediately while the session continues
running in the background. Resources (httpx clients) must stay alive
for the session's lifetime and are cleaned up via the room disconnect
listener, NOT in a finally block.
"""
logger.info("New voice session: room=%s", ctx.room.name) logger.info("New voice session: room=%s", ctx.room.name)
# Extract auth header from job metadata # httpx clients to close when the room disconnects
# The token endpoint embeds {"auth_header": "Bearer ..."} via RoomAgentDispatch metadata, _http_clients: list = []
# which LiveKit passes through as job.metadata to the agent worker.
auth_header = "" async def _on_room_disconnect() -> None:
tts_voice = settings.openai_tts_voice """Clean up httpx clients when the room disconnects."""
tts_style = "" for client in _http_clients:
try:
await client.aclose()
except Exception:
pass
logger.info("Cleaned up %d httpx client(s) for room %s",
len(_http_clients), ctx.room.name)
# Register cleanup before anything else so it fires even on errors
ctx.room.on("disconnected", lambda *_: asyncio.ensure_future(_on_room_disconnect()))
try: try:
meta_str = ctx.job.metadata or "{}" # Extract auth header from job metadata
meta = json.loads(meta_str) # The token endpoint embeds {"auth_header": "Bearer ..."} via RoomAgentDispatch metadata,
auth_header = meta.get("auth_header", "") # which LiveKit passes through as job.metadata to the agent worker.
tts_voice = meta.get("tts_voice", settings.openai_tts_voice) auth_header = ""
tts_style = meta.get("tts_style", "") tts_voice = settings.openai_tts_voice
except Exception as e: tts_style = ""
logger.warning("Failed to parse job metadata: %s", e) try:
meta_str = ctx.job.metadata or "{}"
meta = json.loads(meta_str)
auth_header = meta.get("auth_header", "")
tts_voice = meta.get("tts_voice", settings.openai_tts_voice)
tts_style = meta.get("tts_style", "")
except Exception as e:
logger.warning("Failed to parse job metadata: %s", e)
logger.info("Auth header present: %s, TTS: voice=%s, style=%s", logger.info("Auth header present: %s, TTS: voice=%s, style=%s",
bool(auth_header), tts_voice, tts_style[:50] if tts_style else "(default)") bool(auth_header), tts_voice, tts_style[:50] if tts_style else "(default)")
# Build STT # Build STT
if settings.stt_provider == "openai": if settings.stt_provider == "openai":
from livekit.plugins import openai as openai_plugin from livekit.plugins import openai as openai_plugin
import httpx as _httpx import httpx as _httpx
import openai as _openai import openai as _openai
# OPENAI_BASE_URL may use a self-signed certificate (e.g. proxy) # OPENAI_BASE_URL may use a self-signed certificate (e.g. proxy)
_http_client = _httpx.AsyncClient(verify=False) _http_client = _httpx.AsyncClient(verify=False)
_oai_client = _openai.AsyncOpenAI(http_client=_http_client) _http_clients.append(_http_client)
_oai_client = _openai.AsyncOpenAI(http_client=_http_client)
stt = openai_plugin.STT( stt = openai_plugin.STT(
model=settings.openai_stt_model, model=settings.openai_stt_model,
language=settings.whisper_language, language=settings.whisper_language,
client=_oai_client, client=_oai_client,
use_realtime=True, use_realtime=True,
# Increase silence_duration_ms so Chinese speech isn't chopped # Increase silence_duration_ms so Chinese speech isn't chopped
# into tiny fragments (default 350ms is too aggressive). # into tiny fragments (default 350ms is too aggressive).
turn_detection={ turn_detection={
"type": "server_vad", "type": "server_vad",
"threshold": 0.6, "threshold": 0.6,
"prefix_padding_ms": 600, "prefix_padding_ms": 600,
"silence_duration_ms": 800, "silence_duration_ms": 800,
}, },
) )
else: else:
stt = LocalWhisperSTT( stt = LocalWhisperSTT(
model=ctx.proc.userdata.get("whisper_model"), model=ctx.proc.userdata.get("whisper_model"),
language=settings.whisper_language, language=settings.whisper_language,
)
# Build TTS
if settings.tts_provider == "openai":
from livekit.plugins import openai as openai_plugin
import httpx as _httpx
import openai as _openai
_http_client_tts = _httpx.AsyncClient(verify=False)
_http_clients.append(_http_client_tts)
_oai_client_tts = _openai.AsyncOpenAI(http_client=_http_client_tts)
default_instructions = "用自然、友好的中文语气说话,语速稍快,简洁干练,像专业助手一样。"
tts = openai_plugin.TTS(
model=settings.openai_tts_model,
voice=tts_voice,
instructions=tts_style if tts_style else default_instructions,
client=_oai_client_tts,
)
else:
tts = LocalKokoroTTS(
pipeline=ctx.proc.userdata.get("kokoro_pipeline"),
voice=settings.kokoro_voice,
)
# Build custom LLM (proxies to agent-service)
llm = AgentServiceLLM(
agent_service_url=settings.agent_service_url,
auth_header=auth_header,
) )
# Build TTS # Create and start AgentSession with the full pipeline
if settings.tts_provider == "openai": session = AgentSession(
from livekit.plugins import openai as openai_plugin vad=ctx.proc.userdata["vad"],
import httpx as _httpx stt=stt,
import openai as _openai llm=llm,
tts=tts,
_http_client_tts = _httpx.AsyncClient(verify=False)
_oai_client_tts = _openai.AsyncOpenAI(http_client=_http_client_tts)
default_instructions = "用自然、友好的中文语气说话,语速稍快,简洁干练,像专业助手一样。"
tts = openai_plugin.TTS(
model=settings.openai_tts_model,
voice=tts_voice,
instructions=tts_style if tts_style else default_instructions,
client=_oai_client_tts,
)
else:
tts = LocalKokoroTTS(
pipeline=ctx.proc.userdata.get("kokoro_pipeline"),
voice=settings.kokoro_voice,
) )
# Build custom LLM (proxies to agent-service) await session.start(
llm = AgentServiceLLM( agent=IT0VoiceAgent(),
agent_service_url=settings.agent_service_url, room=ctx.room,
auth_header=auth_header, room_input_options=room_io.RoomInputOptions(),
) room_output_options=room_io.RoomOutputOptions(),
)
# Create and start AgentSession with the full pipeline logger.info("Voice session started for room %s", ctx.room.name)
session = AgentSession(
vad=ctx.proc.userdata["vad"],
stt=stt,
llm=llm,
tts=tts,
)
await session.start( except Exception as exc:
agent=IT0VoiceAgent(), logger.error(
room=ctx.room, "Voice session failed for room %s: %s: %s",
room_input_options=room_io.RoomInputOptions(), ctx.room.name, type(exc).__name__, exc, exc_info=True,
room_output_options=room_io.RoomOutputOptions(), )
)
logger.info("Voice session started for room %s", ctx.room.name)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -89,6 +89,10 @@ class AgentServiceLLMStream(llm.LLMStream):
) )
self._llm_instance = llm_instance self._llm_instance = llm_instance
# Retry configuration
_MAX_RETRIES = 2
_RETRY_DELAYS = [1.0, 3.0] # seconds between retries
async def _run(self) -> None: async def _run(self) -> None:
# Extract the latest user message from ChatContext # Extract the latest user message from ChatContext
# items can contain ChatMessage and AgentConfigUpdate; filter by type # items can contain ChatMessage and AgentConfigUpdate; filter by type
@ -114,6 +118,65 @@ class AgentServiceLLMStream(llm.LLMStream):
logger.warning("No user message found in chat context") logger.warning("No user message found in chat context")
return return
request_id = f"agent-{uuid.uuid4().hex[:12]}"
last_error: Exception | None = None
for attempt in range(self._MAX_RETRIES + 1):
try:
if attempt > 0:
delay = self._RETRY_DELAYS[min(attempt - 1, len(self._RETRY_DELAYS) - 1)]
logger.info("Retry %d/%d after %.1fs", attempt, self._MAX_RETRIES, delay)
await asyncio.sleep(delay)
await self._do_stream(user_text, request_id)
return # success
except (httpx.ConnectError, httpx.ConnectTimeout, OSError) as exc:
# Network-level errors — retryable
last_error = exc
logger.warning(
"Agent stream attempt %d failed (network): %s: %s",
attempt + 1, type(exc).__name__, exc,
)
except websockets.exceptions.InvalidStatusCode as exc:
last_error = exc
logger.warning(
"Agent WS connect attempt %d failed: status %s",
attempt + 1, getattr(exc, "status_code", "?"),
)
except Exception as exc:
# Non-retryable errors — fail immediately
logger.error("Agent stream error: %s: %s", type(exc).__name__, exc)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content="抱歉Agent服务暂时不可用。",
),
)
)
return
# All retries exhausted
logger.error(
"Agent stream failed after %d attempts: %s",
self._MAX_RETRIES + 1, last_error,
)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content="抱歉Agent服务暂时不可用请稍后再试。",
),
)
)
async def _do_stream(self, user_text: str, request_id: str) -> None:
"""Execute a single WS+HTTP streaming attempt."""
import time
agent_url = self._llm_instance._agent_service_url agent_url = self._llm_instance._agent_service_url
auth_header = self._llm_instance._auth_header auth_header = self._llm_instance._auth_header
@ -124,142 +187,130 @@ class AgentServiceLLMStream(llm.LLMStream):
ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = f"{ws_url}/ws/agent" ws_url = f"{ws_url}/ws/agent"
request_id = f"agent-{uuid.uuid4().hex[:12]}"
timeout_secs = 120 timeout_secs = 120
try: logger.info("Connecting to agent-service WS: %s", ws_url)
logger.info("Connecting to agent-service WS: %s", ws_url) async with websockets.connect(
async with websockets.connect(ws_url) as ws: ws_url,
# 1. Pre-subscribe with existing session ID (for event buffering) open_timeout=10,
if self._llm_instance._agent_session_id: close_timeout=5,
await ws.send(json.dumps({ ping_interval=20,
"event": "subscribe_session", ping_timeout=10,
"data": {"sessionId": self._llm_instance._agent_session_id}, ) as ws:
})) # 1. Pre-subscribe with existing session ID (for event buffering)
if self._llm_instance._agent_session_id:
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": self._llm_instance._agent_session_id},
}))
# 2. Create agent task # 2. Create agent task (with timeout)
body: dict[str, Any] = { body: dict[str, Any] = {
"prompt": user_text, "prompt": user_text,
"engineType": "claude_api", "engineType": "claude_api",
} }
if self._llm_instance._agent_session_id: if self._llm_instance._agent_session_id:
body["sessionId"] = self._llm_instance._agent_session_id body["sessionId"] = self._llm_instance._agent_session_id
logger.info("POST /tasks prompt=%s", user_text[:80]) logger.info("POST /tasks prompt=%s", user_text[:80])
async with httpx.AsyncClient(timeout=30) as client: async with httpx.AsyncClient(
resp = await client.post( timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10),
f"{agent_url}/api/v1/agent/tasks", ) as client:
json=body, resp = await client.post(
headers=headers, f"{agent_url}/api/v1/agent/tasks",
json=body,
headers=headers,
)
if resp.status_code not in (200, 201):
logger.error(
"Task creation failed: %d %s",
resp.status_code, resp.text[:200],
) )
self._event_ch.send_nowait(
if resp.status_code not in (200, 201): llm.ChatChunk(
logger.error( id=request_id,
"Task creation failed: %d %s", delta=llm.ChoiceDelta(
resp.status_code, resp.text[:200], role="assistant",
content="抱歉Agent服务暂时不可用。",
),
) )
)
return
data = resp.json()
session_id = data.get("sessionId", "")
task_id = data.get("taskId", "")
self._llm_instance._agent_session_id = session_id
logger.info(
"Task created: session=%s, task=%s", session_id, task_id
)
# 3. Subscribe with actual IDs
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": session_id, "taskId": task_id},
}))
# 4. Send initial role delta
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(role="assistant"),
)
)
# 5. Stream events → ChatChunk
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(
ws.recv(), timeout=min(30.0, remaining)
)
except asyncio.TimeoutError:
if time.time() >= deadline:
logger.warning("Agent stream timeout after %ds", timeout_secs)
continue
except websockets.exceptions.ConnectionClosed:
logger.warning("Agent WS connection closed during streaming")
break
try:
msg = json.loads(raw)
except (json.JSONDecodeError, TypeError):
continue
event_type = msg.get("event", "")
if event_type == "stream_event":
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
if evt_type == "text":
content = evt_data.get("content", "")
if content:
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(content=content),
)
)
elif evt_type == "completed":
logger.info("Agent stream completed")
return
elif evt_type == "error":
err_msg = evt_data.get("message", "Unknown error")
logger.error("Agent error: %s", err_msg)
self._event_ch.send_nowait( self._event_ch.send_nowait(
llm.ChatChunk( llm.ChatChunk(
id=request_id, id=request_id,
delta=llm.ChoiceDelta( delta=llm.ChoiceDelta(
role="assistant", content=f"Agent 错误: {err_msg}"
content="抱歉Agent服务暂时不可用。",
), ),
) )
) )
return return
data = resp.json()
session_id = data.get("sessionId", "")
task_id = data.get("taskId", "")
self._llm_instance._agent_session_id = session_id
logger.info(
"Task created: session=%s, task=%s", session_id, task_id
)
# 3. Subscribe with actual IDs
await ws.send(json.dumps({
"event": "subscribe_session",
"data": {"sessionId": session_id, "taskId": task_id},
}))
# 4. Send initial role delta
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(role="assistant"),
)
)
# 5. Stream events → ChatChunk
import time
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(
ws.recv(), timeout=min(5.0, remaining)
)
except asyncio.TimeoutError:
if time.time() >= deadline:
logger.warning("Agent stream timeout after %ds", timeout_secs)
continue
except websockets.exceptions.ConnectionClosed:
logger.warning("Agent WS connection closed")
break
try:
msg = json.loads(raw)
except (json.JSONDecodeError, TypeError):
continue
event_type = msg.get("event", "")
if event_type == "stream_event":
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
if evt_type == "text":
content = evt_data.get("content", "")
if content:
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(content=content),
)
)
elif evt_type == "completed":
# If no text was streamed, use the summary
summary = evt_data.get("summary", "")
if summary:
# Check if we already sent text chunks
pass # LiveKit pipeline handles this
logger.info("Agent stream completed")
return
elif evt_type == "error":
err_msg = evt_data.get("message", "Unknown error")
logger.error("Agent error: %s", err_msg)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
content=f"Agent 错误: {err_msg}"
),
)
)
return
except Exception as exc:
logger.error("Agent stream error: %s: %s", type(exc).__name__, exc)
self._event_ch.send_nowait(
llm.ChatChunk(
id=request_id,
delta=llm.ChoiceDelta(
role="assistant",
content="抱歉Agent服务暂时不可用。",
),
)
)