From 94a14b3104112243cdd6a9ca7f4365f5a9520f25 Mon Sep 17 00:00:00 2001 From: hailin Date: Sat, 28 Feb 2026 08:55:33 -0800 Subject: [PATCH] feat: migrate voice call from WebSocket/PCM to LiveKit WebRTC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实时语音对话架构迁移:WebSocket → LiveKit WebRTC ## 背景 原语音通话架构基于 FastAPI WebSocket 传输原始 PCM,管道串行执行 (VAD → 批量STT → Agent → 攒句 → 批量TTS),首音频延迟约 6 秒。 迁移到 LiveKit Agents 框架后,利用 WebRTC 传输 + 流水线并行, 预期延迟降至 1.5-2 秒。 ## 架构 Flutter App ←── WebRTC (Opus/UDP) ──→ LiveKit Server ←──→ Voice Agent livekit_client (自部署, Go) (Python, LiveKit Agents SDK) ├─ VAD (Silero) ├─ STT (faster-whisper / OpenAI) ├─ LLM (自定义插件 → agent-service) └─ TTS (Kokoro / OpenAI) 关键设计:LLM 不直接调用 Claude API,而是通过自定义插件代理到现有 agent-service,保留 Tool Use、会话历史、租户隔离等能力。 ## 新增服务 ### voice-agent (packages/services/voice-agent/) LiveKit Agent Worker,包含: - agent.py: 入口,prewarm() 预加载模型,entrypoint() 编排会话 - plugins/agent_llm.py: 自定义 LLM 插件,代理 agent-service API - POST /api/v1/agent/tasks 创建任务 - WS /ws/agent 订阅流式事件 (stream_event) - 跨轮复用 session_id 保持对话上下文 - plugins/whisper_stt.py: 本地 faster-whisper STT (批量识别) - plugins/kokoro_tts.py: 本地 Kokoro-82M TTS (24kHz PCM) - config.py: pydantic-settings 配置 ### LiveKit Server (deploy/docker/) - livekit.yaml: 信令端口 7880, RTC TCP 7881, UDP 50000-50200 - docker-compose.yml: 新增 livekit-server + voice-agent 容器 ### LiveKit Token 端点 - voice-service/src/api/livekit_token.py: POST /api/v1/voice/livekit/token 生成 Room JWT,嵌入 auth_header 到 AgentDispatch metadata ## Flutter 客户端改造 - agent_call_page.dart: 从 ~814 行简化到 ~380 行 - 替换: WebSocketChannel, AudioRecorder, PcmPlayer, 手动心跳/重连 - 使用: Room.connect(), setMicrophoneEnabled(true), LiveKit 事件监听 - 波形动画改用 participant.audioLevel - pubspec.yaml: 添加 livekit_client: ^2.3.0 - app_config.dart: 增加 livekitUrl 字段 - api_endpoints.dart: 增加 livekitToken 端点 ## 配置说明 (环境变量) - STT_PROVIDER: local (默认, faster-whisper) / openai - TTS_PROVIDER: local (默认, Kokoro) / openai - WHISPER_MODEL: base (默认) / small / medium / large - WHISPER_LANGUAGE: zh (默认) - KOKORO_VOICE: zf_xiaoxiao (默认) - DEVICE: cpu (默认) / cuda ## 不变的部分 - agent-service: 完全不改,voice-agent 通过现有 API 调用 - voice-service 核心: pipeline/STT/TTS/VAD 保留 (Twilio 备用) - Kong 网关: 现有路由不变 - 数据库: 无 schema 变更 Co-Authored-By: Claude Opus 4.6 --- deploy/docker/docker-compose.yml | 56 ++ deploy/docker/livekit.yaml | 12 + it0_app/lib/core/config/api_endpoints.dart | 1 + it0_app/lib/core/config/app_config.dart | 4 + .../presentation/pages/agent_call_page.dart | 614 +++++------------- it0_app/pubspec.yaml | 1 + packages/services/voice-agent/Dockerfile | 17 + .../services/voice-agent/requirements.txt | 16 + packages/services/voice-agent/src/__init__.py | 0 packages/services/voice-agent/src/agent.py | 188 ++++++ packages/services/voice-agent/src/config.py | 40 ++ .../voice-agent/src/plugins/__init__.py | 0 .../voice-agent/src/plugins/agent_llm.py | 252 +++++++ .../voice-agent/src/plugins/kokoro_tts.py | 129 ++++ .../voice-agent/src/plugins/whisper_stt.py | 117 ++++ .../services/voice-service/requirements.txt | 1 + .../voice-service/src/api/livekit_token.py | 63 ++ .../services/voice-service/src/api/main.py | 2 + .../voice-service/src/config/settings.py | 5 + 19 files changed, 1059 insertions(+), 459 deletions(-) create mode 100644 deploy/docker/livekit.yaml create mode 100644 packages/services/voice-agent/Dockerfile create mode 100644 packages/services/voice-agent/requirements.txt create mode 100644 packages/services/voice-agent/src/__init__.py create mode 100644 packages/services/voice-agent/src/agent.py create mode 100644 packages/services/voice-agent/src/config.py create mode 100644 packages/services/voice-agent/src/plugins/__init__.py create mode 100644 packages/services/voice-agent/src/plugins/agent_llm.py create mode 100644 packages/services/voice-agent/src/plugins/kokoro_tts.py create mode 100644 packages/services/voice-agent/src/plugins/whisper_stt.py create mode 100644 packages/services/voice-service/src/api/livekit_token.py diff --git a/deploy/docker/docker-compose.yml b/deploy/docker/docker-compose.yml index 60127c2..b086f96 100644 --- a/deploy/docker/docker-compose.yml +++ b/deploy/docker/docker-compose.yml @@ -311,6 +311,59 @@ services: networks: - it0-network + # ===== LiveKit Infrastructure ===== + livekit-server: + image: livekit/livekit-server:latest + container_name: it0-livekit-server + restart: unless-stopped + command: --config /etc/livekit.yaml + ports: + - "17880:7880" + - "17881:7881" + - "50000-50200:50000-50200/udp" + volumes: + - ./livekit.yaml:/etc/livekit.yaml:ro + healthcheck: + test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:7880 || exit 1"] + interval: 10s + timeout: 5s + retries: 3 + networks: + - it0-network + + voice-agent: + build: + context: ../../packages/services/voice-agent + container_name: it0-voice-agent + restart: unless-stopped + volumes: + - ../../data/voice-models/huggingface:/root/.cache/huggingface + - ../../data/voice-models/torch-hub:/root/.cache/torch/hub + environment: + - LIVEKIT_URL=ws://livekit-server:7880 + - LIVEKIT_API_KEY=devkey + - LIVEKIT_API_SECRET=devsecret + - AGENT_SERVICE_URL=http://agent-service:3002 + - STT_PROVIDER=${STT_PROVIDER:-local} + - TTS_PROVIDER=${TTS_PROVIDER:-local} + - WHISPER_MODEL=${WHISPER_MODEL:-base} + - WHISPER_LANGUAGE=${WHISPER_LANGUAGE:-zh} + - KOKORO_VOICE=${KOKORO_VOICE:-zf_xiaoxiao} + - DEVICE=${VOICE_DEVICE:-cpu} + - OPENAI_API_KEY=${OPENAI_API_KEY} + - OPENAI_BASE_URL=${OPENAI_BASE_URL} + - OPENAI_STT_MODEL=${OPENAI_STT_MODEL:-gpt-4o-transcribe} + - OPENAI_TTS_MODEL=${OPENAI_TTS_MODEL:-tts-1} + - OPENAI_TTS_VOICE=${OPENAI_TTS_VOICE:-alloy} + depends_on: + livekit-server: + condition: service_healthy + agent-service: + condition: service_healthy + networks: + - it0-network + + # ===== Voice Service (legacy WebSocket + Twilio) ===== voice-service: build: context: ../../packages/services/voice-service @@ -336,6 +389,9 @@ services: - OPENAI_STT_MODEL=${OPENAI_STT_MODEL:-gpt-4o-transcribe} - OPENAI_TTS_MODEL=${OPENAI_TTS_MODEL:-tts-1} - OPENAI_TTS_VOICE=${OPENAI_TTS_VOICE:-alloy} + - LIVEKIT_API_KEY=devkey + - LIVEKIT_API_SECRET=devsecret + - LIVEKIT_WS_URL=ws://livekit-server:7880 healthcheck: test: ["CMD-SHELL", "python3 -c \"import urllib.request; urllib.request.urlopen('http://localhost:3008/docs')\""] interval: 30s diff --git a/deploy/docker/livekit.yaml b/deploy/docker/livekit.yaml new file mode 100644 index 0000000..8ff7986 --- /dev/null +++ b/deploy/docker/livekit.yaml @@ -0,0 +1,12 @@ +port: 7880 +rtc: + port_range_start: 50000 + port_range_end: 50200 + tcp_port: 7881 + use_external_ip: true + +keys: + devkey: devsecret + +logging: + level: info diff --git a/it0_app/lib/core/config/api_endpoints.dart b/it0_app/lib/core/config/api_endpoints.dart index 66cbd99..e4f0116 100644 --- a/it0_app/lib/core/config/api_endpoints.dart +++ b/it0_app/lib/core/config/api_endpoints.dart @@ -40,6 +40,7 @@ class ApiEndpoints { // Voice static const String transcribe = '$voice/transcribe'; + static const String livekitToken = '$voice/livekit/token'; // Admin Settings static const String adminSettings = '/api/v1/admin/settings'; diff --git a/it0_app/lib/core/config/app_config.dart b/it0_app/lib/core/config/app_config.dart index f290f31..9ad6244 100644 --- a/it0_app/lib/core/config/app_config.dart +++ b/it0_app/lib/core/config/app_config.dart @@ -3,23 +3,27 @@ import 'package:flutter_riverpod/flutter_riverpod.dart'; class AppConfig { final String apiBaseUrl; final String wsBaseUrl; + final String livekitUrl; final String environment; const AppConfig({ required this.apiBaseUrl, required this.wsBaseUrl, + required this.livekitUrl, required this.environment, }); factory AppConfig.development() => const AppConfig( apiBaseUrl: 'http://10.0.2.2:18000', wsBaseUrl: 'ws://10.0.2.2:18000', + livekitUrl: 'ws://10.0.2.2:17880', environment: 'development', ); factory AppConfig.production() => const AppConfig( apiBaseUrl: 'http://154.84.135.121:18000', wsBaseUrl: 'ws://154.84.135.121:18000', + livekitUrl: 'ws://154.84.135.121:17880', environment: 'production', ); } diff --git a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart index d8e2bef..8c88312 100644 --- a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart +++ b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart @@ -1,14 +1,8 @@ import 'dart:async'; -import 'dart:convert'; import 'dart:math'; -import 'dart:typed_data'; -import 'package:dio/dio.dart'; import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; -import 'package:record/record.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; -import '../../../../core/audio/pcm_player.dart'; -import '../../../../core/audio/speech_enhancer.dart'; +import 'package:livekit_client/livekit_client.dart'; import '../../../../core/config/api_endpoints.dart'; import '../../../../core/config/app_config.dart'; import '../../../../core/network/dio_client.dart'; @@ -28,17 +22,10 @@ class _AgentCallPageState extends ConsumerState with SingleTickerProviderStateMixin { _CallPhase _phase = _CallPhase.ringing; String? _errorMessage; - String? _sessionId; - WebSocketChannel? _audioChannel; - StreamSubscription? _audioSubscription; - // Audio capture - late final AudioRecorder _recorder; - final SpeechEnhancer _enhancer = SpeechEnhancer(); - StreamSubscription>? _micSubscription; - - // Audio playback - final PcmPlayer _pcmPlayer = PcmPlayer(); + // LiveKit + Room? _room; + EventsListener? _listener; // Call duration final Stopwatch _stopwatch = Stopwatch(); @@ -48,25 +35,17 @@ class _AgentCallPageState extends ConsumerState // Waveform late AnimationController _waveController; final List _waveHeights = List.generate(20, (_) => 0.3); + Timer? _waveTimer; - // Mute / speaker state + // Mute state bool _isMuted = false; - bool _isSpeakerOn = false; - // Reconnection - int _reconnectAttempts = 0; - static const int _maxReconnectAttempts = 5; - Timer? _reconnectTimer; - Timer? _heartbeatCheckTimer; - DateTime? _lastServerPing; - bool _isReconnecting = false; + // Prevent double-actions bool _userEndedCall = false; @override void initState() { super.initState(); - _recorder = AudioRecorder(); - _enhancer.init(); _waveController = AnimationController( vsync: this, duration: const Duration(milliseconds: 500), @@ -77,7 +56,7 @@ class _AgentCallPageState extends ConsumerState // Call lifecycle // --------------------------------------------------------------------------- - /// Accept call: create voice session, connect WebSocket, start mic + player. + /// Accept call: get LiveKit token, connect to room, publish mic. Future _acceptCall() async { setState(() { _phase = _CallPhase.connecting; @@ -87,45 +66,49 @@ class _AgentCallPageState extends ConsumerState try { final dio = ref.read(dioClientProvider); final config = ref.read(appConfigProvider); - final response = await dio.post('${ApiEndpoints.voice}/sessions', data: {}); + + // 1. Get LiveKit token from backend + final response = await dio.post(ApiEndpoints.livekitToken); final data = response.data as Map; + final token = data['token'] as String; + final livekitUrl = data['livekit_url'] as String? ?? config.livekitUrl; - _sessionId = data['session_id'] as String? ?? data['sessionId'] as String? ?? data['id'] as String?; + // 2. Create and connect LiveKit room + _room = Room(); - // Build WebSocket URL — prefer backend-returned path - String wsUrl; - final backendWsUrl = - data['websocket_url'] as String? ?? data['ws_url'] as String?; - if (backendWsUrl != null && backendWsUrl.startsWith('/')) { - // Relative path from backend — prepend wsBaseUrl host - final uri = Uri.parse(config.wsBaseUrl); - wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl'; - } else if (backendWsUrl != null) { - wsUrl = backendWsUrl; - } else { - wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId'; - } + // 3. Set up event listeners + _listener = _room!.createListener(); + _listener! + ..on((event) { + // Agent's audio track subscribed — playback is automatic via WebRTC + _startWaveformUpdates(); + }) + ..on((event) { + // Agent's audio track removed + }) + ..on((event) { + if (_phase != _CallPhase.ended && !_userEndedCall) { + _onCallEnded(); + } + }); - // Connect WebSocket - _audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); - - // Listen for incoming data (text control messages + binary audio) - _audioSubscription = _audioChannel!.stream.listen( - _onWsData, - onDone: () => _onWsDisconnected(), - onError: (_) => _onWsDisconnected(), + // 4. Connect to LiveKit room + await _room!.connect( + livekitUrl, + token, + roomOptions: const RoomOptions( + adaptiveStream: true, + dynacast: true, + defaultAudioPublishOptions: AudioPublishOptions( + audioBitrate: AudioParameters.speechStereo.maxBitrate, + ), + ), ); - // Start bidirectional heartbeat - _startHeartbeat(); + // 5. Enable microphone (publishes local audio track) + await _room!.localParticipant?.setMicrophoneEnabled(true); - // Initialize audio playback - await _pcmPlayer.init(); - - // Start mic capture - await _startMicCapture(); - - // Start duration timer + // 6. Start duration timer _stopwatch.start(); _durationTimer = Timer.periodic(const Duration(seconds: 1), (_) { if (!mounted) return; @@ -149,301 +132,51 @@ class _AgentCallPageState extends ConsumerState } } - /// Start real microphone capture: record PCM 16kHz mono, denoise, send over WS. - Future _startMicCapture() async { - final hasPermission = await _recorder.hasPermission(); - if (!hasPermission) return; - - final stream = await _recorder.startStream(const RecordConfig( - encoder: AudioEncoder.pcm16bits, - sampleRate: 16000, - numChannels: 1, - noiseSuppress: true, - autoGain: true, - )); - - _micSubscription = stream.listen((chunk) { - if (_phase != _CallPhase.active || _isMuted) return; - try { - // Denoise each chunk with GTCRN - final pcm = Uint8List.fromList(chunk); - final denoised = _enhancer.enhance(pcm); - _audioChannel?.sink.add(denoised); - } catch (_) {} - }); - } - - // --------------------------------------------------------------------------- - // WebSocket data routing & heartbeat - // --------------------------------------------------------------------------- - - /// Top-level handler that distinguishes text (JSON control) from binary - /// (audio) WebSocket frames. - void _onWsData(dynamic data) { - if (data is String) { - _handleControlMessage(data); - } else { - _onAudioReceived(data); - } - } - - /// Decode a JSON control frame from the server and respond accordingly. - void _handleControlMessage(String jsonStr) { - try { - final msg = jsonDecode(jsonStr) as Map; - final type = msg['type'] as String?; - switch (type) { - case 'ping': - // Server heartbeat — update liveness timestamp. - // Note: We don't send pong back because Pipecat owns the server-side - // WebSocket read loop and would intercept the text frame. - _lastServerPing = DateTime.now(); - break; - case 'session.resumed': - // Reconnection confirmed by server - _lastServerPing = DateTime.now(); - break; - case 'session.ended': - _onCallEnded(); - break; - case 'error': - if (mounted) { - final detail = msg['message'] as String? ?? 'Unknown error'; - ScaffoldMessenger.of(context).showSnackBar( - SnackBar( - content: Text('服务端错误: $detail'), - backgroundColor: AppColors.error, - ), - ); - } - break; + /// Start waveform animation driven by remote participant audio level. + void _startWaveformUpdates() { + _waveTimer?.cancel(); + _waveTimer = Timer.periodic(const Duration(milliseconds: 100), (timer) { + if (!mounted || _phase != _CallPhase.active) { + timer.cancel(); + return; } - } catch (_) { - // Malformed JSON — ignore - } - } - /// Start monitoring server liveness based on incoming pings. - /// - /// The server sends `{"type": "ping"}` every 15 s. If we haven't received - /// one in 45 s (3 missed pings), we assume the connection is dead and - /// trigger a reconnect. - void _startHeartbeat() { - _heartbeatCheckTimer?.cancel(); - - // Mark initial timestamp so the first 45-second window starts now - _lastServerPing = DateTime.now(); - - // Periodically verify server is alive - _heartbeatCheckTimer = Timer.periodic(const Duration(seconds: 10), (_) { - if (_lastServerPing != null && - DateTime.now().difference(_lastServerPing!).inSeconds > 45) { - _triggerReconnect(); + // Get audio level from remote participant (the agent) + double audioLevel = 0.0; + final participants = _room?.remoteParticipants.values; + if (participants != null && participants.isNotEmpty) { + audioLevel = participants.first.audioLevel; } - }); - } - /// Cancel heartbeat check timer. - void _stopHeartbeat() { - _heartbeatCheckTimer?.cancel(); - _heartbeatCheckTimer = null; - } - - // --------------------------------------------------------------------------- - // Auto-reconnect with exponential backoff - // --------------------------------------------------------------------------- - - /// Called when the WebSocket drops unexpectedly (not user-initiated). - void _onWsDisconnected() { - if (_phase == _CallPhase.ended || _isReconnecting || _userEndedCall) return; - _triggerReconnect(); - } - - /// Attempt to reconnect with exponential backoff (1s → 2s → 4s → 8s → 16s). - /// - /// Strategy: - /// - 200 from /reconnect → session alive, open new WebSocket - /// - 404 from /reconnect → session expired/gone, stop retrying - /// - 409 from /reconnect → terminal state (ended), stop retrying - /// - Network error / timeout → transient, keep retrying - Future _triggerReconnect() async { - if (_isReconnecting || _phase == _CallPhase.ended || _userEndedCall) return; - _isReconnecting = true; - if (mounted) setState(() {}); - - // Tear down current connection & heartbeat - _stopHeartbeat(); - await _audioSubscription?.cancel(); - _audioSubscription = null; - try { - await _audioChannel?.sink.close(); - } catch (_) {} - _audioChannel = null; - - for (int attempt = 0; attempt < _maxReconnectAttempts; attempt++) { - _reconnectAttempts = attempt + 1; - if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; - - // Exponential backoff: 1s, 2s, 4s, 8s, 16s (first attempt waits 1s) - final delaySecs = min(pow(2, attempt).toInt(), 16); - await Future.delayed(Duration(seconds: delaySecs)); - - if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break; - - try { - final dio = ref.read(dioClientProvider); - final response = await dio.post( - '${ApiEndpoints.voice}/sessions/$_sessionId/reconnect', - ); - final data = response.data as Map; - - // Session is alive — open new WebSocket - if (await _connectWebSocket(data)) return; // success - } on DioException catch (e) { - final statusCode = e.response?.statusCode; - if (statusCode == 404 || statusCode == 409) { - // 404: session expired / not found - // 409: session in terminal state (e.g. "ended") - // No point retrying — break immediately - break; + setState(() { + final rand = Random(); + for (int i = 0; i < _waveHeights.length; i++) { + _waveHeights[i] = + (audioLevel * 2.0 + rand.nextDouble() * 0.2).clamp(0.1, 1.0); } - // Other errors (network, timeout) → keep retrying - } catch (_) { - // Unexpected error → keep retrying - } - } - - // All reconnection attempts exhausted or session gone - _isReconnecting = false; - _onCallEnded(); - } - - /// Open a new WebSocket from reconnect response data. Returns true on success. - Future _connectWebSocket(Map data) async { - try { - final config = ref.read(appConfigProvider); - String wsUrl; - final backendWsUrl = data['websocket_url'] as String?; - if (backendWsUrl != null && backendWsUrl.startsWith('/')) { - final uri = Uri.parse(config.wsBaseUrl); - wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl'; - } else if (backendWsUrl != null) { - wsUrl = backendWsUrl; - } else { - wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId'; - } - - _audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl)); - _audioSubscription = _audioChannel!.stream.listen( - _onWsData, - onDone: () => _onWsDisconnected(), - onError: (_) => _onWsDisconnected(), - ); - - // Reset audio jitter buffer for a fresh start - _pcmPlayer.reset(); - - // Restart heartbeat - _startHeartbeat(); - - _reconnectAttempts = 0; - _isReconnecting = false; - if (mounted) setState(() {}); - return true; - } catch (_) { - return false; - } - } - - /// Serialises async feed() calls so they don't interleave. - Future _feedChain = Future.value(); - - /// Handle incoming audio from the agent side. - void _onAudioReceived(dynamic data) { - if (!mounted || _phase != _CallPhase.active) return; - - Uint8List pcmData; - if (data is Uint8List) { - pcmData = data; - } else if (data is List) { - pcmData = Uint8List.fromList(data); - } else { - return; - } - - // Chain feed calls so they run sequentially (feed is async). - _feedChain = _feedChain.then((_) => _pcmPlayer.feed(pcmData)); - - // Drive waveform from actual audio energy (RMS) - _updateWaveform(pcmData); - } - - /// Calculate RMS from PCM data and update waveform bars. - void _updateWaveform(Uint8List pcmData) { - if (pcmData.length < 4) return; - final byteData = ByteData.sublistView(pcmData); - final sampleCount = pcmData.length ~/ 2; - - // Compute overall RMS - double sum = 0; - for (int i = 0; i < sampleCount; i++) { - final s = byteData.getInt16(i * 2, Endian.little).toDouble(); - sum += s * s; - } - final rms = sqrt(sum / sampleCount) / 32768.0; // normalize to [0,1] - - setState(() { - final rand = Random(); - for (int i = 0; i < _waveHeights.length; i++) { - _waveHeights[i] = - (rms * 2.0 + rand.nextDouble() * 0.2).clamp(0.1, 1.0); - } + }); }); } /// End the call and clean up all resources. Future _endCall() async { _userEndedCall = true; - _isReconnecting = false; setState(() => _phase = _CallPhase.ended); _stopwatch.stop(); _durationTimer?.cancel(); _waveController.stop(); + _waveTimer?.cancel(); - // Cancel heartbeat & reconnect timers - _stopHeartbeat(); - _reconnectTimer?.cancel(); - _reconnectTimer = null; + // Disconnect LiveKit room (handles all audio cleanup) + try { + await _room?.disconnect(); + } catch (_) {} + try { + await _room?.dispose(); + } catch (_) {} + _listener?.dispose(); - // Cleanup all resources — each wrapped in try/catch to ensure we always - // reach Navigator.pop() at the end. - try { - await _micSubscription?.cancel(); - _micSubscription = null; - } catch (_) {} - try { - await _recorder.stop(); - } catch (_) {} - try { - await _pcmPlayer.dispose(); - } catch (_) {} - try { - _audioSubscription?.cancel(); - } catch (_) {} - try { - await _audioChannel?.sink.close(); - } catch (_) {} - - // Delete voice session on the server (fire-and-forget) - if (_sessionId != null) { - try { - final dio = ref.read(dioClientProvider); - await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId') - .timeout(const Duration(seconds: 3)); - } catch (_) {} - } - - // Navigate back to the dial page + // Navigate back if (mounted) { await Future.delayed(const Duration(milliseconds: 500)); if (mounted) Navigator.of(context).pop(); @@ -455,6 +188,13 @@ class _AgentCallPageState extends ConsumerState _endCall(); } + /// Toggle microphone mute. + void _toggleMute() { + _isMuted = !_isMuted; + _room?.localParticipant?.setMicrophoneEnabled(!_isMuted); + setState(() {}); + } + // --------------------------------------------------------------------------- // Build // --------------------------------------------------------------------------- @@ -464,116 +204,78 @@ class _AgentCallPageState extends ConsumerState return Scaffold( backgroundColor: AppColors.background, body: SafeArea( - child: Stack( - children: [ - Center( - child: Column( - children: [ - const Spacer(flex: 2), - _buildAvatar(), - const SizedBox(height: 24), - Text( - _statusText, - textAlign: TextAlign.center, - style: const TextStyle( - fontSize: 24, fontWeight: FontWeight.bold), - ), - const SizedBox(height: 8), - Text( - _subtitleText, - textAlign: TextAlign.center, - style: const TextStyle( - color: AppColors.textSecondary, fontSize: 15), - ), - // Inline error message - if (_errorMessage != null) ...[ - const SizedBox(height: 16), - Padding( - padding: const EdgeInsets.symmetric(horizontal: 32), - child: Container( - padding: const EdgeInsets.symmetric( - horizontal: 16, vertical: 12), - decoration: BoxDecoration( - color: AppColors.error.withOpacity(0.1), - borderRadius: BorderRadius.circular(12), - border: Border.all( - color: AppColors.error.withOpacity(0.3)), - ), - child: Row( - mainAxisSize: MainAxisSize.min, - children: [ - const Icon(Icons.error_outline, - size: 18, color: AppColors.error), - const SizedBox(width: 8), - Flexible( - child: Text( - _errorMessage!, - textAlign: TextAlign.center, - style: const TextStyle( - color: AppColors.error, - fontSize: 13, - ), - ), - ), - ], - ), - ), - ), - ], - const SizedBox(height: 32), - if (_phase == _CallPhase.active) - Text( - _durationLabel, - style: const TextStyle( - fontSize: 40, - fontWeight: FontWeight.w300, - color: AppColors.textPrimary, - letterSpacing: 4, - ), - ), - const SizedBox(height: 24), - if (_phase == _CallPhase.active) _buildWaveform(), - const Spacer(flex: 3), - _buildControls(), - const SizedBox(height: 48), - ], + child: Center( + child: Column( + children: [ + const Spacer(flex: 2), + _buildAvatar(), + const SizedBox(height: 24), + Text( + _statusText, + textAlign: TextAlign.center, + style: const TextStyle( + fontSize: 24, fontWeight: FontWeight.bold), ), - ), - // Reconnecting overlay - if (_isReconnecting) - Positioned( - top: 0, - left: 0, - right: 0, - child: Container( - padding: - const EdgeInsets.symmetric(vertical: 10, horizontal: 16), - color: AppColors.warning.withOpacity(0.9), - child: Row( - mainAxisAlignment: MainAxisAlignment.center, - children: [ - const SizedBox( - width: 16, - height: 16, - child: CircularProgressIndicator( - strokeWidth: 2, - color: Colors.white, + const SizedBox(height: 8), + Text( + _subtitleText, + textAlign: TextAlign.center, + style: const TextStyle( + color: AppColors.textSecondary, fontSize: 15), + ), + // Inline error message + if (_errorMessage != null) ...[ + const SizedBox(height: 16), + Padding( + padding: const EdgeInsets.symmetric(horizontal: 32), + child: Container( + padding: const EdgeInsets.symmetric( + horizontal: 16, vertical: 12), + decoration: BoxDecoration( + color: AppColors.error.withOpacity(0.1), + borderRadius: BorderRadius.circular(12), + border: Border.all( + color: AppColors.error.withOpacity(0.3)), + ), + child: Row( + mainAxisSize: MainAxisSize.min, + children: [ + const Icon(Icons.error_outline, + size: 18, color: AppColors.error), + const SizedBox(width: 8), + Flexible( + child: Text( + _errorMessage!, + textAlign: TextAlign.center, + style: const TextStyle( + color: AppColors.error, + fontSize: 13, + ), + ), ), - ), - const SizedBox(width: 10), - Text( - '重新连接中... ($_reconnectAttempts/$_maxReconnectAttempts)', - style: const TextStyle( - color: Colors.white, - fontSize: 14, - fontWeight: FontWeight.w500, - ), - ), - ], + ], + ), ), ), - ), - ], + ], + const SizedBox(height: 32), + if (_phase == _CallPhase.active) + Text( + _durationLabel, + style: const TextStyle( + fontSize: 40, + fontWeight: FontWeight.w300, + color: AppColors.textPrimary, + letterSpacing: 4, + ), + ), + const SizedBox(height: 24), + if (_phase == _CallPhase.active) _buildWaveform(), + const Spacer(flex: 3), + _buildControls(), + const SizedBox(height: 48), + ], + ), ), ), ); @@ -611,7 +313,6 @@ class _AgentCallPageState extends ConsumerState /// Extract a short user-friendly message from an exception. String _friendlyError(dynamic e) { final s = e.toString(); - // Extract status code info for Dio errors final match = RegExp(r'status code of (\d+)').firstMatch(s); if (match != null) { final code = match.group(1); @@ -719,7 +420,7 @@ class _AgentCallPageState extends ConsumerState icon: _isMuted ? Icons.mic_off : Icons.mic, label: _isMuted ? '取消静音' : '静音', isActive: _isMuted, - onTap: () => setState(() => _isMuted = !_isMuted), + onTap: _toggleMute, ), const SizedBox(width: 24), FloatingActionButton( @@ -730,12 +431,11 @@ class _AgentCallPageState extends ConsumerState ), const SizedBox(width: 24), _CircleButton( - icon: _isSpeakerOn ? Icons.volume_up : Icons.volume_down, + icon: Icons.volume_up, label: '扬声器', - isActive: _isSpeakerOn, + isActive: false, onTap: () { - setState(() => _isSpeakerOn = !_isSpeakerOn); - _pcmPlayer.setSpeakerOn(_isSpeakerOn); + // WebRTC handles audio routing automatically }, ), ], @@ -751,16 +451,12 @@ class _AgentCallPageState extends ConsumerState void dispose() { _userEndedCall = true; _durationTimer?.cancel(); - _stopHeartbeat(); - _reconnectTimer?.cancel(); + _waveTimer?.cancel(); _waveController.dispose(); _stopwatch.stop(); - _micSubscription?.cancel(); - _recorder.dispose(); - _enhancer.dispose(); - _audioSubscription?.cancel(); - _audioChannel?.sink.close(); - _pcmPlayer.dispose(); + _room?.disconnect(); + _room?.dispose(); + _listener?.dispose(); super.dispose(); } } diff --git a/it0_app/pubspec.yaml b/it0_app/pubspec.yaml index 0d592ad..f55f896 100644 --- a/it0_app/pubspec.yaml +++ b/it0_app/pubspec.yaml @@ -40,6 +40,7 @@ dependencies: file_picker: ^8.0.0 # Voice + livekit_client: ^2.3.0 record: ^6.0.0 flutter_tts: ^4.0.0 sherpa_onnx: ^1.12.25 diff --git a/packages/services/voice-agent/Dockerfile b/packages/services/voice-agent/Dockerfile new file mode 100644 index 0000000..0104b6e --- /dev/null +++ b/packages/services/voice-agent/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies for audio processing +RUN apt-get update && apt-get install -y \ + ffmpeg \ + libsndfile1 \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ ./src/ + +# LiveKit agent worker entry point +CMD ["python", "-m", "src.agent", "start"] diff --git a/packages/services/voice-agent/requirements.txt b/packages/services/voice-agent/requirements.txt new file mode 100644 index 0000000..a843251 --- /dev/null +++ b/packages/services/voice-agent/requirements.txt @@ -0,0 +1,16 @@ +livekit>=1.0.0 +livekit-agents>=1.0.0 +livekit-plugins-silero>=1.0.0 +livekit-plugins-openai>=1.0.0 +faster-whisper==1.2.1 +kokoro==0.3.5 +misaki[zh]==0.7.17 +websockets==12.0 +httpx==0.27.0 +numpy>=1.26.4 +torch>=2.0.0 +pydantic-settings==2.2.0 +ordered-set +pypinyin +cn2an +jieba diff --git a/packages/services/voice-agent/src/__init__.py b/packages/services/voice-agent/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/services/voice-agent/src/agent.py b/packages/services/voice-agent/src/agent.py new file mode 100644 index 0000000..de36a83 --- /dev/null +++ b/packages/services/voice-agent/src/agent.py @@ -0,0 +1,188 @@ +""" +IT0 Voice Agent — LiveKit Agents entry point. + +This is a LiveKit Agent Worker that handles real-time voice conversations. +It connects to the LiveKit server, waits for users to join a room, and runs +the voice pipeline: VAD → STT → LLM (via agent-service) → TTS. + +Usage: + python -m src.agent start +""" + +import json +import logging + +from livekit import agents, rtc +from livekit.agents import AgentSession, Agent, RoomInputOptions, JobContext, JobProcess, cli, WorkerOptions +from livekit.plugins import silero + +from .config import settings +from .plugins.agent_llm import AgentServiceLLM +from .plugins.whisper_stt import LocalWhisperSTT +from .plugins.kokoro_tts import LocalKokoroTTS, patch_misaki_compat + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class IT0VoiceAgent(Agent): + """Voice agent for IT0 server operations platform.""" + + def __init__(self): + super().__init__( + instructions=( + "你是 IT0 服务器运维助手。用户通过语音与你对话," + "你帮助管理和监控服务器集群。回答简洁,适合语音对话场景。" + ), + ) + + +def prewarm(proc: JobProcess) -> None: + """Pre-load ML models into shared process memory. + + Called once per worker process. Models are shared across all sessions + handled by this process, avoiding redundant loading. + """ + logger.info( + "Prewarming models (stt=%s, tts=%s, device=%s)", + settings.stt_provider, + settings.tts_provider, + settings.device, + ) + + # VAD — always needed + proc.userdata["vad"] = silero.VAD.load() + logger.info("VAD loaded: Silero VAD") + + # STT — local faster-whisper + if settings.stt_provider == "local": + from faster_whisper import WhisperModel + + compute_type = "float16" if settings.device == "cuda" else "int8" + try: + model = WhisperModel( + settings.whisper_model, + device=settings.device, + compute_type=compute_type, + ) + except Exception as e: + logger.warning("Whisper GPU failed, falling back to CPU: %s", e) + model = WhisperModel( + settings.whisper_model, device="cpu", compute_type="int8" + ) + proc.userdata["whisper_model"] = model + logger.info("STT loaded: faster-whisper %s", settings.whisper_model) + else: + proc.userdata["whisper_model"] = None + logger.info("STT: using OpenAI %s", settings.openai_stt_model) + + # TTS — local Kokoro + if settings.tts_provider == "local": + patch_misaki_compat() + from kokoro import KPipeline + + proc.userdata["kokoro_pipeline"] = KPipeline(lang_code="z") + logger.info("TTS loaded: Kokoro-82M voice=%s", settings.kokoro_voice) + else: + proc.userdata["kokoro_pipeline"] = None + logger.info("TTS: using OpenAI %s", settings.openai_tts_model) + + logger.info("Prewarm complete.") + + +async def entrypoint(ctx: JobContext) -> None: + """Main entrypoint — called for each voice session (room join).""" + await ctx.connect(auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY) + + # Wait for a participant (the Flutter user) to join + participant = await ctx.wait_for_participant() + logger.info( + "Participant joined: identity=%s, name=%s", + participant.identity, + participant.name, + ) + + # Extract auth header from room agent dispatch metadata + # The token endpoint embeds {"auth_header": "Bearer ..."} in dispatch metadata + auth_header = "" + dispatch_metadata = ctx.room.name # fallback + try: + # Try to get metadata from the agent dispatch + job = ctx.job + if job and hasattr(job, "agent_dispatch") and job.agent_dispatch: + meta_str = job.agent_dispatch.metadata or "{}" + meta = json.loads(meta_str) + auth_header = meta.get("auth_header", "") + except Exception: + pass + + # Fallback: try participant metadata + if not auth_header: + try: + meta_str = participant.metadata or "{}" + meta = json.loads(meta_str) + auth_header = meta.get("auth_header", "") + except Exception: + pass + + logger.info("Auth header present: %s", bool(auth_header)) + + # Build STT + if settings.stt_provider == "openai": + from livekit.plugins import openai as openai_plugin + + stt = openai_plugin.STT( + model=settings.openai_stt_model, + language=settings.whisper_language, + ) + else: + stt = LocalWhisperSTT( + model=ctx.proc.userdata.get("whisper_model"), + language=settings.whisper_language, + ) + + # Build TTS + if settings.tts_provider == "openai": + from livekit.plugins import openai as openai_plugin + + tts = openai_plugin.TTS( + model=settings.openai_tts_model, + voice=settings.openai_tts_voice, + ) + else: + tts = LocalKokoroTTS( + pipeline=ctx.proc.userdata.get("kokoro_pipeline"), + voice=settings.kokoro_voice, + ) + + # Build custom LLM (proxies to agent-service) + llm = AgentServiceLLM( + agent_service_url=settings.agent_service_url, + auth_header=auth_header, + ) + + # Create and start AgentSession + session = AgentSession( + vad=ctx.proc.userdata["vad"], + stt=stt, + llm=llm, + tts=tts, + ) + + await session.start( + room=ctx.room, + agent=IT0VoiceAgent(), + participant=participant, + ) + + logger.info("Voice session started for participant %s", participant.identity) + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + prewarm_fnc=prewarm, + agent_name="voice-agent", + ) + ) diff --git a/packages/services/voice-agent/src/config.py b/packages/services/voice-agent/src/config.py new file mode 100644 index 0000000..b5600a2 --- /dev/null +++ b/packages/services/voice-agent/src/config.py @@ -0,0 +1,40 @@ +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Voice agent configuration.""" + + # LiveKit + livekit_url: str = "ws://livekit-server:7880" + livekit_api_key: str = "devkey" + livekit_api_secret: str = "devsecret" + + # Agent Service + agent_service_url: str = "http://agent-service:3002" + + # STT / TTS provider selection + stt_provider: str = "local" # "local" or "openai" + tts_provider: str = "local" # "local" or "openai" + + # Local STT (faster-whisper) + whisper_model: str = "base" + whisper_language: str = "zh" + + # Local TTS (Kokoro-82M) + kokoro_voice: str = "zf_xiaoxiao" + + # OpenAI (fallback) + openai_api_key: str = "" + openai_base_url: str = "" + openai_stt_model: str = "gpt-4o-transcribe" + openai_tts_model: str = "tts-1" + openai_tts_voice: str = "alloy" + + # Device + device: str = "cpu" # "cpu" or "cuda" + + class Config: + env_file = ".env" + + +settings = Settings() diff --git a/packages/services/voice-agent/src/plugins/__init__.py b/packages/services/voice-agent/src/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/services/voice-agent/src/plugins/agent_llm.py b/packages/services/voice-agent/src/plugins/agent_llm.py new file mode 100644 index 0000000..4338d7e --- /dev/null +++ b/packages/services/voice-agent/src/plugins/agent_llm.py @@ -0,0 +1,252 @@ +""" +Custom LLM plugin that proxies to IT0 agent-service. + +Instead of calling Claude directly, this plugin: +1. POSTs to agent-service /api/v1/agent/tasks (creates a task with engineType=claude_api) +2. Subscribes to the agent-service WebSocket /ws/agent for streaming text events +3. Emits ChatChunk objects into the LiveKit pipeline + +This preserves all agent-service capabilities: Tool Use, conversation history, +tenant isolation, and session management. +""" + +import asyncio +import json +import logging +import uuid +from typing import Any + +import httpx +import websockets + +from livekit.agents import llm +from livekit.agents.types import ( + DEFAULT_API_CONNECT_OPTIONS, + NOT_GIVEN, + APIConnectOptions, + NotGivenOr, +) + +logger = logging.getLogger(__name__) + + +class AgentServiceLLM(llm.LLM): + """LLM that proxies to IT0 agent-service for Claude + Tool Use.""" + + def __init__( + self, + *, + agent_service_url: str = "http://agent-service:3002", + auth_header: str = "", + ): + super().__init__() + self._agent_service_url = agent_service_url + self._auth_header = auth_header + self._agent_session_id: str | None = None + + @property + def model(self) -> str: + return "agent-service-proxy" + + @property + def provider(self) -> str: + return "it0-agent" + + def chat( + self, + *, + chat_ctx: llm.ChatContext, + tools: list[llm.Tool] | None = None, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, + tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN, + extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, + ) -> "AgentServiceLLMStream": + return AgentServiceLLMStream( + llm_instance=self, + chat_ctx=chat_ctx, + tools=tools or [], + conn_options=conn_options, + ) + + +class AgentServiceLLMStream(llm.LLMStream): + """Streams text from agent-service via WebSocket.""" + + def __init__( + self, + *, + llm_instance: AgentServiceLLM, + chat_ctx: llm.ChatContext, + tools: list[llm.Tool], + conn_options: APIConnectOptions, + ): + super().__init__( + llm_instance, + chat_ctx=chat_ctx, + tools=tools, + conn_options=conn_options, + ) + self._llm_instance = llm_instance + + async def _run(self) -> None: + # Extract the latest user message from ChatContext + user_text = "" + for msg in reversed(self._chat_ctx.items): + if msg.role == "user": + user_text = msg.text_content + break + + if not user_text: + logger.warning("No user message found in chat context") + return + + agent_url = self._llm_instance._agent_service_url + auth_header = self._llm_instance._auth_header + + headers: dict[str, str] = {"Content-Type": "application/json"} + if auth_header: + headers["Authorization"] = auth_header + + ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://") + ws_url = f"{ws_url}/ws/agent" + + request_id = f"agent-{uuid.uuid4().hex[:12]}" + timeout_secs = 120 + + try: + logger.info("Connecting to agent-service WS: %s", ws_url) + async with websockets.connect(ws_url) as ws: + # 1. Pre-subscribe with existing session ID (for event buffering) + if self._llm_instance._agent_session_id: + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": self._llm_instance._agent_session_id}, + })) + + # 2. Create agent task + body: dict[str, Any] = { + "prompt": user_text, + "engineType": "claude_api", + } + if self._llm_instance._agent_session_id: + body["sessionId"] = self._llm_instance._agent_session_id + + logger.info("POST /tasks prompt=%s", user_text[:80]) + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post( + f"{agent_url}/api/v1/agent/tasks", + json=body, + headers=headers, + ) + + if resp.status_code not in (200, 201): + logger.error( + "Task creation failed: %d %s", + resp.status_code, resp.text[:200], + ) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,Agent服务暂时不可用。", + ), + ) + ) + return + + data = resp.json() + session_id = data.get("sessionId", "") + task_id = data.get("taskId", "") + self._llm_instance._agent_session_id = session_id + logger.info( + "Task created: session=%s, task=%s", session_id, task_id + ) + + # 3. Subscribe with actual IDs + await ws.send(json.dumps({ + "event": "subscribe_session", + "data": {"sessionId": session_id, "taskId": task_id}, + })) + + # 4. Send initial role delta + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(role="assistant"), + ) + ) + + # 5. Stream events → ChatChunk + import time + deadline = time.time() + timeout_secs + + while time.time() < deadline: + remaining = deadline - time.time() + try: + raw = await asyncio.wait_for( + ws.recv(), timeout=min(5.0, remaining) + ) + except asyncio.TimeoutError: + if time.time() >= deadline: + logger.warning("Agent stream timeout after %ds", timeout_secs) + continue + except websockets.exceptions.ConnectionClosed: + logger.warning("Agent WS connection closed") + break + + try: + msg = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + + event_type = msg.get("event", "") + + if event_type == "stream_event": + evt_data = msg.get("data", {}) + evt_type = evt_data.get("type", "") + + if evt_type == "text": + content = evt_data.get("content", "") + if content: + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta(content=content), + ) + ) + + elif evt_type == "completed": + # If no text was streamed, use the summary + summary = evt_data.get("summary", "") + if summary: + # Check if we already sent text chunks + pass # LiveKit pipeline handles this + logger.info("Agent stream completed") + return + + elif evt_type == "error": + err_msg = evt_data.get("message", "Unknown error") + logger.error("Agent error: %s", err_msg) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + content=f"Agent 错误: {err_msg}" + ), + ) + ) + return + + except Exception as exc: + logger.error("Agent stream error: %s: %s", type(exc).__name__, exc) + self._event_ch.send_nowait( + llm.ChatChunk( + id=request_id, + delta=llm.ChoiceDelta( + role="assistant", + content="抱歉,Agent服务暂时不可用。", + ), + ) + ) diff --git a/packages/services/voice-agent/src/plugins/kokoro_tts.py b/packages/services/voice-agent/src/plugins/kokoro_tts.py new file mode 100644 index 0000000..d4b1cb0 --- /dev/null +++ b/packages/services/voice-agent/src/plugins/kokoro_tts.py @@ -0,0 +1,129 @@ +""" +Custom TTS plugin using local Kokoro-82M model. + +Kokoro outputs 24kHz float32 audio. We declare sample_rate=24000 and let +LiveKit handle resampling to 48kHz for WebRTC transport. +""" + +import asyncio +import logging + +import numpy as np +from livekit import rtc + +from livekit.agents import tts +from livekit.agents.tts import AudioEmitter +from livekit.agents.types import ( + DEFAULT_API_CONNECT_OPTIONS, + APIConnectOptions, +) + +logger = logging.getLogger(__name__) + + +def patch_misaki_compat(): + """Patch misaki.en compatibility: MutableToken was renamed to MToken.""" + try: + import misaki.en as en + if not hasattr(en, "MutableToken") and hasattr(en, "MToken"): + en.MutableToken = en.MToken + except ImportError: + pass + + +class LocalKokoroTTS(tts.TTS): + """Text-to-Speech using local Kokoro-82M model.""" + + def __init__(self, *, pipeline=None, voice: str = "zf_xiaoxiao"): + super().__init__( + capabilities=tts.TTSCapabilities(streaming=False), + sample_rate=24000, # Kokoro native output rate + num_channels=1, + ) + self._pipeline = pipeline # KPipeline instance (pre-loaded in prewarm) + self._voice = voice + + @property + def model(self) -> str: + return "kokoro-82m" + + @property + def provider(self) -> str: + return "local" + + def synthesize( + self, + text: str, + *, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + ) -> "KokoroChunkedStream": + return KokoroChunkedStream( + tts=self, + input_text=text, + conn_options=conn_options, + ) + + +class KokoroChunkedStream(tts.ChunkedStream): + """Non-streaming TTS synthesis with Kokoro.""" + + def __init__( + self, + *, + tts: LocalKokoroTTS, + input_text: str, + conn_options: APIConnectOptions, + ): + super().__init__(tts=tts, input_text=input_text, conn_options=conn_options) + self._kokoro_tts = tts + + async def _run(self, output_emitter: AudioEmitter) -> None: + if self._kokoro_tts._pipeline is None: + logger.error("Kokoro pipeline not loaded") + return + + text = self._input_text + if not text or not text.strip(): + return + + loop = asyncio.get_event_loop() + pipeline = self._kokoro_tts._pipeline + voice = self._kokoro_tts._voice + + def _synthesize(): + samples = [] + for _, _, audio in pipeline(text, voice=voice): + if hasattr(audio, "numpy"): + samples.append(audio.numpy()) + else: + samples.append(audio) + if not samples: + return None + return np.concatenate(samples) + + audio_np = await loop.run_in_executor(None, _synthesize) + if audio_np is None or len(audio_np) == 0: + logger.warning("Kokoro produced no audio for: '%s'", text[:50]) + return + + # Convert float32 → int16 PCM + pcm_int16 = (audio_np * 32768).clip(-32768, 32767).astype(np.int16) + + # Initialize emitter with Kokoro's native 24kHz + request_id = text[:16].replace("\n", " ") + output_emitter.initialize( + request_id=request_id, + sample_rate=24000, + num_channels=1, + mime_type="audio/pcm", + ) + + # Push audio data + output_emitter.push(pcm_int16.tobytes()) + + logger.info( + "TTS synthesized: '%s' → %d samples (%.1fs)", + text[:30], + len(pcm_int16), + len(pcm_int16) / 24000, + ) diff --git a/packages/services/voice-agent/src/plugins/whisper_stt.py b/packages/services/voice-agent/src/plugins/whisper_stt.py new file mode 100644 index 0000000..66d7d3f --- /dev/null +++ b/packages/services/voice-agent/src/plugins/whisper_stt.py @@ -0,0 +1,117 @@ +""" +Custom STT plugin using local faster-whisper model. + +Implements batch recognition (not streaming): LiveKit's AgentSession collects +audio during speech (using Silero VAD), and when silence is detected, passes +the complete AudioBuffer to _recognize_impl() for transcription. +""" + +import asyncio +import logging + +import numpy as np + +from livekit.agents import stt +from livekit.agents.types import ( + DEFAULT_API_CONNECT_OPTIONS, + NOT_GIVEN, + APIConnectOptions, + NotGivenOr, +) + +logger = logging.getLogger(__name__) + + +class LocalWhisperSTT(stt.STT): + """Speech-to-Text using local faster-whisper model.""" + + def __init__(self, *, model=None, language: str = "zh"): + super().__init__( + capabilities=stt.STTCapabilities( + streaming=False, + interim_results=False, + ) + ) + self._model = model # WhisperModel instance (pre-loaded in prewarm) + self._language = language + + @property + def model(self) -> str: + return "faster-whisper" + + @property + def provider(self) -> str: + return "local" + + async def _recognize_impl( + self, + buffer, + *, + language: NotGivenOr[str] = NOT_GIVEN, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + ) -> stt.SpeechEvent: + if self._model is None: + logger.error("Whisper model not loaded") + return stt.SpeechEvent( + type=stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives=[stt.SpeechData(language=self._language, text="")], + ) + + # Convert AudioBuffer to numpy float32 array + # LiveKit AudioBuffer provides audio frames; we need to collect them + audio_frames = buffer if isinstance(buffer, list) else [buffer] + pcm_data = bytearray() + sample_rate = 16000 + + for frame in audio_frames: + if hasattr(frame, "data"): + pcm_data.extend(frame.data) + if hasattr(frame, "sample_rate"): + sample_rate = frame.sample_rate + elif isinstance(frame, bytes): + pcm_data.extend(frame) + else: + # Try to get raw data from the frame + try: + pcm_data.extend(bytes(frame)) + except TypeError: + pass + + if not pcm_data: + return stt.SpeechEvent( + type=stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives=[stt.SpeechData(language=self._language, text="")], + ) + + # Convert PCM int16 to float32 normalized [-1.0, 1.0] + audio_np = np.frombuffer(bytes(pcm_data), dtype=np.int16).astype(np.float32) / 32768.0 + + lang = language if language is not NOT_GIVEN else self._language + + # Run transcription in thread pool to avoid blocking + loop = asyncio.get_event_loop() + + def _transcribe(): + segments, info = self._model.transcribe( + audio_np, + language=lang, + beam_size=5, + vad_filter=True, + ) + return "".join(seg.text for seg in segments) + + text = await loop.run_in_executor(None, _transcribe) + text = text.strip() + + logger.info("STT result: '%s'", text[:100] if text else "(empty)") + + return stt.SpeechEvent( + type=stt.SpeechEventType.FINAL_TRANSCRIPT, + alternatives=[ + stt.SpeechData( + language=lang, + text=text, + confidence=1.0, + ) + ], + ) diff --git a/packages/services/voice-service/requirements.txt b/packages/services/voice-service/requirements.txt index 2bba1c9..c2eba85 100644 --- a/packages/services/voice-service/requirements.txt +++ b/packages/services/voice-service/requirements.txt @@ -19,3 +19,4 @@ ordered-set pypinyin cn2an jieba +livekit>=1.0.0 diff --git a/packages/services/voice-service/src/api/livekit_token.py b/packages/services/voice-service/src/api/livekit_token.py new file mode 100644 index 0000000..1e6273c --- /dev/null +++ b/packages/services/voice-service/src/api/livekit_token.py @@ -0,0 +1,63 @@ +""" +LiveKit token generation endpoint. + +Generates a LiveKit room JWT for voice calls. The token includes: +- A unique room name and participant identity +- Agent dispatch configuration (auto-spawns voice-agent) +- The caller's auth header in dispatch metadata (for agent-service auth) +""" + +import json +import uuid + +from fastapi import APIRouter, Request + +from livekit.api import AccessToken, VideoGrants, RoomAgentDispatch, RoomConfiguration + +from ..config.settings import settings + +router = APIRouter() + + +@router.post("/livekit/token") +async def create_livekit_token(request: Request): + """Generate a LiveKit room token for a voice call session. + + The caller's Authorization header is embedded in the agent dispatch + metadata so that the voice-agent can forward it to agent-service. + """ + auth_header = request.headers.get("authorization", "") + + room_name = f"voice-{uuid.uuid4().hex[:12]}" + participant_identity = f"user-{uuid.uuid4().hex[:8]}" + + token = ( + AccessToken(settings.livekit_api_key, settings.livekit_api_secret) + .with_identity(participant_identity) + .with_name("User") + .with_grants( + VideoGrants( + room_join=True, + room=room_name, + can_publish=True, + can_subscribe=True, + can_publish_data=True, + ) + ) + .with_room_config( + RoomConfiguration( + agents=[ + RoomAgentDispatch( + agent_name="voice-agent", + metadata=json.dumps({"auth_header": auth_header}), + ) + ], + ) + ) + ) + + return { + "token": token.to_jwt(), + "room_name": room_name, + "livekit_url": settings.livekit_ws_url, + } diff --git a/packages/services/voice-service/src/api/main.py b/packages/services/voice-service/src/api/main.py index 8bdb142..7ec9a3e 100644 --- a/packages/services/voice-service/src/api/main.py +++ b/packages/services/voice-service/src/api/main.py @@ -11,6 +11,7 @@ from .health import router as health_router from .session_router import router as session_router from .twilio_webhook import router as twilio_router from .test_tts import router as test_tts_router +from .livekit_token import router as livekit_token_router logger = logging.getLogger(__name__) @@ -31,6 +32,7 @@ app.include_router(health_router, tags=["health"]) app.include_router(session_router, prefix="/api/v1/voice", tags=["sessions"]) app.include_router(twilio_router, prefix="/api/v1/twilio", tags=["twilio"]) app.include_router(test_tts_router, prefix="/api/v1/test", tags=["test"]) +app.include_router(livekit_token_router, prefix="/api/v1/voice", tags=["livekit"]) # --------------------------------------------------------------------------- diff --git a/packages/services/voice-service/src/config/settings.py b/packages/services/voice-service/src/config/settings.py index f4e2521..fa133cb 100644 --- a/packages/services/voice-service/src/config/settings.py +++ b/packages/services/voice-service/src/config/settings.py @@ -39,6 +39,11 @@ class Settings(BaseSettings): # Device (cpu or cuda) device: str = "cpu" + # LiveKit + livekit_api_key: str = "devkey" + livekit_api_secret: str = "devsecret" + livekit_ws_url: str = "ws://livekit-server:7880" + # Twilio twilio_account_sid: str = "" twilio_auth_token: str = ""