diff --git a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart index 804d55b..5feb6cb 100644 --- a/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart +++ b/it0_app/lib/features/agent_call/presentation/pages/agent_call_page.dart @@ -389,32 +389,37 @@ class _AgentCallPageState extends ConsumerState _reconnectTimer?.cancel(); _reconnectTimer = null; - // Stop mic - await _micSubscription?.cancel(); - _micSubscription = null; + // Cleanup all resources — each wrapped in try/catch to ensure we always + // reach Navigator.pop() at the end. + try { + await _micSubscription?.cancel(); + _micSubscription = null; + } catch (_) {} try { await _recorder.stop(); } catch (_) {} - - // Stop playback - await _pcmPlayer.dispose(); - - // Close WebSocket - _audioSubscription?.cancel(); + try { + await _pcmPlayer.dispose(); + } catch (_) {} + try { + _audioSubscription?.cancel(); + } catch (_) {} try { await _audioChannel?.sink.close(); } catch (_) {} - // Delete voice session on the server + // Delete voice session on the server (fire-and-forget) if (_sessionId != null) { try { final dio = ref.read(dioClientProvider); - await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId'); + await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId') + .timeout(const Duration(seconds: 3)); } catch (_) {} } + // Navigate back to the dial page if (mounted) { - await Future.delayed(const Duration(seconds: 1)); + await Future.delayed(const Duration(milliseconds: 500)); if (mounted) Navigator.of(context).pop(); } } diff --git a/packages/services/voice-service/requirements.txt b/packages/services/voice-service/requirements.txt index 4a2c85e..3822509 100644 --- a/packages/services/voice-service/requirements.txt +++ b/packages/services/voice-service/requirements.txt @@ -1,16 +1,20 @@ fastapi==0.110.0 uvicorn==0.29.0 -pipecat-ai==0.0.30 faster-whisper==1.2.1 -kokoro==0.3.0 +kokoro==0.3.5 +misaki==0.7.17 silero-vad==5.1 twilio==9.0.0 -anthropic==0.32.0 -openai>=1.0.0 +anthropic>=0.32.0 websockets==12.0 pydantic==2.6.0 pydantic-settings==2.2.0 python-dotenv==1.0.0 python-multipart==0.0.9 httpx==0.27.0 -numpy==1.26.4 +numpy>=1.26.4 +torch>=2.0.0 +ordered-set +pypinyin +cn2an +jieba diff --git a/packages/services/voice-service/src/api/main.py b/packages/services/voice-service/src/api/main.py index f4a3be4..7a78050 100644 --- a/packages/services/voice-service/src/api/main.py +++ b/packages/services/voice-service/src/api/main.py @@ -106,13 +106,15 @@ def _load_models_sync(): # TTS try: - from ..tts.kokoro_service import KokoroTTSService + from ..tts.kokoro_service import KokoroTTSService, _patch_misaki_compat + + _patch_misaki_compat() from kokoro import KPipeline tts = KokoroTTSService(model=settings.kokoro_model, voice=settings.kokoro_voice) tts._pipeline = KPipeline(lang_code='z') app.state.tts = tts - _p(f"[bg] TTS loaded: {settings.kokoro_model}") + _p(f"[bg] TTS loaded: {settings.kokoro_model} voice={settings.kokoro_voice}") except Exception as e: app.state.tts = None _p(f"[bg] WARNING: TTS failed: {e}") diff --git a/packages/services/voice-service/src/api/session_router.py b/packages/services/voice-service/src/api/session_router.py index 6915903..16380ca 100644 --- a/packages/services/voice-service/src/api/session_router.py +++ b/packages/services/voice-service/src/api/session_router.py @@ -10,7 +10,6 @@ from pydantic import BaseModel from typing import Optional from ..config.settings import settings -from ..pipeline.app_transport import AppTransport from ..pipeline.base_pipeline import create_voice_pipeline logger = logging.getLogger(__name__) @@ -213,9 +212,6 @@ async def voice_websocket(websocket: WebSocket, session_id: str): json.dumps({"type": "session.resumed", "session_id": session_id}) ) - # Create the AppTransport from the websocket connection - transport = AppTransport(websocket) - # Build the session context from stored session data session_context = { "session_id": session_id, @@ -223,9 +219,9 @@ async def voice_websocket(websocket: WebSocket, session_id: str): "agent_context": session.get("agent_context", {}), } - # Create the Pipecat voice pipeline using shared services from app.state + # Create the voice pipeline using the WebSocket directly task = await create_voice_pipeline( - transport, + websocket, session_context, stt=getattr(app.state, "stt", None), tts=getattr(app.state, "tts", None), diff --git a/packages/services/voice-service/src/config/settings.py b/packages/services/voice-service/src/config/settings.py index af403e4..c231d2e 100644 --- a/packages/services/voice-service/src/config/settings.py +++ b/packages/services/voice-service/src/config/settings.py @@ -22,7 +22,7 @@ class Settings(BaseSettings): # TTS (Kokoro) kokoro_model: str = "kokoro-82m" - kokoro_voice: str = "zh_female_1" + kokoro_voice: str = "zf_xiaoxiao" # Device (cpu or cuda) device: str = "cpu" diff --git a/packages/services/voice-service/src/pipeline/app_transport.py b/packages/services/voice-service/src/pipeline/app_transport.py index cf86657..dff9a39 100644 --- a/packages/services/voice-service/src/pipeline/app_transport.py +++ b/packages/services/voice-service/src/pipeline/app_transport.py @@ -1,31 +1,2 @@ -""" -Flutter App WebSocket audio transport. - -- Input: PCM 16kHz 16bit mono (Flutter recording format) -- Output: PCM 16kHz 16bit mono (Flutter playback format) -""" - -from pipecat.transports.network.websocket_server import WebsocketServerTransport, WebsocketServerParams - - -class AppTransport: - """WebSocket transport for Flutter App audio streaming.""" - - def __init__(self, websocket): - self.websocket = websocket - self.sample_rate = 16000 - self._transport = WebsocketServerTransport( - websocket, - params=WebsocketServerParams( - audio_in_sample_rate=16000, - audio_out_sample_rate=16000, - ), - ) - - def input(self): - """Audio input processor — receives PCM 16kHz 16bit mono from Flutter.""" - return self._transport.input() - - def output(self): - """Audio output processor — sends PCM 16kHz 16bit mono to Flutter.""" - return self._transport.output() +# This module is no longer used. +# Audio transport is now handled directly in base_pipeline.py via FastAPI WebSocket. diff --git a/packages/services/voice-service/src/pipeline/base_pipeline.py b/packages/services/voice-service/src/pipeline/base_pipeline.py index 3ff6e86..bfeea3a 100644 --- a/packages/services/voice-service/src/pipeline/base_pipeline.py +++ b/packages/services/voice-service/src/pipeline/base_pipeline.py @@ -1,69 +1,324 @@ """ -Pipecat Pipeline core -- Voice dialogue pipeline definition. +Voice dialogue pipeline — direct WebSocket audio I/O. -Pipeline: Audio Input -> VAD -> STT -> LLM -> TTS -> Audio Output -- Supports interruption (barge-in) -- Supports tool_use forwarding to agent-service +Pipeline: Audio Input → VAD → STT → LLM → TTS → Audio Output + +Runs as an async task that reads binary PCM frames from a FastAPI WebSocket, +detects speech with VAD, transcribes with STT, generates a response via +Claude LLM, synthesizes speech with TTS, and sends audio back. """ -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.task import PipelineTask -from pipecat.services.anthropic import AnthropicLLMService +import asyncio +import logging +import time +from typing import Optional + +import anthropic +import numpy as np +from fastapi import WebSocket from ..config.settings import settings from ..stt.whisper_service import WhisperSTTService from ..tts.kokoro_service import KokoroTTSService from ..vad.silero_service import SileroVADService +logger = logging.getLogger(__name__) -async def create_voice_pipeline(transport, session_context, *, stt=None, tts=None, vad=None): - """ - Create a Pipecat voice dialogue pipeline. +# Minimum speech duration in seconds before we transcribe +_MIN_SPEECH_SECS = 0.5 +# Silence duration in seconds after speech ends before we process +_SILENCE_AFTER_SPEECH_SECS = 0.8 +# Sample rate +_SAMPLE_RATE = 16000 +# Bytes per sample (16-bit PCM mono) +_BYTES_PER_SAMPLE = 2 +# VAD chunk size (512 samples = 32ms at 16kHz, Silero expects this) +_VAD_CHUNK_SAMPLES = 512 +_VAD_CHUNK_BYTES = _VAD_CHUNK_SAMPLES * _BYTES_PER_SAMPLE +# Max audio output chunk size sent over WebSocket (4KB) +_WS_AUDIO_CHUNK = 4096 + + +class VoicePipelineTask: + """Async voice pipeline that bridges a FastAPI WebSocket to STT/LLM/TTS.""" + + def __init__( + self, + websocket: WebSocket, + session_context: dict, + *, + stt: Optional[WhisperSTTService] = None, + tts: Optional[KokoroTTSService] = None, + vad: Optional[SileroVADService] = None, + ): + self.websocket = websocket + self.session_context = session_context + self.stt = stt + self.tts = tts + self.vad = vad + + self._conversation: list[dict] = [ + { + "role": "user", + "content": ( + "You are iAgent, an AI voice assistant for IT operations. " + "Respond concisely in Chinese. Keep answers under 2 sentences " + "when possible. You are in a real-time voice conversation." + ), + }, + { + "role": "assistant", + "content": "好的,我是 iAgent 智能运维语音助手。有什么可以帮您的?", + }, + ] + self._cancelled = False + self._speaking = False # True while sending TTS audio to client + + async def run(self): + """Main loop: read audio → VAD → STT → LLM → TTS → send audio.""" + logger.info("Voice pipeline started for session %s", self.session_context.get("session_id")) + + # Audio buffer for accumulating speech + speech_buffer = bytearray() + vad_buffer = bytearray() # accumulates until _VAD_CHUNK_BYTES + is_speech_active = False + silence_start: Optional[float] = None + speech_start: Optional[float] = None + + try: + while not self._cancelled: + try: + data = await asyncio.wait_for( + self.websocket.receive_bytes(), timeout=30.0 + ) + except asyncio.TimeoutError: + # No data for 30s — connection might be dead + continue + except Exception: + # WebSocket closed + break + + # Accumulate into VAD-sized chunks + vad_buffer.extend(data) + + while len(vad_buffer) >= _VAD_CHUNK_BYTES: + chunk = bytes(vad_buffer[:_VAD_CHUNK_BYTES]) + del vad_buffer[:_VAD_CHUNK_BYTES] + + # Run VAD + has_speech = self._detect_speech(chunk) + + if has_speech: + if not is_speech_active: + is_speech_active = True + speech_start = time.time() + silence_start = None + # Barge-in: if we were speaking TTS, stop + if self._speaking: + self._cancelled_tts = True + logger.debug("Barge-in detected") + + speech_buffer.extend(chunk) + silence_start = None + else: + if is_speech_active: + # Still accumulate a bit during silence gap + speech_buffer.extend(chunk) + + if silence_start is None: + silence_start = time.time() + elif time.time() - silence_start >= _SILENCE_AFTER_SPEECH_SECS: + # Silence detected after speech — process + speech_duration = time.time() - (speech_start or time.time()) + if speech_duration >= _MIN_SPEECH_SECS and len(speech_buffer) > 0: + await self._process_speech(bytes(speech_buffer)) + + # Reset + speech_buffer.clear() + is_speech_active = False + silence_start = None + speech_start = None + + except asyncio.CancelledError: + logger.info("Voice pipeline cancelled") + except Exception as exc: + logger.exception("Voice pipeline error: %s", exc) + finally: + logger.info("Voice pipeline ended for session %s", self.session_context.get("session_id")) + + def cancel(self): + self._cancelled = True + + def _detect_speech(self, chunk: bytes) -> bool: + """Run VAD on a single chunk. Returns True if speech detected.""" + if self.vad is None or self.vad._model is None: + # No VAD — treat everything as speech + return True + try: + return self.vad.detect_speech(chunk) + except Exception as exc: + logger.debug("VAD error: %s", exc) + return True # Assume speech on error + + async def _process_speech(self, audio_data: bytes): + """Transcribe speech, generate LLM response, synthesize and send TTS.""" + session_id = self.session_context.get("session_id", "?") + + # 1. STT + text = await self._transcribe(audio_data) + if not text or not text.strip(): + logger.debug("[%s] STT returned empty text, skipping", session_id) + return + + logger.info("[%s] User said: %s", session_id, text.strip()) + + # Notify client that we heard them + try: + import json + await self.websocket.send_text( + json.dumps({"type": "transcript", "text": text.strip(), "role": "user"}) + ) + except Exception: + pass + + # 2. LLM + self._conversation.append({"role": "user", "content": text.strip()}) + response_text = await self._llm_generate() + if not response_text: + logger.warning("[%s] LLM returned empty response", session_id) + return + + logger.info("[%s] Agent says: %s", session_id, response_text) + self._conversation.append({"role": "assistant", "content": response_text}) + + # Notify client of the response text + try: + import json + await self.websocket.send_text( + json.dumps({"type": "transcript", "text": response_text, "role": "assistant"}) + ) + except Exception: + pass + + # 3. TTS → send audio back + await self._synthesize_and_send(response_text) + + async def _transcribe(self, audio_data: bytes) -> str: + """Transcribe audio using STT service.""" + if self.stt is None or self.stt._model is None: + logger.warning("STT not available") + return "" + try: + return await self.stt.transcribe(audio_data) + except Exception as exc: + logger.error("STT error: %s", exc) + return "" + + async def _llm_generate(self) -> str: + """Generate a response using Anthropic Claude.""" + if not settings.anthropic_api_key: + logger.warning("Anthropic API key not set, returning default response") + return "抱歉,语音助手暂时无法连接到AI服务。" + + try: + client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) + response = await client.messages.create( + model=settings.claude_model, + max_tokens=256, + messages=self._conversation, + ) + return response.content[0].text if response.content else "" + except Exception as exc: + logger.error("LLM error: %s", exc) + return "抱歉,AI服务暂时不可用,请稍后再试。" + + async def _synthesize_and_send(self, text: str): + """Synthesize text to speech and send audio chunks over WebSocket.""" + self._speaking = True + self._cancelled_tts = False + + try: + if self.tts is None or self.tts._pipeline is None: + logger.warning("TTS not available, skipping audio response") + return + + # Run TTS (CPU-bound) in a thread + audio_bytes = await asyncio.get_event_loop().run_in_executor( + None, self._tts_sync, text + ) + + if not audio_bytes or self._cancelled_tts: + return + + # Send audio in chunks + offset = 0 + while offset < len(audio_bytes) and not self._cancelled_tts: + end = min(offset + _WS_AUDIO_CHUNK, len(audio_bytes)) + try: + await self.websocket.send_bytes(audio_bytes[offset:end]) + except Exception: + break + offset = end + # Small yield to not starve the event loop + await asyncio.sleep(0.01) + + except Exception as exc: + logger.error("TTS/send error: %s", exc) + finally: + self._speaking = False + + def _tts_sync(self, text: str) -> bytes: + """Synchronous TTS synthesis (runs in thread pool).""" + try: + samples = [] + for _, _, audio in self.tts._pipeline(text, voice=self.tts.voice): + samples.append(audio) + + if not samples: + return b"" + + audio_np = np.concatenate(samples) + # Kokoro outputs at 24kHz, we need 16kHz + # Resample using linear interpolation + if len(audio_np) > 0: + original_rate = 24000 + target_rate = _SAMPLE_RATE + duration = len(audio_np) / original_rate + target_samples = int(duration * target_rate) + indices = np.linspace(0, len(audio_np) - 1, target_samples) + resampled = np.interp(indices, np.arange(len(audio_np)), audio_np) + return (resampled * 32768).clip(-32768, 32767).astype(np.int16).tobytes() + + return (audio_np * 32768).clip(-32768, 32767).astype(np.int16).tobytes() + except Exception as exc: + logger.error("TTS synthesis error: %s", exc) + return b"" + + +async def create_voice_pipeline( + websocket: WebSocket, + session_context: dict, + *, + stt=None, + tts=None, + vad=None, +) -> VoicePipelineTask: + """Create a voice pipeline task for the given WebSocket connection. Args: - transport: AppTransport (Flutter WebSocket) or TwilioTransport - session_context: Dialogue context (standing order info, server info, etc.) - stt: Optional pre-initialized STT service (uses app.state if not provided) - tts: Optional pre-initialized TTS service (uses app.state if not provided) - vad: Optional pre-initialized VAD service (uses app.state if not provided) + websocket: FastAPI WebSocket connection (already accepted) + session_context: Session metadata dict + stt: Pre-initialized STT service + tts: Pre-initialized TTS service + vad: Pre-initialized VAD service Returns: - PipelineTask with interruption support + VoicePipelineTask ready to run """ - # Use provided services or create defaults from settings - if stt is None: - stt = WhisperSTTService( - model=settings.whisper_model, - device=settings.whisper_device, - language=settings.whisper_language, - ) - await stt.initialize() - - if tts is None: - tts = KokoroTTSService( - model=settings.kokoro_model, - voice=settings.kokoro_voice, - ) - await tts.initialize() - - if vad is None: - vad = SileroVADService() - await vad.initialize() - - # LLM service (Anthropic Claude) - llm = AnthropicLLMService( - api_key=settings.anthropic_api_key, - model=settings.claude_model, + return VoicePipelineTask( + websocket, + session_context, + stt=stt, + tts=tts, + vad=vad, ) - - # Build the pipeline: input -> VAD -> STT -> LLM -> TTS -> output - pipeline = Pipeline([ - transport.input(), - vad, - stt, - llm, - tts, - transport.output(), - ]) - - return PipelineTask(pipeline, allow_interruptions=True) diff --git a/packages/services/voice-service/src/tts/kokoro_service.py b/packages/services/voice-service/src/tts/kokoro_service.py index 780ae71..805ef37 100644 --- a/packages/services/voice-service/src/tts/kokoro_service.py +++ b/packages/services/voice-service/src/tts/kokoro_service.py @@ -2,34 +2,48 @@ Kokoro-82M TTS service configuration. Model: Kokoro-82M (Chinese + English bilingual) -Voice: zh_female_1 +Voice: zf_xiaoxiao (Chinese female) """ import numpy as np +def _patch_misaki_compat(): + """Patch misaki.en compatibility: MutableToken was renamed to MToken.""" + try: + import misaki.en as en + if not hasattr(en, 'MutableToken') and hasattr(en, 'MToken'): + en.MutableToken = en.MToken + except ImportError: + pass + + class KokoroTTSService: """Text-to-Speech service using Kokoro-82M.""" - def __init__(self, model: str = "kokoro-82m", voice: str = "zh_female_1"): + def __init__(self, model: str = "kokoro-82m", voice: str = "zf_xiaoxiao"): self.model_name = model self.voice = voice self._pipeline = None async def initialize(self): """Load the Kokoro TTS model.""" + _patch_misaki_compat() from kokoro import KPipeline self._pipeline = KPipeline(lang_code='z') # Chinese async def synthesize(self, text: str) -> bytes: - """Convert text to speech audio.""" + """Convert text to speech audio (24kHz float32 → 16-bit PCM).""" samples = [] for _, _, audio in self._pipeline(text, voice=self.voice): - samples.append(audio) + if hasattr(audio, 'numpy'): + samples.append(audio.numpy()) + else: + samples.append(audio) if not samples: return b'' audio_np = np.concatenate(samples) - return (audio_np * 32768).astype(np.int16).tobytes() + return (audio_np * 32768).clip(-32768, 32767).astype(np.int16).tobytes()