fix: improve voice call reconnection robustness

Server side (session_router.py):
- /reconnect now accepts sessions in "active" state (not just "disconnected")
- When client reconnects to an active session, the old WebSocket/pipeline is
  automatically replaced when the new WebSocket connects
- Only truly terminal states (e.g. "ended") return 409

Flutter side (agent_call_page.dart):
- Distinguish terminal errors (404 session gone, 409 ended) from transient
  errors (network timeout, server unreachable) in reconnect loop
- Terminal errors break immediately instead of wasting retry attempts
- Extract _connectWebSocket() helper for cleaner reconnect flow
- Add DioException handling for proper HTTP status code inspection

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-25 07:33:34 -08:00
parent 57fabb4653
commit bc7e32061a
2 changed files with 80 additions and 51 deletions

View File

@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:math'; import 'dart:math';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:dio/dio.dart';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:record/record.dart'; import 'package:record/record.dart';
@ -258,8 +259,13 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
_triggerReconnect(); _triggerReconnect();
} }
/// Attempt to reconnect using exponential backoff (1 s, 2 s, 4 s, 8 s, /// Attempt to reconnect with exponential backoff (1s 2s 4s 8s 16s).
/// 16 s) with a maximum of [_maxReconnectAttempts] tries. ///
/// Strategy:
/// - 200 from /reconnect session alive, open new WebSocket
/// - 404 from /reconnect session expired/gone, stop retrying
/// - 409 from /reconnect terminal state (ended), stop retrying
/// - Network error / timeout transient, keep retrying
Future<void> _triggerReconnect() async { Future<void> _triggerReconnect() async {
if (_isReconnecting || _phase == _CallPhase.ended || _userEndedCall) return; if (_isReconnecting || _phase == _CallPhase.ended || _userEndedCall) return;
_isReconnecting = true; _isReconnecting = true;
@ -278,20 +284,43 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
_reconnectAttempts = attempt + 1; _reconnectAttempts = attempt + 1;
if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break;
// Exponential backoff: 1s, 2s, 4s, 8s, 16s (first attempt waits 1s)
final delaySecs = min(pow(2, attempt).toInt(), 16); final delaySecs = min(pow(2, attempt).toInt(), 16);
await Future.delayed(Duration(seconds: delaySecs)); await Future.delayed(Duration(seconds: delaySecs));
if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break;
try { try {
// Ask backend if session is still alive
final dio = ref.read(dioClientProvider); final dio = ref.read(dioClientProvider);
final response = await dio.post( final response = await dio.post(
'${ApiEndpoints.voice}/sessions/$_sessionId/reconnect', '${ApiEndpoints.voice}/sessions/$_sessionId/reconnect',
); );
final data = response.data as Map<String, dynamic>; final data = response.data as Map<String, dynamic>;
// Build new WebSocket URL // Session is alive open new WebSocket
if (await _connectWebSocket(data)) return; // success
} on DioException catch (e) {
final statusCode = e.response?.statusCode;
if (statusCode == 404 || statusCode == 409) {
// 404: session expired / not found
// 409: session in terminal state (e.g. "ended")
// No point retrying break immediately
break;
}
// Other errors (network, timeout) keep retrying
} catch (_) {
// Unexpected error keep retrying
}
}
// All reconnection attempts exhausted or session gone
_isReconnecting = false;
_onCallEnded();
}
/// Open a new WebSocket from reconnect response data. Returns true on success.
Future<bool> _connectWebSocket(Map<String, dynamic> data) async {
try {
final config = ref.read(appConfigProvider); final config = ref.read(appConfigProvider);
String wsUrl; String wsUrl;
final backendWsUrl = data['websocket_url'] as String?; final backendWsUrl = data['websocket_url'] as String?;
@ -304,7 +333,6 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId'; wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId';
} }
// Connect new WebSocket
_audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); _audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
_audioSubscription = _audioChannel!.stream.listen( _audioSubscription = _audioChannel!.stream.listen(
_onWsData, _onWsData,
@ -321,17 +349,12 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
_reconnectAttempts = 0; _reconnectAttempts = 0;
_isReconnecting = false; _isReconnecting = false;
if (mounted) setState(() {}); if (mounted) setState(() {});
return; // success return true;
} catch (_) { } catch (_) {
// Will retry on next iteration return false;
} }
} }
// All reconnection attempts exhausted
_isReconnecting = false;
_onCallEnded();
}
/// Serialises async feed() calls so they don't interleave. /// Serialises async feed() calls so they don't interleave.
Future<void> _feedChain = Future.value(); Future<void> _feedChain = Future.value();

View File

@ -123,11 +123,14 @@ async def end_session(session_id: str, req: Request):
@router.post("/sessions/{session_id}/reconnect", response_model=SessionResponse) @router.post("/sessions/{session_id}/reconnect", response_model=SessionResponse)
async def reconnect_session(session_id: str, req: Request): async def reconnect_session(session_id: str, req: Request):
"""Reconnect to a disconnected voice session. """Reconnect to a voice session.
If the session still exists and is in "disconnected" state (within the Accepts sessions in "disconnected" or "active" state. When the session is
``session_ttl`` window), return session info with a fresh websocket_url still active the old WebSocket will be replaced when the client opens a new
so the client can open a new WebSocket connection. one (``voice_websocket`` cancels the previous pipeline automatically).
Returns 404 when the session is not found or has expired, and 409 only for
truly terminal states like "ended".
""" """
if not hasattr(req.app.state, "sessions"): if not hasattr(req.app.state, "sessions"):
req.app.state.sessions = {} req.app.state.sessions = {}
@ -139,19 +142,22 @@ async def reconnect_session(session_id: str, req: Request):
content={"error": "Session not found or expired", "session_id": session_id}, content={"error": "Session not found or expired", "session_id": session_id},
) )
if session["status"] != "disconnected": status = session["status"]
# Terminal states cannot be reconnected
if status not in ("disconnected", "active"):
return JSONResponse( return JSONResponse(
status_code=409, status_code=409,
content={ content={
"error": f"Session is in '{session['status']}' state, not reconnectable", "error": f"Session is in '{status}' state, not reconnectable",
"session_id": session_id, "session_id": session_id,
}, },
) )
# Check if session has expired based on TTL # For disconnected sessions, check if TTL has expired
if status == "disconnected":
disconnected_at = session.get("disconnected_at", 0) disconnected_at = session.get("disconnected_at", 0)
if time.time() - disconnected_at > settings.session_ttl: if time.time() - disconnected_at > settings.session_ttl:
# Expired -- clean it up
req.app.state.sessions.pop(session_id, None) req.app.state.sessions.pop(session_id, None)
return JSONResponse( return JSONResponse(
status_code=404, status_code=404,
@ -161,7 +167,7 @@ async def reconnect_session(session_id: str, req: Request):
websocket_url = f"/ws/voice/{session_id}" websocket_url = f"/ws/voice/{session_id}"
return SessionResponse( return SessionResponse(
session_id=session_id, session_id=session_id,
status="disconnected", status=status,
websocket_url=websocket_url, websocket_url=websocket_url,
) )