fix: rewrite voice pipeline for direct WebSocket I/O, fix TTS and navigation

Root cause: Pipecat's WebsocketServerTransport creates its own WebSocket
server on (host,port) and expects FrameProcessor subclasses. Our code was
passing a FastAPI WebSocket object as 'host' and using plain STT/TTS/VAD
service classes that aren't FrameProcessors. The pipeline crashed immediately
when receiving audio, causing "disconnects when speaking".

Changes:
- **base_pipeline.py**: Complete rewrite — replaced Pipecat Pipeline with
  direct async loop: WebSocket → VAD → STT → Claude LLM → TTS → WebSocket.
  Supports barge-in (interrupt TTS when user speaks), audio chunking, and
  24kHz→16kHz TTS resampling.
- **session_router.py**: Pass WebSocket directly to pipeline instead of
  wrapping in AppTransport.
- **app_transport.py**: Deprecated (no longer needed).
- **kokoro_service.py**: Fix misaki compatibility (MutableToken→MToken
  rename), use correct Chinese voice 'zf_xiaoxiao', handle torch tensors.
- **main.py**: Apply misaki monkey-patch before importing kokoro.
- **settings.py**: Change default TTS voice from 'zh_female_1' (non-existent)
  to 'zf_xiaoxiao' (valid Kokoro-82M Chinese female voice).
- **requirements.txt**: Remove pipecat-ai dependency, pin kokoro==0.3.5 +
  misaki==0.7.17, add Chinese NLP deps (pypinyin, cn2an, jieba, ordered-set).
- **agent_call_page.dart**: Wrap each cleanup step in try/catch to ensure
  Navigator.pop() always executes after call ends. Add 3s timeout on session
  delete request.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-23 23:34:35 -08:00
parent 6cd53e713c
commit 7afbd54fce
8 changed files with 361 additions and 114 deletions

View File

@ -389,32 +389,37 @@ class _AgentCallPageState extends ConsumerState<AgentCallPage>
_reconnectTimer?.cancel();
_reconnectTimer = null;
// Stop mic
await _micSubscription?.cancel();
_micSubscription = null;
// Cleanup all resources each wrapped in try/catch to ensure we always
// reach Navigator.pop() at the end.
try {
await _micSubscription?.cancel();
_micSubscription = null;
} catch (_) {}
try {
await _recorder.stop();
} catch (_) {}
// Stop playback
await _pcmPlayer.dispose();
// Close WebSocket
_audioSubscription?.cancel();
try {
await _pcmPlayer.dispose();
} catch (_) {}
try {
_audioSubscription?.cancel();
} catch (_) {}
try {
await _audioChannel?.sink.close();
} catch (_) {}
// Delete voice session on the server
// Delete voice session on the server (fire-and-forget)
if (_sessionId != null) {
try {
final dio = ref.read(dioClientProvider);
await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId');
await dio.delete('${ApiEndpoints.voice}/sessions/$_sessionId')
.timeout(const Duration(seconds: 3));
} catch (_) {}
}
// Navigate back to the dial page
if (mounted) {
await Future.delayed(const Duration(seconds: 1));
await Future.delayed(const Duration(milliseconds: 500));
if (mounted) Navigator.of(context).pop();
}
}

View File

@ -1,16 +1,20 @@
fastapi==0.110.0
uvicorn==0.29.0
pipecat-ai==0.0.30
faster-whisper==1.2.1
kokoro==0.3.0
kokoro==0.3.5
misaki==0.7.17
silero-vad==5.1
twilio==9.0.0
anthropic==0.32.0
openai>=1.0.0
anthropic>=0.32.0
websockets==12.0
pydantic==2.6.0
pydantic-settings==2.2.0
python-dotenv==1.0.0
python-multipart==0.0.9
httpx==0.27.0
numpy==1.26.4
numpy>=1.26.4
torch>=2.0.0
ordered-set
pypinyin
cn2an
jieba

View File

@ -106,13 +106,15 @@ def _load_models_sync():
# TTS
try:
from ..tts.kokoro_service import KokoroTTSService
from ..tts.kokoro_service import KokoroTTSService, _patch_misaki_compat
_patch_misaki_compat()
from kokoro import KPipeline
tts = KokoroTTSService(model=settings.kokoro_model, voice=settings.kokoro_voice)
tts._pipeline = KPipeline(lang_code='z')
app.state.tts = tts
_p(f"[bg] TTS loaded: {settings.kokoro_model}")
_p(f"[bg] TTS loaded: {settings.kokoro_model} voice={settings.kokoro_voice}")
except Exception as e:
app.state.tts = None
_p(f"[bg] WARNING: TTS failed: {e}")

