feat: migrate voice call from WebSocket/PCM to LiveKit WebRTC

实时语音对话架构迁移:WebSocket → LiveKit WebRTC

## 背景
原语音通话架构基于 FastAPI WebSocket 传输原始 PCM,管道串行执行
(VAD → 批量STT → Agent → 攒句 → 批量TTS),首音频延迟约 6 秒。
迁移到 LiveKit Agents 框架后,利用 WebRTC 传输 + 流水线并行,
预期延迟降至 1.5-2 秒。

## 架构
Flutter App ←── WebRTC (Opus/UDP) ──→ LiveKit Server ←──→ Voice Agent
  livekit_client                      (自部署, Go)       (Python, LiveKit Agents SDK)
                                                          ├─ VAD (Silero)
                                                          ├─ STT (faster-whisper / OpenAI)
                                                          ├─ LLM (自定义插件 → agent-service)
                                                          └─ TTS (Kokoro / OpenAI)

关键设计:LLM 不直接调用 Claude API,而是通过自定义插件代理到现有
agent-service,保留 Tool Use、会话历史、租户隔离等能力。

## 新增服务

### voice-agent (packages/services/voice-agent/)
LiveKit Agent Worker,包含:
- agent.py: 入口,prewarm() 预加载模型,entrypoint() 编排会话
- plugins/agent_llm.py: 自定义 LLM 插件,代理 agent-service API
  - POST /api/v1/agent/tasks 创建任务
  - WS /ws/agent 订阅流式事件 (stream_event)
  - 跨轮复用 session_id 保持对话上下文
- plugins/whisper_stt.py: 本地 faster-whisper STT (批量识别)
- plugins/kokoro_tts.py: 本地 Kokoro-82M TTS (24kHz PCM)
- config.py: pydantic-settings 配置

### LiveKit Server (deploy/docker/)
- livekit.yaml: 信令端口 7880, RTC TCP 7881, UDP 50000-50200
- docker-compose.yml: 新增 livekit-server + voice-agent 容器

### LiveKit Token 端点
- voice-service/src/api/livekit_token.py:
  POST /api/v1/voice/livekit/token
  生成 Room JWT,嵌入 auth_header 到 AgentDispatch metadata

## Flutter 客户端改造
- agent_call_page.dart: 从 ~814 行简化到 ~380 行
  - 替换: WebSocketChannel, AudioRecorder, PcmPlayer, 手动心跳/重连
  - 使用: Room.connect(), setMicrophoneEnabled(true), LiveKit 事件监听
  - 波形动画改用 participant.audioLevel
- pubspec.yaml: 添加 livekit_client: ^2.3.0
- app_config.dart: 增加 livekitUrl 字段
- api_endpoints.dart: 增加 livekitToken 端点

## 配置说明 (环境变量)
- STT_PROVIDER: local (默认, faster-whisper) / openai
- TTS_PROVIDER: local (默认, Kokoro) / openai
- WHISPER_MODEL: base (默认) / small / medium / large
- WHISPER_LANGUAGE: zh (默认)
- KOKORO_VOICE: zf_xiaoxiao (默认)
- DEVICE: cpu (默认) / cuda

## 不变的部分
- agent-service: 完全不改,voice-agent 通过现有 API 调用
- voice-service 核心: pipeline/STT/TTS/VAD 保留 (Twilio 备用)
- Kong 网关: 现有路由不变
- 数据库: 无 schema 变更

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-28 08:55:33 -08:00
parent 7e44ddc358
commit 94a14b3104
19 changed files with 1059 additions and 459 deletions

View File

@ -311,6 +311,59 @@ services:
networks:
- it0-network
# ===== LiveKit Infrastructure =====
livekit-server:
image: livekit/livekit-server:latest
container_name: it0-livekit-server
restart: unless-stopped
command: --config /etc/livekit.yaml
ports:
- "17880:7880"
- "17881:7881"
- "50000-50200:50000-50200/udp"
volumes:
- ./livekit.yaml:/etc/livekit.yaml:ro
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:7880 || exit 1"]
interval: 10s
timeout: 5s
retries: 3
networks:
- it0-network
voice-agent:
build:
context: ../../packages/services/voice-agent
container_name: it0-voice-agent
restart: unless-stopped
volumes:
- ../../data/voice-models/huggingface:/root/.cache/huggingface
- ../../data/voice-models/torch-hub:/root/.cache/torch/hub
environment:
- LIVEKIT_URL=ws://livekit-server:7880
- LIVEKIT_API_KEY=devkey
- LIVEKIT_API_SECRET=devsecret
- AGENT_SERVICE_URL=http://agent-service:3002
- STT_PROVIDER=${STT_PROVIDER:-local}
- TTS_PROVIDER=${TTS_PROVIDER:-local}
- WHISPER_MODEL=${WHISPER_MODEL:-base}
- WHISPER_LANGUAGE=${WHISPER_LANGUAGE:-zh}
- KOKORO_VOICE=${KOKORO_VOICE:-zf_xiaoxiao}
- DEVICE=${VOICE_DEVICE:-cpu}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_BASE_URL=${OPENAI_BASE_URL}
- OPENAI_STT_MODEL=${OPENAI_STT_MODEL:-gpt-4o-transcribe}
- OPENAI_TTS_MODEL=${OPENAI_TTS_MODEL:-tts-1}
- OPENAI_TTS_VOICE=${OPENAI_TTS_VOICE:-alloy}
depends_on:
livekit-server:
condition: service_healthy
agent-service:
condition: service_healthy
networks:
- it0-network
# ===== Voice Service (legacy WebSocket + Twilio) =====
voice-service:
build:
context: ../../packages/services/voice-service
@ -336,6 +389,9 @@ services:
- OPENAI_STT_MODEL=${OPENAI_STT_MODEL:-gpt-4o-transcribe}
- OPENAI_TTS_MODEL=${OPENAI_TTS_MODEL:-tts-1}
- OPENAI_TTS_VOICE=${OPENAI_TTS_VOICE:-alloy}
- LIVEKIT_API_KEY=devkey
- LIVEKIT_API_SECRET=devsecret
- LIVEKIT_WS_URL=ws://livekit-server:7880
healthcheck:
test: ["CMD-SHELL", "python3 -c \"import urllib.request; urllib.request.urlopen('http://localhost:3008/docs')\""]
interval: 30s

View File

@ -0,0 +1,12 @@
# LiveKit server configuration (self-hosted).
port: 7880  # HTTP/WebSocket signaling port (mapped to host 17880)
rtc:
  # UDP media port range — must match the docker-compose port mapping exactly.
  port_range_start: 50000
  port_range_end: 50200
  tcp_port: 7881  # TCP fallback for clients behind UDP-blocking firewalls
  use_external_ip: true  # advertise the host's public IP in ICE candidates
keys:
  # NOTE(review): dev-only API key/secret pair — rotate for production.
  devkey: devsecret
logging:
  level: info

View File

