diff --git a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart index 138060f..d8e2bef 100644 --- a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart +++ b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'dart:typed_data'; +import 'package:dio/dio.dart'; import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:record/record.dart'; @@ -258,8 +259,13 @@ class _AgentCallPageState extends ConsumerState _triggerReconnect(); } - /// Attempt to reconnect using exponential backoff (1 s, 2 s, 4 s, 8 s, - /// 16 s) with a maximum of [_maxReconnectAttempts] tries. + /// Attempt to reconnect with exponential backoff (1s → 2s → 4s → 8s → 16s). + /// + /// Strategy: + /// - 200 from /reconnect → session alive, open new WebSocket + /// - 404 from /reconnect → session expired/gone, stop retrying + /// - 409 from /reconnect → terminal state (ended), stop retrying + /// - Network error / timeout → transient, keep retrying Future _triggerReconnect() async { if (_isReconnecting || _phase == _CallPhase.ended || _userEndedCall) return; _isReconnecting = true; @@ -278,60 +284,77 @@ class _AgentCallPageState extends ConsumerState _reconnectAttempts = attempt + 1; if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; + // Exponential backoff: 1s, 2s, 4s, 8s, 16s (first attempt waits 1s) final delaySecs = min(pow(2, attempt).toInt(), 16); await Future.delayed(Duration(seconds: delaySecs)); if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; try { - // Ask backend if session is still alive final dio = ref.read(dioClientProvider); final response = await dio.post( '${ApiEndpoints.voice}/sessions/$_sessionId/reconnect', ); final data = response.data as Map; - // Build new WebSocket URL - final config = ref.read(appConfigProvider); - String wsUrl; - final backendWsUrl = data['websocket_url'] as String?; - if (backendWsUrl != null && backendWsUrl.startsWith('/')) { - final uri = Uri.parse(config.wsBaseUrl); - wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl'; - } else if (backendWsUrl != null) { - wsUrl = backendWsUrl; - } else { - wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId'; + // Session is alive — open new WebSocket + if (await _connectWebSocket(data)) return; // success + } on DioException catch (e) { + final statusCode = e.response?.statusCode; + if (statusCode == 404 || statusCode == 409) { + // 404: session expired / not found + // 409: session in terminal state (e.g. "ended") + // No point retrying — break immediately + break; } - - // Connect new WebSocket - _audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); - _audioSubscription = _audioChannel!.stream.listen( - _onWsData, - onDone: () => _onWsDisconnected(), - onError: (_) => _onWsDisconnected(), - ); - - // Reset audio jitter buffer for a fresh start - _pcmPlayer.reset(); - - // Restart heartbeat - _startHeartbeat(); - - _reconnectAttempts = 0; - _isReconnecting = false; - if (mounted) setState(() {}); - return; // success + // Other errors (network, timeout) → keep retrying } catch (_) { - // Will retry on next iteration + // Unexpected error → keep retrying } } - // All reconnection attempts exhausted + // All reconnection attempts exhausted or session gone _isReconnecting = false; _onCallEnded(); } + /// Open a new WebSocket from reconnect response data. Returns true on success. + Future _connectWebSocket(Map data) async { + try { + final config = ref.read(appConfigProvider); + String wsUrl; + final backendWsUrl = data['websocket_url'] as String?; + if (backendWsUrl != null && backendWsUrl.startsWith('/')) { + final uri = Uri.parse(config.wsBaseUrl); + wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl'; + } else if (backendWsUrl != null) { + wsUrl = backendWsUrl; + } else { + wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId'; + } + + _audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); + _audioSubscription = _audioChannel!.stream.listen( + _onWsData, + onDone: () => _onWsDisconnected(), + onError: (_) => _onWsDisconnected(), + ); + + // Reset audio jitter buffer for a fresh start + _pcmPlayer.reset(); + + // Restart heartbeat + _startHeartbeat(); + + _reconnectAttempts = 0; + _isReconnecting = false; + if (mounted) setState(() {}); + return true; + } catch (_) { + return false; + } + } + /// Serialises async feed() calls so they don't interleave. Future _feedChain = Future.value(); diff --git a/packages/services/voice-service/src/api/session_router.py b/packages/services/voice-service/src/api/session_router.py index 23984b4..b62dcdf 100644 --- a/packages/services/voice-service/src/api/session_router.py +++ b/packages/services/voice-service/src/api/session_router.py @@ -123,11 +123,14 @@ async def end_session(session_id: str, req: Request): @router.post("/sessions/{session_id}/reconnect", response_model=SessionResponse) async def reconnect_session(session_id: str, req: Request): - """Reconnect to a disconnected voice session. + """Reconnect to a voice session. - If the session still exists and is in "disconnected" state (within the - ``session_ttl`` window), return session info with a fresh websocket_url - so the client can open a new WebSocket connection. + Accepts sessions in "disconnected" or "active" state. When the session is + still active the old WebSocket will be replaced when the client opens a new + one (``voice_websocket`` cancels the previous pipeline automatically). + + Returns 404 when the session is not found or has expired, and 409 only for + truly terminal states like "ended". """ if not hasattr(req.app.state, "sessions"): req.app.state.sessions = {} @@ -139,29 +142,32 @@ async def reconnect_session(session_id: str, req: Request): content={"error": "Session not found or expired", "session_id": session_id}, ) - if session["status"] != "disconnected": + status = session["status"] + + # Terminal states cannot be reconnected + if status not in ("disconnected", "active"): return JSONResponse( status_code=409, content={ - "error": f"Session is in '{session['status']}' state, not reconnectable", + "error": f"Session is in '{status}' state, not reconnectable", "session_id": session_id, }, ) - # Check if session has expired based on TTL - disconnected_at = session.get("disconnected_at", 0) - if time.time() - disconnected_at > settings.session_ttl: - # Expired -- clean it up - req.app.state.sessions.pop(session_id, None) - return JSONResponse( - status_code=404, - content={"error": "Session expired", "session_id": session_id}, - ) + # For disconnected sessions, check if TTL has expired + if status == "disconnected": + disconnected_at = session.get("disconnected_at", 0) + if time.time() - disconnected_at > settings.session_ttl: + req.app.state.sessions.pop(session_id, None) + return JSONResponse( + status_code=404, + content={"error": "Session expired", "session_id": session_id}, + ) websocket_url = f"/ws/voice/{session_id}" return SessionResponse( session_id=session_id, - status="disconnected", + status=status, websocket_url=websocket_url, )