diff --git a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart index b5797d2..74cb2c4 100644 --- a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart +++ b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart @@ -42,6 +42,10 @@ class _AgentCallPageState extends ConsumerState bool _isMuted = false; bool _isSpeaker = true; + // Reconnection state + bool _isReconnecting = false; + int _reconnectAttempt = 0; + // Prevent double-actions bool _userEndedCall = false; @@ -95,16 +99,47 @@ class _AgentCallPageState extends ConsumerState ..on((event) { // Agent's audio track removed }) + ..on((event) { + if (mounted) { + setState(() { + _isReconnecting = true; + _reconnectAttempt = 0; + }); + } + }) + ..on((event) { + if (mounted) { + setState(() { + _reconnectAttempt = event.attempt; + }); + } + }) + ..on((event) { + if (mounted) { + setState(() { + _isReconnecting = false; + _reconnectAttempt = 0; + }); + } + }) ..on((event) { if (_phase != _CallPhase.ended && !_userEndedCall) { _onCallEnded(); } }); - // 4. Connect to LiveKit room + // 4. Connect to LiveKit room (with timeout) await _room!.connect( livekitUrl, token, + connectOptions: const ConnectOptions( + timeouts: Timeouts( + connection: Duration(seconds: 15), + peerConnection: Duration(seconds: 15), + iceRestart: Duration(seconds: 15), + publish: Duration(seconds: 15), + ), + ), roomOptions: const RoomOptions( adaptiveStream: true, dynacast: true, @@ -284,6 +319,28 @@ class _AgentCallPageState extends ConsumerState ), ), const SizedBox(height: 24), + if (_phase == _CallPhase.active && _isReconnecting) + Padding( + padding: const EdgeInsets.only(bottom: 12), + child: Row( + mainAxisAlignment: MainAxisAlignment.center, + children: [ + const SizedBox( + width: 14, + height: 14, + child: CircularProgressIndicator(strokeWidth: 2), + ), + const SizedBox(width: 8), + Text( + '网络重连中${_reconnectAttempt > 0 ? ' ($_reconnectAttempt)' : ''}...', + style: TextStyle( + color: AppColors.warning, + fontSize: 13, + ), + ), + ], + ), + ), if (_phase == _CallPhase.active) _buildWaveform(), const Spacer(flex: 3), _buildControls(), @@ -344,6 +401,9 @@ class _AgentCallPageState extends ConsumerState if (s.contains('SocketException') || s.contains('Connection refused')) { return '无法连接到服务器'; } + if (s.contains('TimeoutException') || s.contains('timed out')) { + return '连接超时,请检查网络'; + } if (s.length > 80) return '${s.substring(0, 80)}...'; return s; } diff --git a/packages/services/voice-agent/src/agent.py b/packages/services/voice-agent/src/agent.py index cd63239..b671aa7 100644 --- a/packages/services/voice-agent/src/agent.py +++ b/packages/services/voice-agent/src/agent.py @@ -8,6 +8,7 @@ Usage: python -m src.agent start """ +import asyncio import json import logging import ssl @@ -167,101 +168,132 @@ server.setup_fnc = prewarm @server.rtc_session(agent_name="voice-agent") async def entrypoint(ctx: JobContext) -> None: - """Main entrypoint — called for each voice session.""" + """Main entrypoint — called for each voice session. + + NOTE: session.start() returns immediately while the session continues + running in the background. Resources (httpx clients) must stay alive + for the session's lifetime and are cleaned up via the room disconnect + listener, NOT in a finally block. + """ logger.info("New voice session: room=%s", ctx.room.name) - # Extract auth header from job metadata - # The token endpoint embeds {"auth_header": "Bearer ..."} via RoomAgentDispatch metadata, - # which LiveKit passes through as job.metadata to the agent worker. - auth_header = "" - tts_voice = settings.openai_tts_voice - tts_style = "" + # httpx clients to close when the room disconnects + _http_clients: list = [] + + async def _on_room_disconnect() -> None: + """Clean up httpx clients when the room disconnects.""" + for client in _http_clients: + try: + await client.aclose() + except Exception: + pass + logger.info("Cleaned up %d httpx client(s) for room %s", + len(_http_clients), ctx.room.name) + + # Register cleanup before anything else so it fires even on errors + ctx.room.on("disconnected", lambda *_: asyncio.ensure_future(_on_room_disconnect())) + try: - meta_str = ctx.job.metadata or "{}" - meta = json.loads(meta_str) - auth_header = meta.get("auth_header", "") - tts_voice = meta.get("tts_voice", settings.openai_tts_voice) - tts_style = meta.get("tts_style", "") - except Exception as e: - logger.warning("Failed to parse job metadata: %s", e) + # Extract auth header from job metadata + # The token endpoint embeds {"auth_header": "Bearer ..."} via RoomAgentDispatch metadata, + # which LiveKit passes through as job.metadata to the agent worker. + auth_header = "" + tts_voice = settings.openai_tts_voice + tts_style = "" + try: + meta_str = ctx.job.metadata or "{}" + meta = json.loads(meta_str) + auth_header = meta.get("auth_header", "") + tts_voice = meta.get("tts_voice", settings.openai_tts_voice) + tts_style = meta.get("tts_style", "") + except Exception as e: + logger.warning("Failed to parse job metadata: %s", e) - logger.info("Auth header present: %s, TTS: voice=%s, style=%s", - bool(auth_header), tts_voice, tts_style[:50] if tts_style else "(default)") + logger.info("Auth header present: %s, TTS: voice=%s, style=%s", + bool(auth_header), tts_voice, tts_style[:50] if tts_style else "(default)") - # Build STT - if settings.stt_provider == "openai": - from livekit.plugins import openai as openai_plugin - import httpx as _httpx - import openai as _openai + # Build STT + if settings.stt_provider == "openai": + from livekit.plugins import openai as openai_plugin + import httpx as _httpx + import openai as _openai - # OPENAI_BASE_URL may use a self-signed certificate (e.g. proxy) - _http_client = _httpx.AsyncClient(verify=False) - _oai_client = _openai.AsyncOpenAI(http_client=_http_client) + # OPENAI_BASE_URL may use a self-signed certificate (e.g. proxy) + _http_client = _httpx.AsyncClient(verify=False) + _http_clients.append(_http_client) + _oai_client = _openai.AsyncOpenAI(http_client=_http_client) - stt = openai_plugin.STT( - model=settings.openai_stt_model, - language=settings.whisper_language, - client=_oai_client, - use_realtime=True, - # Increase silence_duration_ms so Chinese speech isn't chopped - # into tiny fragments (default 350ms is too aggressive). - turn_detection={ - "type": "server_vad", - "threshold": 0.6, - "prefix_padding_ms": 600, - "silence_duration_ms": 800, - }, - ) - else: - stt = LocalWhisperSTT( - model=ctx.proc.userdata.get("whisper_model"), - language=settings.whisper_language, + stt = openai_plugin.STT( + model=settings.openai_stt_model, + language=settings.whisper_language, + client=_oai_client, + use_realtime=True, + # Increase silence_duration_ms so Chinese speech isn't chopped + # into tiny fragments (default 350ms is too aggressive). + turn_detection={ + "type": "server_vad", + "threshold": 0.6, + "prefix_padding_ms": 600, + "silence_duration_ms": 800, + }, + ) + else: + stt = LocalWhisperSTT( + model=ctx.proc.userdata.get("whisper_model"), + language=settings.whisper_language, + ) + + # Build TTS + if settings.tts_provider == "openai": + from livekit.plugins import openai as openai_plugin + import httpx as _httpx + import openai as _openai + + _http_client_tts = _httpx.AsyncClient(verify=False) + _http_clients.append(_http_client_tts) + _oai_client_tts = _openai.AsyncOpenAI(http_client=_http_client_tts) + + default_instructions = "用自然、友好的中文语气说话,语速稍快,简洁干练,像专业助手一样。" + tts = openai_plugin.TTS( + model=settings.openai_tts_model, + voice=tts_voice, + instructions=tts_style if tts_style else default_instructions, + client=_oai_client_tts, + ) + else: + tts = LocalKokoroTTS( + pipeline=ctx.proc.userdata.get("kokoro_pipeline"), + voice=settings.kokoro_voice, + ) + + # Build custom LLM (proxies to agent-service) + llm = AgentServiceLLM( + agent_service_url=settings.agent_service_url, + auth_header=auth_header, ) - # Build TTS - if settings.tts_provider == "openai": - from livekit.plugins import openai as openai_plugin - import httpx as _httpx - import openai as _openai - - _http_client_tts = _httpx.AsyncClient(verify=False) - _oai_client_tts = _openai.AsyncOpenAI(http_client=_http_client_tts) - - default_instructions = "用自然、友好的中文语气说话,语速稍快,简洁干练,像专业助手一样。" - tts = openai_plugin.TTS( - model=settings.openai_tts_model, - voice=tts_voice, - instructions=tts_style if tts_style else default_instructions, - client=_oai_client_tts, - ) - else: - tts = LocalKokoroTTS( - pipeline=ctx.proc.userdata.get("kokoro_pipeline"), - voice=settings.kokoro_voice, + # Create and start AgentSession with the full pipeline + session = AgentSession( + vad=ctx.proc.userdata["vad"], + stt=stt, + llm=llm, + tts=tts, ) - # Build custom LLM (proxies to agent-service) - llm = AgentServiceLLM( - agent_service_url=settings.agent_service_url, - auth_header=auth_header, - ) + await session.start( + agent=IT0VoiceAgent(), + room=ctx.room, + room_input_options=room_io.RoomInputOptions(), + room_output_options=room_io.RoomOutputOptions(), + ) - # Create and start AgentSession with the full pipeline - session = AgentSession( - vad=ctx.proc.userdata["vad"], - stt=stt, - llm=llm, - tts=tts, - ) + logger.info("Voice session started for room %s", ctx.room.name) - await session.start( - agent=IT0VoiceAgent(), - room=ctx.room, - room_input_options=room_io.RoomInputOptions(), - room_output_options=room_io.RoomOutputOptions(), - ) - - logger.info("Voice session started for room %s", ctx.room.name) + except Exception as exc: + logger.error( + "Voice session failed for room %s: %s: %s", + ctx.room.name, type(exc).__name__, exc, exc_info=True, + ) if __name__ == "__main__": diff --git a/packages/services/voice-agent/src/plugins/agent_llm.py b/packages/services/voice-agent/src/plugins/agent_llm.py index a600068..90c66af 100644 --- a/packages/services/voice-agent/src/plugins/agent_llm.py +++ b/packages/services/voice-agent/src/plugins/agent_llm.py @@ -89,6 +89,10 @@ class AgentServiceLLMStream(llm.LLMStream): ) self._llm_instance = llm_instance + # Retry configuration + _MAX_RETRIES = 2 + _RETRY_DELAYS = [1.0, 3.0] # seconds between retries + async def _run(self) -> None: # Extract the latest user message from ChatContext # items can contain ChatMessage and AgentConfigUpdate; filter by type @@ -114,6 +118,65 @@ class AgentServiceLLMStream(llm.LLMStream): logger.warning("No user message found in chat context") return + request_id = f"agent-{uuid.uuid4().hex[:12]}" + last_error: Exception | None = None + + for attempt in range(self._MAX_RETRIES + 1): + try: + if attempt > 0: + delay = self._RETRY_DELAYS[min(attempt - 1, len(self._RETRY_DELAYS) - 1)] + logger.info("Retry %d/%d after %.1fs", attempt, self._MAX_RETRIES, delay) + await asyncio.sleep(delay) + + await self._do_stream(user_text, request_id) + return # success + + except (httpx.ConnectError, httpx.ConnectTimeout, OSError) as exc: + # Network-level errors — retryable + last_error = exc + logger.warning( + "Agent stream attempt %d failed (network): %s: %s", + attempt + 1, type(exc).__name__, exc, + ) + except websockets.exceptions.InvalidStatusCode as exc: + last_error = exc + logger.warning( + "Agent WS connect attempt %d failed: status %s", + attempt + 1, getattr(exc, "status_code", "?"), + ) + except Exception as exc: + # Non-retryable errors — fail immediately + logger.error("Agent stream error: %s: %s", type(exc).__name__, exc) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,Agent服务暂时不可用。", + ), + ) + ) + return + + # All retries exhausted + logger.error( + "Agent stream failed after %d attempts: %s", + self._MAX_RETRIES + 1, last_error, + ) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,Agent服务暂时不可用,请稍后再试。", + ), + ) + ) + + async def _do_stream(self, user_text: str, request_id: str) -> None: + """Execute a single WS+HTTP streaming attempt.""" + import time + agent_url = self._llm_instance._agent_service_url auth_header = self._llm_instance._auth_header @@ -124,142 +187,130 @@ class AgentServiceLLMStream(llm.LLMStream): ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") ws_url = f"{ws_url}/ws/agent" - request_id = f"agent-{uuid.uuid4().hex[:12]}" timeout_secs = 120 - try: - logger.info("Connecting to agent-service WS: %s", ws_url) - async with websockets.connect(ws_url) as ws: - # 1. Pre-subscribe with existing session ID (for event buffering) - if self._llm_instance._agent_session_id: - await ws.send(json.dumps({ - "event": "subscribe_session", - "data": {"sessionId": self._llm_instance._agent_session_id}, - })) + logger.info("Connecting to agent-service WS: %s", ws_url) + async with websockets.connect( + ws_url, + open_timeout=10, + close_timeout=5, + ping_interval=20, + ping_timeout=10, + ) as ws: + # 1. Pre-subscribe with existing session ID (for event buffering) + if self._llm_instance._agent_session_id: + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": self._llm_instance._agent_session_id}, + })) - # 2. Create agent task - body: dict[str, Any] = { - "prompt": user_text, - "engineType": "claude_api", - } - if self._llm_instance._agent_session_id: - body["sessionId"] = self._llm_instance._agent_session_id + # 2. Create agent task (with timeout) + body: dict[str, Any] = { + "prompt": user_text, + "engineType": "claude_api", + } + if self._llm_instance._agent_session_id: + body["sessionId"] = self._llm_instance._agent_session_id - logger.info("POST /tasks prompt=%s", user_text[:80]) - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.post( - f"{agent_url}/api/v1/agent/tasks", - json=body, - headers=headers, + logger.info("POST /tasks prompt=%s", user_text[:80]) + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=10, read=30, write=10, pool=10), + ) as client: + resp = await client.post( + f"{agent_url}/api/v1/agent/tasks", + json=body, + headers=headers, + ) + + if resp.status_code not in (200, 201): + logger.error( + "Task creation failed: %d %s", + resp.status_code, resp.text[:200], ) - - if resp.status_code not in (200, 201): - logger.error( - "Task creation failed: %d %s", - resp.status_code, resp.text[:200], + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,Agent服务暂时不可用。", + ), ) + ) + return + + data = resp.json() + session_id = data.get("sessionId", "") + task_id = data.get("taskId", "") + self._llm_instance._agent_session_id = session_id + logger.info( + "Task created: session=%s, task=%s", session_id, task_id + ) + + # 3. Subscribe with actual IDs + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": session_id, "taskId": task_id}, + })) + + # 4. Send initial role delta + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(role="assistant"), + ) + ) + + # 5. Stream events → ChatChunk + deadline = time.time() + timeout_secs + + while time.time() < deadline: + remaining = deadline - time.time() + try: + raw = await asyncio.wait_for( + ws.recv(), timeout=min(30.0, remaining) + ) + except asyncio.TimeoutError: + if time.time() >= deadline: + logger.warning("Agent stream timeout after %ds", timeout_secs) + continue + except websockets.exceptions.ConnectionClosed: + logger.warning("Agent WS connection closed during streaming") + break + + try: + msg = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + + event_type = msg.get("event", "") + + if event_type == "stream_event": + evt_data = msg.get("data", {}) + evt_type = evt_data.get("type", "") + + if evt_type == "text": + content = evt_data.get("content", "") + if content: + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(content=content), + ) + ) + + elif evt_type == "completed": + logger.info("Agent stream completed") + return + + elif evt_type == "error": + err_msg = evt_data.get("message", "Unknown error") + logger.error("Agent error: %s", err_msg) self._event_ch.send_nowait( llm.ChatChunk( id=request_id, delta=llm.ChoiceDelta( - role="assistant", - content="抱歉,Agent服务暂时不可用。", + content=f"Agent 错误: {err_msg}" ), ) ) return - - data = resp.json() - session_id = data.get("sessionId", "") - task_id = data.get("taskId", "") - self._llm_instance._agent_session_id = session_id - logger.info( - "Task created: session=%s, task=%s", session_id, task_id - ) - - # 3. Subscribe with actual IDs - await ws.send(json.dumps({ - "event": "subscribe_session", - "data": {"sessionId": session_id, "taskId": task_id}, - })) - - # 4. Send initial role delta - self._event_ch.send_nowait( - llm.ChatChunk( - id=request_id, - delta=llm.ChoiceDelta(role="assistant"), - ) - ) - - # 5. Stream events → ChatChunk - import time - deadline = time.time() + timeout_secs - - while time.time() < deadline: - remaining = deadline - time.time() - try: - raw = await asyncio.wait_for( - ws.recv(), timeout=min(5.0, remaining) - ) - except asyncio.TimeoutError: - if time.time() >= deadline: - logger.warning("Agent stream timeout after %ds", timeout_secs) - continue - except websockets.exceptions.ConnectionClosed: - logger.warning("Agent WS connection closed") - break - - try: - msg = json.loads(raw) - except (json.JSONDecodeError, TypeError): - continue - - event_type = msg.get("event", "") - - if event_type == "stream_event": - evt_data = msg.get("data", {}) - evt_type = evt_data.get("type", "") - - if evt_type == "text": - content = evt_data.get("content", "") - if content: - self._event_ch.send_nowait( - llm.ChatChunk( - id=request_id, - delta=llm.ChoiceDelta(content=content), - ) - ) - - elif evt_type == "completed": - # If no text was streamed, use the summary - summary = evt_data.get("summary", "") - if summary: - # Check if we already sent text chunks - pass # LiveKit pipeline handles this - logger.info("Agent stream completed") - return - - elif evt_type == "error": - err_msg = evt_data.get("message", "Unknown error") - logger.error("Agent error: %s", err_msg) - self._event_ch.send_nowait( - llm.ChatChunk( - id=request_id, - delta=llm.ChoiceDelta( - content=f"Agent 错误: {err_msg}" - ), - ) - ) - return - - except Exception as exc: - logger.error("Agent stream error: %s: %s", type(exc).__name__, exc) - self._event_ch.send_nowait( - llm.ChatChunk( - id=request_id, - delta=llm.ChoiceDelta( - role="assistant", - content="抱歉,Agent服务暂时不可用。", - ), - ) - )