@ -40,6 +40,7 @@ class ApiEndpoints {
// Voice
static const String transcribe = '$voice/transcribe';
static const String livekitToken = '$voice/livekit/token';
// Admin Settings
static const String adminSettings = '/api/v1/admin/settings';

View File

@ -3,23 +3,27 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
/// Immutable per-environment configuration: API, WebSocket and LiveKit origins.
class AppConfig {
/// REST API base URL (scheme + host + port), used by the Dio client.
final String apiBaseUrl;
/// WebSocket base URL for the legacy voice/agent streams.
final String wsBaseUrl;
/// LiveKit signaling URL (points at the host-mapped server port 17880).
final String livekitUrl;
/// Environment tag: 'development' or 'production'.
final String environment;
const AppConfig({
required this.apiBaseUrl,
required this.wsBaseUrl,
required this.livekitUrl,
required this.environment,
});
/// Emulator defaults — 10.0.2.2 is the Android emulator's host alias.
factory AppConfig.development() => const AppConfig(
apiBaseUrl: 'http://10.0.2.2:18000',
wsBaseUrl: 'ws://10.0.2.2:18000',
livekitUrl: 'ws://10.0.2.2:17880',
environment: 'development',
);
/// Production endpoints. NOTE(review): plain http/ws — confirm the TLS plan.
factory AppConfig.production() => const AppConfig(
apiBaseUrl: 'http://154.84.135.121:18000',
wsBaseUrl: 'ws://154.84.135.121:18000',
livekitUrl: 'ws://154.84.135.121:17880',
environment: 'production',
);
}

View File

@ -1,14 +1,8 @@
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'package:dio/dio.dart';
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:record/record.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../../../../core/audio/pcm_player.dart';
import '../../../../core/audio/speech_enhancer.dart';
import 'package:livekit_client/livekit_client.dart';
import '../../../../core/config/api_endpoints.dart';
import '../../../../core/config/app_config.dart';
import '../../../../core/network/dio_client.dart';
@ -28,17 +22,10 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
with SingleTickerProviderStateMixin {
_CallPhase _phase = _CallPhase.ringing;
String? _errorMessage;
String? _sessionId;
WebSocketChannel? _audioChannel;
StreamSubscription? _audioSubscription;
// Audio capture
late final AudioRecorder _recorder;
final SpeechEnhancer _enhancer = SpeechEnhancer();
StreamSubscription<List<int>>? _micSubscription;
// Audio playback
final PcmPlayer _pcmPlayer = PcmPlayer();
// LiveKit
Room? _room;
EventsListener<RoomEvent>? _listener;
// Call duration
final Stopwatch _stopwatch = Stopwatch();
@ -48,25 +35,17 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
// Waveform
late AnimationController _waveController;
final List<double> _waveHeights = List.generate(20, (_) => 0.3);
Timer? _waveTimer;
// Mute / speaker state
// Mute state
bool _isMuted = false;
bool _isSpeakerOn = false;
// Reconnection
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
Timer? _reconnectTimer;
Timer? _heartbeatCheckTimer;
DateTime? _lastServerPing;
bool _isReconnecting = false;
// Prevent double-actions
bool _userEndedCall = false;
@override
void initState() {
super.initState();
_recorder = AudioRecorder();
_enhancer.init();
_waveController = AnimationController(
vsync: this,
duration: const Duration(milliseconds: 500),
@ -77,7 +56,7 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
// Call lifecycle
// ---------------------------------------------------------------------------
/// Accept call: create voice session, connect WebSocket, start mic + player.
/// Accept call: get LiveKit token, connect to room, publish mic.
Future<void> _acceptCall() async {
setState(() {
_phase = _CallPhase.connecting;
@ -87,45 +66,49 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
try {
final dio = ref.read(dioClientProvider);
final config = ref.read(appConfigProvider);
final response = await dio.post('${ApiEndpoints.voice}/sessions', data: {});
// 1. Get LiveKit token from backend
final response = await dio.post(ApiEndpoints.livekitToken);
final data = response.data as Map<String, dynamic>;
final token = data['token'] as String;
final livekitUrl = data['livekit_url'] as String? ?? config.livekitUrl;
_sessionId = data['session_id'] as String? ?? data['sessionId'] as String? ?? data['id'] as String?;
// 2. Create and connect LiveKit room
_room = Room();
// Build WebSocket URL prefer backend-returned path
String wsUrl;
final backendWsUrl =
data['websocket_url'] as String? ?? data['ws_url'] as String?;
if (backendWsUrl != null && backendWsUrl.startsWith('/')) {
// Relative path from backend prepend wsBaseUrl host
final uri = Uri.parse(config.wsBaseUrl);
wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl';
} else if (backendWsUrl != null) {
wsUrl = backendWsUrl;
} else {
wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId';
}
// 3. Set up event listeners
_listener = _room!.createListener();
_listener!
..on<TrackSubscribedEvent>((event) {
// Agent's audio track subscribed — playback is automatic via WebRTC
_startWaveformUpdates();
})
..on<TrackUnsubscribedEvent>((event) {
// Agent's audio track removed
})
..on<RoomDisconnectedEvent>((event) {
if (_phase != _CallPhase.ended && !_userEndedCall) {
_onCallEnded();
}
});
// Connect WebSocket
_audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
// Listen for incoming data (text control messages + binary audio)
_audioSubscription = _audioChannel!.stream.listen(
_onWsData,
onDone: () => _onWsDisconnected(),
onError: (_) => _onWsDisconnected(),
// 4. Connect to LiveKit room
await _room!.connect(
livekitUrl,
token,
roomOptions: const RoomOptions(
adaptiveStream: true,
dynacast: true,
defaultAudioPublishOptions: AudioPublishOptions(
audioBitrate: AudioParameters.speechStereo.maxBitrate,
),
),
);
// Start bidirectional heartbeat
_startHeartbeat();
// 5. Enable microphone (publishes local audio track)
await _room!.localParticipant?.setMicrophoneEnabled(true);
// Initialize audio playback
await _pcmPlayer.init();
// Start mic capture
await _startMicCapture();
// Start duration timer
// 6. Start duration timer
_stopwatch.start();
_durationTimer = Timer.periodic(const Duration(seconds: 1), (_) {
if (!mounted) return;
@ -149,301 +132,51 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
}
}
/// Start real microphone capture: record PCM 16kHz mono, denoise, send over WS.
Future<void> _startMicCapture() async {
final hasPermission = await _recorder.hasPermission();
if (!hasPermission) return;
final stream = await _recorder.startStream(const RecordConfig(
encoder: AudioEncoder.pcm16bits,
sampleRate: 16000,
numChannels: 1,
noiseSuppress: true,
autoGain: true,
));
_micSubscription = stream.listen((chunk) {
if (_phase != _CallPhase.active || _isMuted) return;
try {
// Denoise each chunk with GTCRN
final pcm = Uint8List.fromList(chunk);
final denoised = _enhancer.enhance(pcm);
_audioChannel?.sink.add(denoised);
} catch (_) {}
});
}
// ---------------------------------------------------------------------------
// WebSocket data routing & heartbeat
// ---------------------------------------------------------------------------
/// Top-level handler that distinguishes text (JSON control) from binary
/// (audio) WebSocket frames.
void _onWsData(dynamic data) {
if (data is String) {
_handleControlMessage(data);
} else {
_onAudioReceived(data);
}
}
/// Decode a JSON control frame from the server and respond accordingly.
void _handleControlMessage(String jsonStr) {
try {
final msg = jsonDecode(jsonStr) as Map<String, dynamic>;
final type = msg['type'] as String?;
switch (type) {
case 'ping':
// Server heartbeat update liveness timestamp.
// Note: We don't send pong back because Pipecat owns the server-side
// WebSocket read loop and would intercept the text frame.
_lastServerPing = DateTime.now();
break;
case 'session.resumed':
// Reconnection confirmed by server
_lastServerPing = DateTime.now();
break;
case 'session.ended':
_onCallEnded();
break;
case 'error':
if (mounted) {
final detail = msg['message'] as String? ?? 'Unknown error';
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(
content: Text('服务端错误: $detail'),
backgroundColor: AppColors.error,
),
);
}
break;
/// Start waveform animation driven by remote participant audio level.
void _startWaveformUpdates() {
_waveTimer?.cancel();
_waveTimer = Timer.periodic(const Duration(milliseconds: 100), (timer) {
if (!mounted || _phase != _CallPhase.active) {
timer.cancel();
return;
}
} catch (_) {
// Malformed JSON ignore
}
}
/// Start monitoring server liveness based on incoming pings.
///
/// The server sends `{"type": "ping"}` every 15 s. If we haven't received
/// one in 45 s (3 missed pings), we assume the connection is dead and
/// trigger a reconnect.
void _startHeartbeat() {
_heartbeatCheckTimer?.cancel();
// Mark initial timestamp so the first 45-second window starts now
_lastServerPing = DateTime.now();
// Periodically verify server is alive
_heartbeatCheckTimer = Timer.periodic(const Duration(seconds: 10), (_) {
if (_lastServerPing != null &&
DateTime.now().difference(_lastServerPing!).inSeconds > 45) {
_triggerReconnect();
// Get audio level from remote participant (the agent)
double audioLevel = 0.0;
final participants = _room?.remoteParticipants.values;
if (participants != null && participants.isNotEmpty) {
audioLevel = participants.first.audioLevel;
}
});
}
/// Cancel heartbeat check timer.
void _stopHeartbeat() {
_heartbeatCheckTimer?.cancel();
_heartbeatCheckTimer = null;
}
// ---------------------------------------------------------------------------
// Auto-reconnect with exponential backoff
// ---------------------------------------------------------------------------
/// Called when the WebSocket drops unexpectedly (not user-initiated).
void _onWsDisconnected() {
if (_phase == _CallPhase.ended || _isReconnecting || _userEndedCall) return;
_triggerReconnect();
}
/// Attempt to reconnect with exponential backoff (1s 2s 4s 8s 16s).
///
/// Strategy:
/// - 200 from /reconnect session alive, open new WebSocket
/// - 404 from /reconnect session expired/gone, stop retrying
/// - 409 from /reconnect terminal state (ended), stop retrying
/// - Network error / timeout transient, keep retrying
Future<void> _triggerReconnect() async {
if (_isReconnecting || _phase == _CallPhase.ended || _userEndedCall) return;
_isReconnecting = true;
if (mounted) setState(() {});
// Tear down current connection & heartbeat
_stopHeartbeat();
await _audioSubscription?.cancel();
_audioSubscription = null;
try {
await _audioChannel?.sink.close();
} catch (_) {}
_audioChannel = null;
for (int attempt = 0; attempt < _maxReconnectAttempts; attempt++) {
_reconnectAttempts = attempt + 1;
if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break;
// Exponential backoff: 1s, 2s, 4s, 8s, 16s (first attempt waits 1s)
final delaySecs = min(pow(2, attempt).toInt(), 16);
await Future.delayed(Duration(seconds: delaySecs));
if (!mounted || _phase == _CallPhase.ended || _userEndedCall) break;
try {
final dio = ref.read(dioClientProvider);
final response = await dio.post(
'${ApiEndpoints.voice}/sessions/$_sessionId/reconnect',
);
final data = response.data as Map<String, dynamic>;
// Session is alive open new WebSocket
if (await _connectWebSocket(data)) return; // success
} on DioException catch (e) {
final statusCode = e.response?.statusCode;
if (statusCode == 404 || statusCode == 409) {
// 404: session expired / not found
// 409: session in terminal state (e.g. "ended")
// No point retrying break immediately
break;
setState(() {
final rand = Random();
for (int i = 0; i < _waveHeights.length; i++) {
_waveHeights[i] =
(audioLevel * 2.0 + rand.nextDouble() * 0.2).clamp(0.1, 1.0);
}
// Other errors (network, timeout) keep retrying
} catch (_) {
// Unexpected error keep retrying
}
}
// All reconnection attempts exhausted or session gone
_isReconnecting = false;
_onCallEnded();
}
/// Open a new WebSocket from reconnect response data. Returns true on success.
Future<bool> _connectWebSocket(Map<String, dynamic> data) async {
try {
final config = ref.read(appConfigProvider);
String wsUrl;
final backendWsUrl = data['websocket_url'] as String?;
if (backendWsUrl != null && backendWsUrl.startsWith('/')) {
final uri = Uri.parse(config.wsBaseUrl);
wsUrl = '${uri.scheme}://${uri.host}:${uri.port}$backendWsUrl';
} else if (backendWsUrl != null) {
wsUrl = backendWsUrl;
} else {
wsUrl = '${config.wsBaseUrl}/api/v1/voice/ws/$_sessionId';
}
_audioChannel = WebSocketChannel.connect(Uri.parse(wsUrl));
_audioSubscription = _audioChannel!.stream.listen(
_onWsData,
onDone: () => _onWsDisconnected(),
onError: (_) => _onWsDisconnected(),
);
// Reset audio jitter buffer for a fresh start
_pcmPlayer.reset();
// Restart heartbeat
_startHeartbeat();
_reconnectAttempts = 0;
_isReconnecting = false;
if (mounted) setState(() {});
return true;
} catch (_) {
return false;
}
}
/// Serialises async feed() calls so they don't interleave.
Future<void> _feedChain = Future.value();
/// Handle incoming audio from the agent side.
void _onAudioReceived(dynamic data) {
if (!mounted || _phase != _CallPhase.active) return;
Uint8List pcmData;
if (data is Uint8List) {
pcmData = data;
} else if (data is List<int>) {
pcmData = Uint8List.fromList(data);
} else {
return;
}
// Chain feed calls so they run sequentially (feed is async).
_feedChain = _feedChain.then((_) => _pcmPlayer.feed(pcmData));
// Drive waveform from actual audio energy (RMS)
_updateWaveform(pcmData);
}
/// Calculate RMS from PCM data and update waveform bars.
void _updateWaveform(Uint8List pcmData) {
if (pcmData.length < 4) return;
final byteData = ByteData.sublistView(pcmData);
final sampleCount = pcmData.length ~/ 2;
// Compute overall RMS
double sum = 0;
for (int i = 0; i < sampleCount; i++) {
final s = byteData.getInt16(i * 2, Endian.little).toDouble();
sum += s * s;
}
final rms = sqrt(sum / sampleCount) / 32768.0; // normalize to [0,1]
setState(() {
final rand = Random();
for (int i = 0; i < _waveHeights.length; i++) {
_waveHeights[i] =
(rms * 2.0 + rand.nextDouble() * 0.2).clamp(0.1, 1.0);
}
});
});
}
/// End the call and clean up all resources.
Future<void> _endCall() async {
_userEndedCall = true;
_isReconnecting = false;
setState(() => _phase = _CallPhase.ended);
_stopwatch.stop();
_durationTimer?.cancel();
_waveController.stop();
_waveTimer?.cancel();
// Cancel heartbeat & reconnect timers
_stopHeartbeat();
_reconnectTimer?.cancel();
_reconnectTimer = null;
// Disconnect LiveKit room (handles all audio cleanup)
try {
await _room?.disconnect();
} catch (_) {}
try {
await _room?.dispose();
} catch (_) {}
_listener?.dispose();
// Cleanup all resources each wrapped in try/catch to ensure we always
// reach Navigator.pop() at the end.
try {
await _micSubscription?.cancel();
_micSubscription = null;
} catch (_) {}
try {
await _recorder.stop();
} catch (_) {}
try {
await _pcmPlayer.dispose();
} catch (_) {}
try {
_audioSubscription?.cancel();
} catch (_) {}
try {
await _audioChannel?.sink.close();
} catch (_) {}
// Delete voice session on the server (fire-and-forget)
if (_sessionId != null) {
try {
final dio = ref.read(dioClientProvider);
await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId')
.timeout(const Duration(seconds: 3));
} catch (_) {}
}
// Navigate back to the dial page
// Navigate back
if (mounted) {
await Future.delayed(const Duration(milliseconds: 500));
if (mounted) Navigator.of(context).pop();
@ -455,6 +188,13 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
_endCall();
}
/// Toggle the local microphone and refresh the UI.
void _toggleMute() {
// The new "enabled" state equals the old muted flag: muted -> enable mic.
final bool micEnabled = _isMuted;
_isMuted = !_isMuted;
_room?.localParticipant?.setMicrophoneEnabled(micEnabled);
setState(() {});
}
// ---------------------------------------------------------------------------
// Build
// ---------------------------------------------------------------------------
@ -464,116 +204,78 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
return Scaffold(
backgroundColor: AppColors.background,
body: SafeArea(
child: Stack(
children: [
Center(
child: Column(
children: [
const Spacer(flex: 2),
_buildAvatar(),
const SizedBox(height: 24),
Text(
_statusText,
textAlign: TextAlign.center,
style: const TextStyle(
fontSize: 24, fontWeight: FontWeight.bold),
),
const SizedBox(height: 8),
Text(
_subtitleText,
textAlign: TextAlign.center,
style: const TextStyle(
color: AppColors.textSecondary, fontSize: 15),
),
// Inline error message
if (_errorMessage != null) ...[
const SizedBox(height: 16),
Padding(
padding: const EdgeInsets.symmetric(horizontal: 32),
child: Container(
padding: const EdgeInsets.symmetric(
horizontal: 16, vertical: 12),
decoration: BoxDecoration(
color: AppColors.error.withOpacity(0.1),
borderRadius: BorderRadius.circular(12),
border: Border.all(
color: AppColors.error.withOpacity(0.3)),
),
child: Row(
mainAxisSize: MainAxisSize.min,
children: [
const Icon(Icons.error_outline,
size: 18, color: AppColors.error),
const SizedBox(width: 8),
Flexible(
child: Text(
_errorMessage!,
textAlign: TextAlign.center,
style: const TextStyle(
color: AppColors.error,
fontSize: 13,
),
),
),
],
),
),
),
],
const SizedBox(height: 32),
if (_phase == _CallPhase.active)
Text(
_durationLabel,
style: const TextStyle(
fontSize: 40,
fontWeight: FontWeight.w300,
color: AppColors.textPrimary,
letterSpacing: 4,
),
),
const SizedBox(height: 24),
if (_phase == _CallPhase.active) _buildWaveform(),
const Spacer(flex: 3),
_buildControls(),
const SizedBox(height: 48),
],
child: Center(
child: Column(
children: [
const Spacer(flex: 2),
_buildAvatar(),
const SizedBox(height: 24),
Text(
_statusText,
textAlign: TextAlign.center,
style: const TextStyle(
fontSize: 24, fontWeight: FontWeight.bold),
),
),
// Reconnecting overlay
if (_isReconnecting)
Positioned(
top: 0,
left: 0,
right: 0,
child: Container(
padding:
const EdgeInsets.symmetric(vertical: 10, horizontal: 16),
color: AppColors.warning.withOpacity(0.9),
child: Row(
mainAxisAlignment: MainAxisAlignment.center,
children: [
const SizedBox(
width: 16,
height: 16,
child: CircularProgressIndicator(
strokeWidth: 2,
color: Colors.white,
const SizedBox(height: 8),
Text(
_subtitleText,
textAlign: TextAlign.center,
style: const TextStyle(
color: AppColors.textSecondary, fontSize: 15),
),
// Inline error message
if (_errorMessage != null) ...[
const SizedBox(height: 16),
Padding(
padding: const EdgeInsets.symmetric(horizontal: 32),
child: Container(
padding: const EdgeInsets.symmetric(
horizontal: 16, vertical: 12),
decoration: BoxDecoration(
color: AppColors.error.withOpacity(0.1),
borderRadius: BorderRadius.circular(12),
border: Border.all(
color: AppColors.error.withOpacity(0.3)),
),
child: Row(
mainAxisSize: MainAxisSize.min,
children: [
const Icon(Icons.error_outline,
size: 18, color: AppColors.error),
const SizedBox(width: 8),
Flexible(
child: Text(
_errorMessage!,
textAlign: TextAlign.center,
style: const TextStyle(
color: AppColors.error,
fontSize: 13,
),
),
),
),
const SizedBox(width: 10),
Text(
'重新连接中... ($_reconnectAttempts/$_maxReconnectAttempts)',
style: const TextStyle(
color: Colors.white,
fontSize: 14,
fontWeight: FontWeight.w500,
),
),
],
],
),
),
),
),
],
],
const SizedBox(height: 32),
if (_phase == _CallPhase.active)
Text(
_durationLabel,
style: const TextStyle(
fontSize: 40,
fontWeight: FontWeight.w300,
color: AppColors.textPrimary,
letterSpacing: 4,
),
),
const SizedBox(height: 24),
if (_phase == _CallPhase.active) _buildWaveform(),
const Spacer(flex: 3),
_buildControls(),
const SizedBox(height: 48),
],
),
),
),
);
@ -611,7 +313,6 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
/// Extract a short user-friendly message from an exception.
String _friendlyError(dynamic e) {
final s = e.toString();
// Extract status code info for Dio errors
final match = RegExp(r'status code of (\d+)').firstMatch(s);
if (match != null) {
final code = match.group(1);
@ -719,7 +420,7 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
icon: _isMuted ? Icons.mic_off : Icons.mic,
label: _isMuted ? '取消静音' : '静音',
isActive: _isMuted,
onTap: () => setState(() => _isMuted = !_isMuted),
onTap: _toggleMute,
),
const SizedBox(width: 24),
FloatingActionButton(
@ -730,12 +431,11 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
),
const SizedBox(width: 24),
_CircleButton(
icon: _isSpeakerOn ? Icons.volume_up : Icons.volume_down,
icon: Icons.volume_up,
label: '扬声器',
isActive: _isSpeakerOn,
isActive: false,
onTap: () {
setState(() => _isSpeakerOn = !_isSpeakerOn);
_pcmPlayer.setSpeakerOn(_isSpeakerOn);
// WebRTC handles audio routing automatically
},
),
],
@ -751,16 +451,12 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
void dispose() {
_userEndedCall = true;
_durationTimer?.cancel();
_stopHeartbeat();
_reconnectTimer?.cancel();
_waveTimer?.cancel();
_waveController.dispose();
_stopwatch.stop();
_micSubscription?.cancel();
_recorder.dispose();
_enhancer.dispose();
_audioSubscription?.cancel();
_audioChannel?.sink.close();
_pcmPlayer.dispose();
_room?.disconnect();
_room?.dispose();
_listener?.dispose();
super.dispose();
}
}

View File

@ -40,6 +40,7 @@ dependencies:
file_picker: ^8.0.0
# Voice
livekit_client: ^2.3.0
record: ^6.0.0
flutter_tts: ^4.0.0
sherpa_onnx: ^1.12.25

View File

@ -0,0 +1,17 @@
FROM python:3.11-slim

WORKDIR /app

# Stream worker logs immediately (no stdout buffering) and skip .pyc files.
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

# System dependencies for audio processing: ffmpeg for decoding/resampling,
# libsndfile1 for soundfile-based I/O. --no-install-recommends keeps the
# image small; the apt list cleanup happens in the same layer.
RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    libsndfile1 \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying sources so this layer stays
# cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/

# LiveKit agent worker entry point
CMD ["python", "-m", "src.agent", "start"]

View File

@ -0,0 +1,16 @@
# LiveKit runtime, agents framework, and first-party plugins (VAD / OpenAI)
livekit>=1.0.0
livekit-agents>=1.0.0
livekit-plugins-silero>=1.0.0
livekit-plugins-openai>=1.0.0
# Local STT backend
faster-whisper==1.2.1
# Local TTS: Kokoro-82M plus its Chinese G2P frontend
kokoro==0.3.5
misaki[zh]==0.7.17
# Clients for the agent-service proxy (WS streaming + REST)
websockets==12.0
httpx==0.27.0
# Audio buffers and model runtime
numpy>=1.26.4
torch>=2.0.0
# Settings loading from env/.env
pydantic-settings==2.2.0
# Presumably required by the Chinese text pipeline (misaki/kokoro) — TODO confirm
ordered-set
pypinyin
cn2an
jieba

View File

@ -0,0 +1,188 @@
"""
IT0 Voice Agent LiveKit Agents entry point.
This is a LiveKit Agent Worker that handles real-time voice conversations.
It connects to the LiveKit server, waits for users to join a room, and runs
the voice pipeline: VAD → STT → LLM (via agent-service) → TTS.
Usage:
python -m src.agent start
"""
import json
import logging
from livekit import agents, rtc
from livekit.agents import AgentSession, Agent, RoomInputOptions, JobContext, JobProcess, cli, WorkerOptions
from livekit.plugins import silero
from .config import settings
from .plugins.agent_llm import AgentServiceLLM
from .plugins.whisper_stt import LocalWhisperSTT
from .plugins.kokoro_tts import LocalKokoroTTS, patch_misaki_compat
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class IT0VoiceAgent(Agent):
    """Voice agent for the IT0 server-operations platform.

    A thin Agent subclass whose only customization is the fixed Chinese
    system prompt handed to the base class.
    """

    # System prompt: server-ops assistant, concise answers suited to voice.
    _INSTRUCTIONS = (
        "你是 IT0 服务器运维助手。用户通过语音与你对话,"
        "你帮助管理和监控服务器集群。回答简洁,适合语音对话场景。"
    )

    def __init__(self) -> None:
        super().__init__(instructions=self._INSTRUCTIONS)
def prewarm(proc: JobProcess) -> None:
    """Pre-load ML models into shared process memory.

    Runs once per worker process; every session handled by the process
    reuses the models stashed in ``proc.userdata`` instead of reloading.
    """
    logger.info(
        "Prewarming models (stt=%s, tts=%s, device=%s)",
        settings.stt_provider,
        settings.tts_provider,
        settings.device,
    )

    # VAD is needed regardless of which STT/TTS providers are configured.
    proc.userdata["vad"] = silero.VAD.load()
    logger.info("VAD loaded: Silero VAD")

    # STT: load faster-whisper locally unless the OpenAI API is selected.
    whisper = None
    if settings.stt_provider == "local":
        from faster_whisper import WhisperModel

        preferred_compute = "float16" if settings.device == "cuda" else "int8"
        try:
            whisper = WhisperModel(
                settings.whisper_model,
                device=settings.device,
                compute_type=preferred_compute,
            )
        except Exception as e:
            # CUDA init can fail (driver/VRAM); degrade to CPU int8 instead of dying.
            logger.warning("Whisper GPU failed, falling back to CPU: %s", e)
            whisper = WhisperModel(
                settings.whisper_model, device="cpu", compute_type="int8"
            )
        logger.info("STT loaded: faster-whisper %s", settings.whisper_model)
    else:
        logger.info("STT: using OpenAI %s", settings.openai_stt_model)
    proc.userdata["whisper_model"] = whisper

    # TTS: load Kokoro locally unless the OpenAI API is selected.
    kokoro_pipeline = None
    if settings.tts_provider == "local":
        patch_misaki_compat()
        from kokoro import KPipeline

        # lang_code="z" selects Kokoro's Chinese pipeline.
        kokoro_pipeline = KPipeline(lang_code="z")
        logger.info("TTS loaded: Kokoro-82M voice=%s", settings.kokoro_voice)
    else:
        logger.info("TTS: using OpenAI %s", settings.openai_tts_model)
    proc.userdata["kokoro_pipeline"] = kokoro_pipeline

    logger.info("Prewarm complete.")
def _extract_auth_header(ctx: JobContext, participant: rtc.RemoteParticipant) -> str:
    """Best-effort lookup of the agent-service Authorization header.

    The token endpoint embeds ``{"auth_header": "Bearer ..."}`` in the
    AgentDispatch metadata; participant metadata is tried as a fallback.
    Returns "" when neither source provides it.
    """
    # Primary source: agent dispatch metadata set by the token endpoint.
    try:
        job = ctx.job
        if job and getattr(job, "agent_dispatch", None):
            meta = json.loads(job.agent_dispatch.metadata or "{}")
            header = meta.get("auth_header", "")
            if header:
                return header
    except Exception:
        # Deliberately best-effort: malformed metadata falls through to the fallback.
        logger.debug("Could not read auth header from dispatch metadata", exc_info=True)
    # Fallback source: participant metadata.
    try:
        meta = json.loads(participant.metadata or "{}")
        return meta.get("auth_header", "")
    except Exception:
        logger.debug("Could not read auth header from participant metadata", exc_info=True)
    return ""


def _build_stt(ctx: JobContext):
    """Create the STT plugin per settings: OpenAI API or local faster-whisper."""
    if settings.stt_provider == "openai":
        from livekit.plugins import openai as openai_plugin

        return openai_plugin.STT(
            model=settings.openai_stt_model,
            language=settings.whisper_language,
        )
    # Local path reuses the WhisperModel preloaded in prewarm().
    return LocalWhisperSTT(
        model=ctx.proc.userdata.get("whisper_model"),
        language=settings.whisper_language,
    )


def _build_tts(ctx: JobContext):
    """Create the TTS plugin per settings: OpenAI API or local Kokoro."""
    if settings.tts_provider == "openai":
        from livekit.plugins import openai as openai_plugin

        return openai_plugin.TTS(
            model=settings.openai_tts_model,
            voice=settings.openai_tts_voice,
        )
    # Local path reuses the KPipeline preloaded in prewarm().
    return LocalKokoroTTS(
        pipeline=ctx.proc.userdata.get("kokoro_pipeline"),
        voice=settings.kokoro_voice,
    )


async def entrypoint(ctx: JobContext) -> None:
    """Main entrypoint — called for each voice session (room join).

    Connects to the room audio-only, waits for the Flutter user, then wires
    VAD + STT + LLM (agent-service proxy) + TTS into an AgentSession.
    """
    await ctx.connect(auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY)

    # Wait for a participant (the Flutter user) to join.
    participant = await ctx.wait_for_participant()
    logger.info(
        "Participant joined: identity=%s, name=%s",
        participant.identity,
        participant.name,
    )

    auth_header = _extract_auth_header(ctx, participant)
    logger.info("Auth header present: %s", bool(auth_header))

    # Custom LLM proxies to agent-service, keeping Tool Use, conversation
    # history and tenant isolation on the backend.
    llm = AgentServiceLLM(
        agent_service_url=settings.agent_service_url,
        auth_header=auth_header,
    )

    session = AgentSession(
        vad=ctx.proc.userdata["vad"],  # preloaded once per process in prewarm()
        stt=_build_stt(ctx),
        llm=llm,
        tts=_build_tts(ctx),
    )
    await session.start(
        room=ctx.room,
        agent=IT0VoiceAgent(),
        participant=participant,
    )
    logger.info("Voice session started for participant %s", participant.identity)
if __name__ == "__main__":
    # Hand control to the LiveKit Agents CLI (subcommands like "start"/"dev").
    # prewarm_fnc runs once per worker process; entrypoint_fnc runs per job.
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
            agent_name="voice-agent",
        )
    )

View File

@ -0,0 +1,40 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """Voice agent configuration.

    Each field is populated from the environment variable matching its name
    (case-insensitive, per pydantic-settings), with an optional ``.env``
    file. Defaults target the docker-compose dev stack.
    """
    # LiveKit
    livekit_url: str = "ws://livekit-server:7880"
    # NOTE(review): dev-only credentials — must be overridden in production.
    livekit_api_key: str = "devkey"
    livekit_api_secret: str = "devsecret"
    # Agent Service
    agent_service_url: str = "http://agent-service:3002"
    # STT / TTS provider selection
    stt_provider: str = "local"  # "local" or "openai"
    tts_provider: str = "local"  # "local" or "openai"
    # Local STT (faster-whisper)
    whisper_model: str = "base"  # base / small / medium / large
    whisper_language: str = "zh"
    # Local TTS (Kokoro-82M)
    kokoro_voice: str = "zf_xiaoxiao"
    # OpenAI (fallback)
    openai_api_key: str = ""
    openai_base_url: str = ""  # empty means the library's default endpoint
    openai_stt_model: str = "gpt-4o-transcribe"
    openai_tts_model: str = "tts-1"
    openai_tts_voice: str = "alloy"
    # Device
    device: str = "cpu"  # "cpu" or "cuda"
    class Config:
        # Also read variables from a local .env file when present.
        env_file = ".env"
# Module-level singleton imported by the rest of the package.
settings = Settings()

View File

@ -0,0 +1,252 @@
"""
Custom LLM plugin that proxies to IT0 agent-service.
Instead of calling Claude directly, this plugin:
1. POSTs to agent-service /api/v1/agent/tasks (creates a task with engineType=claude_api)
2. Subscribes to the agent-service WebSocket /ws/agent for streaming text events
3. Emits ChatChunk objects into the LiveKit pipeline
This preserves all agent-service capabilities: Tool Use, conversation history,
tenant isolation, and session management.
"""
import asyncio
import json
import logging
import uuid
from typing import Any
import httpx
import websockets
from livekit.agents import llm
from livekit.agents.types import (
DEFAULT_API_CONNECT_OPTIONS,
NOT_GIVEN,
APIConnectOptions,
NotGivenOr,
)
logger = logging.getLogger(__name__)
class AgentServiceLLM(llm.LLM):
    """LLM adapter that forwards each chat turn to the IT0 agent-service.

    A single agent-service session id is kept across turns so the backing
    agent retains Tool Use results and conversation history.
    """

    def __init__(
        self,
        *,
        agent_service_url: str = "http://agent-service:3002",
        auth_header: str = "",
    ):
        super().__init__()
        # Base URL of agent-service plus the caller's auth header,
        # forwarded verbatim on every proxied request.
        self._agent_service_url = agent_service_url
        self._auth_header = auth_header
        # Populated after the first completed turn; reused afterwards.
        self._agent_session_id: str | None = None

    @property
    def model(self) -> str:
        return "agent-service-proxy"

    @property
    def provider(self) -> str:
        return "it0-agent"

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        tools: list[llm.Tool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> "AgentServiceLLMStream":
        # Tool-related arguments are accepted for interface compatibility but
        # intentionally ignored: tool use happens inside agent-service itself.
        return AgentServiceLLMStream(
            llm_instance=self,
            chat_ctx=chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
        )
class AgentServiceLLMStream(llm.LLMStream):
    """Streams assistant text from agent-service via its WebSocket.

    Flow: pre-subscribe with any existing session id (so early events are
    buffered), POST a new task, re-subscribe with the real session/task ids,
    then translate ``stream_event`` messages into LiveKit ChatChunk deltas.
    """

    def __init__(
        self,
        *,
        llm_instance: AgentServiceLLM,
        chat_ctx: llm.ChatContext,
        tools: list[llm.Tool],
        conn_options: APIConnectOptions,
    ):
        super().__init__(
            llm_instance,
            chat_ctx=chat_ctx,
            tools=tools,
            conn_options=conn_options,
        )
        self._llm_instance = llm_instance

    def _latest_user_text(self) -> str:
        """Return the most recent user message in the chat context, or ""."""
        for item in reversed(self._chat_ctx.items):
            if item.role == "user":
                return item.text_content
        return ""

    async def _run(self) -> None:
        import time  # stdlib; used only for the streaming deadline below

        user_text = self._latest_user_text()
        if not user_text:
            logger.warning("No user message found in chat context")
            return

        agent_url = self._llm_instance._agent_service_url
        auth_header = self._llm_instance._auth_header
        headers: dict[str, str] = {"Content-Type": "application/json"}
        if auth_header:
            headers["Authorization"] = auth_header

        # Derive the agent-service WS endpoint from its HTTP base URL.
        ws_url = agent_url.replace("http://", "ws://").replace("https://", "wss://")
        ws_url = f"{ws_url}/ws/agent"
        request_id = f"agent-{uuid.uuid4().hex[:12]}"
        timeout_secs = 120  # overall cap on one agent turn

        try:
            logger.info("Connecting to agent-service WS: %s", ws_url)
            async with websockets.connect(ws_url) as ws:
                # 1. Pre-subscribe with the existing session id so events
                #    emitted while the task is being created are buffered.
                if self._llm_instance._agent_session_id:
                    await ws.send(json.dumps({
                        "event": "subscribe_session",
                        "data": {"sessionId": self._llm_instance._agent_session_id},
                    }))

                # 2. Create the agent task over HTTP.
                body: dict[str, Any] = {
                    "prompt": user_text,
                    "engineType": "claude_api",
                }
                if self._llm_instance._agent_session_id:
                    body["sessionId"] = self._llm_instance._agent_session_id
                logger.info("POST /tasks prompt=%s", user_text[:80])
                async with httpx.AsyncClient(timeout=30) as client:
                    resp = await client.post(
                        f"{agent_url}/api/v1/agent/tasks",
                        json=body,
                        headers=headers,
                    )
                if resp.status_code not in (200, 201):
                    logger.error(
                        "Task creation failed: %d %s",
                        resp.status_code, resp.text[:200],
                    )
                    self._event_ch.send_nowait(
                        llm.ChatChunk(
                            id=request_id,
                            delta=llm.ChoiceDelta(
                                role="assistant",
                                content="抱歉Agent服务暂时不可用。",
                            ),
                        )
                    )
                    return

                data = resp.json()
                session_id = data.get("sessionId", "")
                task_id = data.get("taskId", "")
                # Remember the session so the next turn continues the same
                # agent-service conversation (history, tool state, etc.).
                self._llm_instance._agent_session_id = session_id
                logger.info(
                    "Task created: session=%s, task=%s", session_id, task_id
                )

                # 3. Subscribe with the actual ids returned by the task API.
                await ws.send(json.dumps({
                    "event": "subscribe_session",
                    "data": {"sessionId": session_id, "taskId": task_id},
                }))

                # 4. Initial role-only delta so the pipeline knows a reply began.
                self._event_ch.send_nowait(
                    llm.ChatChunk(
                        id=request_id,
                        delta=llm.ChoiceDelta(role="assistant"),
                    )
                )

                # 5. Stream events → ChatChunk. Use a monotonic clock for the
                #    deadline: wall-clock (time.time) can jump on NTP adjustment
                #    and would stretch or cut the timeout.
                deadline = time.monotonic() + timeout_secs
                streamed_text = False  # did we emit any "text" chunk?
                while time.monotonic() < deadline:
                    remaining = deadline - time.monotonic()
                    try:
                        raw = await asyncio.wait_for(
                            ws.recv(), timeout=min(5.0, remaining)
                        )
                    except asyncio.TimeoutError:
                        if time.monotonic() >= deadline:
                            logger.warning("Agent stream timeout after %ds", timeout_secs)
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        logger.warning("Agent WS connection closed")
                        break

                    try:
                        msg = json.loads(raw)
                    except (json.JSONDecodeError, TypeError):
                        continue  # ignore non-JSON frames

                    if msg.get("event", "") != "stream_event":
                        continue
                    evt_data = msg.get("data", {})
                    evt_type = evt_data.get("type", "")
                    if evt_type == "text":
                        content = evt_data.get("content", "")
                        if content:
                            streamed_text = True
                            self._event_ch.send_nowait(
                                llm.ChatChunk(
                                    id=request_id,
                                    delta=llm.ChoiceDelta(content=content),
                                )
                            )
                    elif evt_type == "completed":
                        # Fallback: if the agent streamed no text chunks, emit
                        # the completion summary instead of ending in silence.
                        summary = evt_data.get("summary", "")
                        if summary and not streamed_text:
                            self._event_ch.send_nowait(
                                llm.ChatChunk(
                                    id=request_id,
                                    delta=llm.ChoiceDelta(content=summary),
                                )
                            )
                        logger.info("Agent stream completed")
                        return
                    elif evt_type == "error":
                        err_msg = evt_data.get("message", "Unknown error")
                        logger.error("Agent error: %s", err_msg)
                        self._event_ch.send_nowait(
                            llm.ChatChunk(
                                id=request_id,
                                delta=llm.ChoiceDelta(
                                    content=f"Agent 错误: {err_msg}"
                                ),
                            )
                        )
                        return
        except Exception as exc:
            # Top-level boundary: surface a spoken apology rather than dying
            # silently inside the voice pipeline.
            logger.error("Agent stream error: %s: %s", type(exc).__name__, exc)
            self._event_ch.send_nowait(
                llm.ChatChunk(
                    id=request_id,
                    delta=llm.ChoiceDelta(
                        role="assistant",
                        content="抱歉Agent服务暂时不可用。",
                    ),
                )
            )

View File

@ -0,0 +1,129 @@
"""
Custom TTS plugin using local Kokoro-82M model.
Kokoro outputs 24kHz float32 audio. We declare sample_rate=24000 and let
LiveKit handle resampling to 48kHz for WebRTC transport.
"""
import asyncio
import logging
import numpy as np
from livekit import rtc
from livekit.agents import tts
from livekit.agents.tts import AudioEmitter
from livekit.agents.types import (
DEFAULT_API_CONNECT_OPTIONS,
APIConnectOptions,
)
logger = logging.getLogger(__name__)
def patch_misaki_compat():
    """Backfill ``misaki.en.MutableToken``, renamed upstream to ``MToken``.

    No-op when misaki is not installed or the alias already exists.
    """
    try:
        import misaki.en as en
    except ImportError:
        return
    renamed = getattr(en, "MToken", None)
    if renamed is not None and not hasattr(en, "MutableToken"):
        en.MutableToken = renamed
class LocalKokoroTTS(tts.TTS):
    """Text-to-Speech backed by a locally loaded Kokoro-82M pipeline."""

    def __init__(self, *, pipeline=None, voice: str = "zf_xiaoxiao"):
        # Kokoro produces mono audio at its native 24 kHz; LiveKit resamples
        # to 48 kHz for WebRTC transport.
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=24000,
            num_channels=1,
        )
        # KPipeline instance, pre-loaded in the worker's prewarm() hook.
        self._pipeline = pipeline
        self._voice = voice

    @property
    def model(self) -> str:
        return "kokoro-82m"

    @property
    def provider(self) -> str:
        return "local"

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "KokoroChunkedStream":
        # One ChunkedStream per utterance (batch synthesis, no streaming).
        stream = KokoroChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
        )
        return stream
class KokoroChunkedStream(tts.ChunkedStream):
    """Non-streaming TTS synthesis with Kokoro.

    Runs the blocking Kokoro pipeline in an executor, converts the float32
    output to int16 PCM, and pushes it to the emitter at 24 kHz mono.
    """

    def __init__(
        self,
        *,
        tts: LocalKokoroTTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ):
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._kokoro_tts = tts

    async def _run(self, output_emitter: AudioEmitter) -> None:
        if self._kokoro_tts._pipeline is None:
            logger.error("Kokoro pipeline not loaded")
            return
        text = self._input_text
        if not text or not text.strip():
            return  # nothing to say

        # get_running_loop(): we are inside a coroutine; get_event_loop()
        # is deprecated in that context on modern Python.
        loop = asyncio.get_running_loop()
        pipeline = self._kokoro_tts._pipeline
        voice = self._kokoro_tts._voice

        def _synthesize():
            # The pipeline yields per-segment tuples whose third element is
            # the audio (torch tensor or ndarray); concatenate all segments.
            samples = []
            for _, _, audio in pipeline(text, voice=voice):
                if hasattr(audio, "numpy"):
                    samples.append(audio.numpy())
                else:
                    samples.append(audio)
            if not samples:
                return None
            return np.concatenate(samples)

        # Run the blocking model call off the event loop.
        audio_np = await loop.run_in_executor(None, _synthesize)
        if audio_np is None or len(audio_np) == 0:
            logger.warning("Kokoro produced no audio for: '%s'", text[:50])
            return

        # Convert float32 [-1, 1] → int16 PCM (clipped to the int16 range).
        pcm_int16 = (audio_np * 32768).clip(-32768, 32767).astype(np.int16)

        # Initialize the emitter with Kokoro's native 24 kHz mono format.
        request_id = text[:16].replace("\n", " ")
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=24000,
            num_channels=1,
            mime_type="audio/pcm",
        )
        output_emitter.push(pcm_int16.tobytes())
        # Fixed log format: original "'%s'%d" ran the two fields together.
        logger.info(
            "TTS synthesized: '%s' -> %d samples (%.1fs)",
            text[:30],
            len(pcm_int16),
            len(pcm_int16) / 24000,
        )

View File

@ -0,0 +1,117 @@
"""
Custom STT plugin using local faster-whisper model.
Implements batch recognition (not streaming): LiveKit's AgentSession collects
audio during speech (using Silero VAD), and when silence is detected, passes
the complete AudioBuffer to _recognize_impl() for transcription.
"""
import asyncio
import logging
import numpy as np
from livekit.agents import stt
from livekit.agents.types import (
DEFAULT_API_CONNECT_OPTIONS,
NOT_GIVEN,
APIConnectOptions,
NotGivenOr,
)
logger = logging.getLogger(__name__)
class LocalWhisperSTT(stt.STT):
    """Speech-to-Text using a locally loaded faster-whisper model.

    Batch recognition only: LiveKit's AgentSession buffers audio during
    speech (Silero VAD) and hands the complete buffer to _recognize_impl().
    """

    def __init__(self, *, model=None, language: str = "zh"):
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=False,
                interim_results=False,
            )
        )
        # WhisperModel instance, pre-loaded in the worker's prewarm() hook.
        self._model = model
        self._language = language

    @property
    def model(self) -> str:
        return "faster-whisper"

    @property
    def provider(self) -> str:
        return "local"

    def _empty_event(self, language: str) -> stt.SpeechEvent:
        """Final-transcript event carrying no text (degenerate input)."""
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[stt.SpeechData(language=language, text="")],
        )

    async def _recognize_impl(
        self,
        buffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Transcribe a complete utterance buffer and return the final event."""
        if self._model is None:
            logger.error("Whisper model not loaded")
            return self._empty_event(self._language)

        # Collect raw PCM from the AudioBuffer (a single frame or a list).
        audio_frames = buffer if isinstance(buffer, list) else [buffer]
        pcm_data = bytearray()
        sample_rate = 16000
        for frame in audio_frames:
            if hasattr(frame, "data"):
                pcm_data.extend(frame.data)
                if hasattr(frame, "sample_rate"):
                    sample_rate = frame.sample_rate
            elif isinstance(frame, bytes):
                pcm_data.extend(frame)
            else:
                # Fall back to the buffer protocol for unknown frame types.
                try:
                    pcm_data.extend(bytes(frame))
                except TypeError:
                    pass
        if not pcm_data:
            return self._empty_event(self._language)

        # faster-whisper assumes 16 kHz when given a raw float array; the
        # collected sample_rate was previously ignored, so at least warn when
        # frames arrive at a different rate (no resampler available here).
        if sample_rate != 16000:
            logger.warning(
                "Audio frames at %d Hz but faster-whisper expects 16000 Hz; "
                "transcription quality may degrade",
                sample_rate,
            )

        # Convert PCM int16 → float32 normalized to [-1.0, 1.0].
        audio_np = np.frombuffer(bytes(pcm_data), dtype=np.int16).astype(np.float32) / 32768.0
        lang = language if language is not NOT_GIVEN else self._language

        def _transcribe():
            segments, _info = self._model.transcribe(
                audio_np,
                language=lang,
                beam_size=5,
                vad_filter=True,  # drop residual silence inside the utterance
            )
            return "".join(seg.text for seg in segments)

        # get_running_loop(): get_event_loop() is deprecated inside coroutines.
        loop = asyncio.get_running_loop()
        text = await loop.run_in_executor(None, _transcribe)
        text = text.strip()
        logger.info("STT result: '%s'", text[:100] if text else "(empty)")
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[
                stt.SpeechData(
                    language=lang,
                    text=text,
                    confidence=1.0,
                )
            ],
        )

View File

@ -19,3 +19,4 @@ ordered-set
pypinyin
cn2an
jieba
livekit>=1.0.0
livekit-api>=1.0.0

View File

@ -0,0 +1,63 @@
"""
LiveKit token generation endpoint.
Generates a LiveKit room JWT for voice calls. The token includes:
- A unique room name and participant identity
- Agent dispatch configuration (auto-spawns voice-agent)
- The caller's auth header in dispatch metadata (for agent-service auth)
"""
import json
import uuid
from fastapi import APIRouter, Request
from livekit.api import AccessToken, VideoGrants, RoomAgentDispatch, RoomConfiguration
from ..config.settings import settings
router = APIRouter()
@router.post("/livekit/token")
async def create_livekit_token(request: Request):
    """Generate a LiveKit room token for a voice call session.

    The caller's Authorization header is embedded in the agent dispatch
    metadata so that the voice-agent can forward it to agent-service.
    """
    auth_header = request.headers.get("authorization", "")
    room_name = f"voice-{uuid.uuid4().hex[:12]}"
    participant_identity = f"user-{uuid.uuid4().hex[:8]}"

    # Grants: join exactly this room, full publish/subscribe + data channel.
    grants = VideoGrants(
        room_join=True,
        room=room_name,
        can_publish=True,
        can_subscribe=True,
        can_publish_data=True,
    )
    # Auto-dispatch the voice-agent worker into the room; metadata carries
    # the caller's auth header for downstream agent-service requests.
    dispatch = RoomAgentDispatch(
        agent_name="voice-agent",
        metadata=json.dumps({"auth_header": auth_header}),
    )
    token = (
        AccessToken(settings.livekit_api_key, settings.livekit_api_secret)
        .with_identity(participant_identity)
        .with_name("User")
        .with_grants(grants)
        .with_room_config(RoomConfiguration(agents=[dispatch]))
    )
    return {
        "token": token.to_jwt(),
        "room_name": room_name,
        "livekit_url": settings.livekit_ws_url,
    }

View File

@ -11,6 +11,7 @@ from .health import router as health_router
from .session_router import router as session_router
from .twilio_webhook import router as twilio_router
from .test_tts import router as test_tts_router
from .livekit_token import router as livekit_token_router
logger = logging.getLogger(__name__)
@ -31,6 +32,7 @@ app.include_router(health_router, tags=["health"])
app.include_router(session_router, prefix="/api/v1/voice", tags=["sessions"])
app.include_router(twilio_router, prefix="/api/v1/twilio", tags=["twilio"])
app.include_router(test_tts_router, prefix="/api/v1/test", tags=["test"])
app.include_router(livekit_token_router, prefix="/api/v1/voice", tags=["livekit"])
# ---------------------------------------------------------------------------

View File

@ -39,6 +39,11 @@ class Settings(BaseSettings):
# Device (cpu or cuda)
device: str = "cpu"
# LiveKit
livekit_api_key: str = "devkey"
livekit_api_secret: str = "devsecret"
livekit_ws_url: str = "ws://livekit-server:7880"
# Twilio
twilio_account_sid: str = ""
twilio_auth_token: str = ""