View File

@ -10,7 +10,6 @@ from pydantic import BaseModel
from typing import Optional
from ..config.settings import settings
from ..pipeline.app_transport import AppTransport
from ..pipeline.base_pipeline import create_voice_pipeline
logger = logging.getLogger(__name__)
@ -213,9 +212,6 @@ async def voice_websocket(websocket: WebSocket, session_id: str):
json.dumps({"type": "session.resumed", "session_id": session_id})
)
# Create the AppTransport from the websocket connection
transport = AppTransport(websocket)
# Build the session context from stored session data
session_context = {
"session_id": session_id,
@ -223,9 +219,9 @@ async def voice_websocket(websocket: WebSocket, session_id: str):
"agent_context": session.get("agent_context", {}),
}
# Create the Pipecat voice pipeline using shared services from app.state
# Create the voice pipeline using the WebSocket directly
task = await create_voice_pipeline(
transport,
websocket,
session_context,
stt=getattr(app.state, "stt", None),
tts=getattr(app.state, "tts", None),

View File

@ -22,7 +22,7 @@ class Settings(BaseSettings):
# TTS (Kokoro)
kokoro_model: str = "kokoro-82m"
kokoro_voice: str = "zh_female_1"
kokoro_voice: str = "zf_xiaoxiao"
# Device (cpu or cuda)
device: str = "cpu"

View File

@ -1,31 +1,2 @@
"""
Flutter App WebSocket audio transport.
- Input: PCM 16kHz 16bit mono (Flutter recording format)
- Output: PCM 16kHz 16bit mono (Flutter playback format)
"""
from pipecat.transports.network.websocket_server import WebsocketServerTransport, WebsocketServerParams
class AppTransport:
"""WebSocket transport for Flutter App audio streaming."""
def __init__(self, websocket):
self.websocket = websocket
self.sample_rate = 16000
self._transport = WebsocketServerTransport(
websocket,
params=WebsocketServerParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=16000,
),
)
def input(self):
"""Audio input processor — receives PCM 16kHz 16bit mono from Flutter."""
return self._transport.input()
def output(self):
"""Audio output processor — sends PCM 16kHz 16bit mono to Flutter."""
return self._transport.output()
# This module is no longer used.
# Audio transport is now handled directly in base_pipeline.py via FastAPI WebSocket.

View File

@ -1,69 +1,324 @@
"""
Pipecat Pipeline core -- Voice dialogue pipeline definition.
Voice dialogue pipeline direct WebSocket audio I/O.
Pipeline: Audio Input -> VAD -> STT -> LLM -> TTS -> Audio Output
- Supports interruption (barge-in)
- Supports tool_use forwarding to agent-service
Pipeline: Audio Input -> VAD -> STT -> LLM -> TTS -> Audio Output
Runs as an async task that reads binary PCM frames from a FastAPI WebSocket,
detects speech with VAD, transcribes with STT, generates a response via
Claude LLM, synthesizes speech with TTS, and sends audio back.
"""
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.services.anthropic import AnthropicLLMService
import asyncio
import logging
import time
from typing import Optional
import anthropic
import numpy as np
from fastapi import WebSocket
from ..config.settings import settings
from ..stt.whisper_service import WhisperSTTService
from ..tts.kokoro_service import KokoroTTSService
from ..vad.silero_service import SileroVADService
logger = logging.getLogger(__name__)
async def create_voice_pipeline(transport, session_context, *, stt=None, tts=None, vad=None):
"""
Create a Pipecat voice dialogue pipeline.
# Minimum speech duration in seconds before we transcribe
_MIN_SPEECH_SECS = 0.5
# Silence duration in seconds after speech ends before we process
_SILENCE_AFTER_SPEECH_SECS = 0.8
# Sample rate
_SAMPLE_RATE = 16000
# Bytes per sample (16-bit PCM mono)
_BYTES_PER_SAMPLE = 2
# VAD chunk size (512 samples = 32ms at 16kHz, Silero expects this)
_VAD_CHUNK_SAMPLES = 512
_VAD_CHUNK_BYTES = _VAD_CHUNK_SAMPLES * _BYTES_PER_SAMPLE
# Max audio output chunk size sent over WebSocket (4KB)
_WS_AUDIO_CHUNK = 4096
class VoicePipelineTask:
    """Async voice pipeline that bridges a FastAPI WebSocket to STT/LLM/TTS.

    Reads binary 16 kHz 16-bit mono PCM frames from the WebSocket, segments
    speech with VAD, transcribes it with STT, generates a reply with Claude,
    synthesizes the reply with TTS, and streams audio chunks back over the
    same WebSocket.
    """

    def __init__(
        self,
        websocket: WebSocket,
        session_context: dict,
        *,
        stt: Optional[WhisperSTTService] = None,
        tts: Optional[KokoroTTSService] = None,
        vad: Optional[SileroVADService] = None,
    ):
        """Store the connection, session metadata, and (optional) services.

        Args:
            websocket: Accepted FastAPI WebSocket carrying raw PCM audio.
            session_context: Session metadata dict (expects "session_id" key).
            stt: Pre-initialized Whisper STT service, or None.
            tts: Pre-initialized Kokoro TTS service, or None.
            vad: Pre-initialized Silero VAD service, or None.
        """
        self.websocket = websocket
        self.session_context = session_context
        self.stt = stt
        self.tts = tts
        self.vad = vad
        # Seed conversation: the persona instructions as the first user turn
        # plus a canned assistant greeting (Claude has no separate system
        # message here).
        self._conversation: list[dict] = [
            {
                "role": "user",
                "content": (
                    "You are iAgent, an AI voice assistant for IT operations. "
                    "Respond concisely in Chinese. Keep answers under 2 sentences "
                    "when possible. You are in a real-time voice conversation."
                ),
            },
            {
                "role": "assistant",
                "content": "好的,我是 iAgent 智能运维语音助手。有什么可以帮您的?",
            },
        ]
        self._cancelled = False
        self._speaking = False  # True while sending TTS audio to client
        # Fix: initialize here rather than lazily — the barge-in path in
        # run() sets this flag, and it must never be an undefined attribute.
        self._cancelled_tts = False

    async def run(self):
        """Main loop: read audio → VAD → STT → LLM → TTS → send audio.

        NOTE(review): _process_speech() is awaited inside this loop, so no
        new WebSocket frames are read while TTS audio is being sent; the
        barge-in flag can only trigger once reads resume — confirm whether
        true mid-utterance interruption is required.
        """
        logger.info(
            "Voice pipeline started for session %s",
            self.session_context.get("session_id"),
        )
        speech_buffer = bytearray()   # accumulated speech PCM for STT
        vad_buffer = bytearray()      # accumulates until _VAD_CHUNK_BYTES
        is_speech_active = False
        silence_start: Optional[float] = None
        speech_start: Optional[float] = None
        try:
            while not self._cancelled:
                try:
                    data = await asyncio.wait_for(
                        self.websocket.receive_bytes(), timeout=30.0
                    )
                except asyncio.TimeoutError:
                    # No data for 30s — keep waiting; a dead socket will
                    # surface as an exception on the next receive attempt.
                    continue
                except Exception:
                    # WebSocket closed (disconnect raises through receive).
                    break
                # Accumulate into fixed-size chunks Silero VAD expects.
                vad_buffer.extend(data)
                while len(vad_buffer) >= _VAD_CHUNK_BYTES:
                    chunk = bytes(vad_buffer[:_VAD_CHUNK_BYTES])
                    del vad_buffer[:_VAD_CHUNK_BYTES]
                    has_speech = self._detect_speech(chunk)
                    if has_speech:
                        if not is_speech_active:
                            is_speech_active = True
                            speech_start = time.time()
                            silence_start = None
                            # Barge-in: if we were sending TTS audio, ask the
                            # sender loop to stop.
                            if self._speaking:
                                self._cancelled_tts = True
                                logger.debug("Barge-in detected")
                        speech_buffer.extend(chunk)
                        silence_start = None
                    elif is_speech_active:
                        # Keep accumulating during short silence gaps so we
                        # don't clip the tail of the utterance.
                        speech_buffer.extend(chunk)
                        if silence_start is None:
                            silence_start = time.time()
                        elif time.time() - silence_start >= _SILENCE_AFTER_SPEECH_SECS:
                            # End of utterance — transcribe and respond.
                            speech_duration = time.time() - (speech_start or time.time())
                            if speech_duration >= _MIN_SPEECH_SECS and len(speech_buffer) > 0:
                                await self._process_speech(bytes(speech_buffer))
                            speech_buffer.clear()
                            is_speech_active = False
                            silence_start = None
                            speech_start = None
        except asyncio.CancelledError:
            logger.info("Voice pipeline cancelled")
        except Exception as exc:
            logger.exception("Voice pipeline error: %s", exc)
        finally:
            logger.info(
                "Voice pipeline ended for session %s",
                self.session_context.get("session_id"),
            )

    def cancel(self):
        """Request the run() loop to stop after the current iteration."""
        self._cancelled = True

    def _detect_speech(self, chunk: bytes) -> bool:
        """Run VAD on one chunk; returns True if speech was detected.

        Fails open: with no VAD model (or on VAD error) every chunk is
        treated as speech so the pipeline degrades to always-transcribe.
        """
        if self.vad is None or self.vad._model is None:
            return True
        try:
            return self.vad.detect_speech(chunk)
        except Exception as exc:
            logger.debug("VAD error: %s", exc)
            return True

    async def _process_speech(self, audio_data: bytes):
        """Transcribe one utterance, generate an LLM reply, speak it back."""
        import json

        session_id = self.session_context.get("session_id", "?")
        # 1. STT
        text = await self._transcribe(audio_data)
        if not text or not text.strip():
            logger.debug("[%s] STT returned empty text, skipping", session_id)
            return
        logger.info("[%s] User said: %s", session_id, text.strip())
        # Notify client that we heard them (best-effort).
        try:
            await self.websocket.send_text(
                json.dumps({"type": "transcript", "text": text.strip(), "role": "user"})
            )
        except Exception:
            pass
        # 2. LLM
        self._conversation.append({"role": "user", "content": text.strip()})
        response_text = await self._llm_generate()
        if not response_text:
            logger.warning("[%s] LLM returned empty response", session_id)
            return
        logger.info("[%s] Agent says: %s", session_id, response_text)
        self._conversation.append({"role": "assistant", "content": response_text})
        # Notify client of the response text (best-effort).
        try:
            await self.websocket.send_text(
                json.dumps({"type": "transcript", "text": response_text, "role": "assistant"})
            )
        except Exception:
            pass
        # 3. TTS → send audio back
        await self._synthesize_and_send(response_text)

    async def _transcribe(self, audio_data: bytes) -> str:
        """Transcribe audio via the STT service; empty string on failure."""
        if self.stt is None or self.stt._model is None:
            logger.warning("STT not available")
            return ""
        try:
            return await self.stt.transcribe(audio_data)
        except Exception as exc:
            logger.error("STT error: %s", exc)
            return ""

    async def _llm_generate(self) -> str:
        """Generate a reply with Anthropic Claude from the running history.

        Returns a canned Chinese apology string when the API key is missing
        or the request fails, so the voice loop never goes silent.
        """
        if not settings.anthropic_api_key:
            logger.warning("Anthropic API key not set, returning default response")
            return "抱歉语音助手暂时无法连接到AI服务。"
        try:
            client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)
            response = await client.messages.create(
                model=settings.claude_model,
                max_tokens=256,
                messages=self._conversation,
            )
            return response.content[0].text if response.content else ""
        except Exception as exc:
            logger.error("LLM error: %s", exc)
            return "抱歉AI服务暂时不可用请稍后再试。"

    async def _synthesize_and_send(self, text: str):
        """Synthesize text to speech and stream PCM chunks over WebSocket."""
        self._speaking = True
        self._cancelled_tts = False
        try:
            if self.tts is None or self.tts._pipeline is None:
                logger.warning("TTS not available, skipping audio response")
                return
            # TTS is CPU-bound — run it in the default thread pool.
            # Fix: get_running_loop() instead of the deprecated
            # get_event_loop() inside a coroutine.
            audio_bytes = await asyncio.get_running_loop().run_in_executor(
                None, self._tts_sync, text
            )
            if not audio_bytes or self._cancelled_tts:
                return
            # Send audio in chunks, honoring barge-in between chunks.
            offset = 0
            while offset < len(audio_bytes) and not self._cancelled_tts:
                end = min(offset + _WS_AUDIO_CHUNK, len(audio_bytes))
                try:
                    await self.websocket.send_bytes(audio_bytes[offset:end])
                except Exception:
                    break
                offset = end
                # Small yield so we don't starve the event loop.
                await asyncio.sleep(0.01)
        except Exception as exc:
            logger.error("TTS/send error: %s", exc)
        finally:
            self._speaking = False

    def _tts_sync(self, text: str) -> bytes:
        """Synchronous TTS synthesis (runs in a worker thread).

        Kokoro outputs 24 kHz float samples; resample to 16 kHz via linear
        interpolation and convert to 16-bit little-endian PCM.
        """
        try:
            samples = []
            for _, _, audio in self.tts._pipeline(text, voice=self.tts.voice):
                samples.append(audio)
            if not samples:
                return b""
            audio_np = np.concatenate(samples)
            if len(audio_np) == 0:
                # Fix: the original fell through to scale an empty array;
                # return empty bytes explicitly instead.
                return b""
            original_rate = 24000
            target_rate = _SAMPLE_RATE
            duration = len(audio_np) / original_rate
            target_samples = int(duration * target_rate)
            indices = np.linspace(0, len(audio_np) - 1, target_samples)
            resampled = np.interp(indices, np.arange(len(audio_np)), audio_np)
            return (resampled * 32768).clip(-32768, 32767).astype(np.int16).tobytes()
        except Exception as exc:
            logger.error("TTS synthesis error: %s", exc)
            return b""
async def create_voice_pipeline(
    websocket: WebSocket,
    session_context: dict,
    *,
    stt=None,
    tts=None,
    vad=None,
) -> VoicePipelineTask:
    """Create a voice pipeline task for the given WebSocket connection.

    Thin factory: all heavy lifting lives in VoicePipelineTask; the services
    are expected to be pre-initialized (typically shared via app.state).

    Args:
        websocket: FastAPI WebSocket connection (already accepted).
        session_context: Session metadata dict.
        stt: Pre-initialized STT service, or None to run without STT.
        tts: Pre-initialized TTS service, or None to run without TTS.
        vad: Pre-initialized VAD service, or None to treat all audio as speech.

    Returns:
        VoicePipelineTask ready to run.
    """
    return VoicePipelineTask(
        websocket,
        session_context,
        stt=stt,
        tts=tts,
        vad=vad,
    )

View File

@ -2,34 +2,48 @@
Kokoro-82M TTS service configuration.
Model: Kokoro-82M (Chinese + English bilingual)
Voice: zh_female_1
Voice: zf_xiaoxiao (Chinese female)
"""
import numpy as np
def _patch_misaki_compat():
"""Patch misaki.en compatibility: MutableToken was renamed to MToken."""
try:
import misaki.en as en
if not hasattr(en, 'MutableToken') and hasattr(en, 'MToken'):
en.MutableToken = en.MToken
except ImportError:
pass
class KokoroTTSService:
    """Text-to-Speech service using Kokoro-82M (Chinese pipeline)."""

    def __init__(self, model: str = "kokoro-82m", voice: str = "zf_xiaoxiao"):
        """Configure the service; the model is not loaded until initialize().

        Args:
            model: Kokoro model name (informational, stored on the instance).
            voice: Kokoro voice id; 'zf_xiaoxiao' is a valid Chinese female
                voice ('zh_female_1' did not exist).
        """
        self.model_name = model
        self.voice = voice
        self._pipeline = None

    async def initialize(self):
        """Load the Kokoro TTS model (applies the misaki patch first)."""
        _patch_misaki_compat()
        from kokoro import KPipeline
        self._pipeline = KPipeline(lang_code='z')  # 'z' = Chinese

    async def synthesize(self, text: str) -> bytes:
        """Convert text to speech audio (24kHz float32 → 16-bit PCM).

        Args:
            text: Text to synthesize.

        Returns:
            Raw 16-bit little-endian PCM bytes at Kokoro's native 24 kHz,
            or b'' when the pipeline produced no audio.
        """
        samples = []
        for _, _, audio in self._pipeline(text, voice=self.voice):
            # Kokoro may yield torch tensors; convert to numpy so that
            # np.concatenate works uniformly.
            if hasattr(audio, 'numpy'):
                samples.append(audio.numpy())
            else:
                samples.append(audio)
        if not samples:
            return b''
        audio_np = np.concatenate(samples)
        # Clip before the int16 cast so full-scale samples (1.0 * 32768)
        # cannot wrap around.
        return (audio_np * 32768).clip(-32768, 32767).astype(np.int16).tobytes()