""" Speechmatics STT factory for voice-agent. Creates a livekit-plugins-speechmatics STT instance configured for Mandarin recognition with speaker diarization support. The SPEECHMATICS_API_KEY environment variable is read automatically by the livekit-plugins-speechmatics package. """ import asyncio import logging import time from livekit.agents import stt, utils from livekit.plugins.speechmatics import STT, TurnDetectionMode from livekit.plugins.speechmatics.stt import SpeechStream logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Monkey-patch: auto-finalize partial transcripts after silence. # # In EXTERNAL turn-detection mode, the Speechmatics server never emits # AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework # does NOT call stream.flush() — it only pushes silence audio and waits # for FINAL_TRANSCRIPT events. So no FlushSentinel ever reaches the # stream's _process_audio loop. # # Fix: each partial transcript restarts a debounce timer. When partials # stop arriving (user stops speaking), the timer fires and promotes the # last partial to FINAL_TRANSCRIPT. A cooldown prevents duplicate finals. 
# ---------------------------------------------------------------------------
_FINALIZE_DELAY = 0.4  # seconds after last partial before emitting FINAL
_COOLDOWN = 1.5  # seconds after a FINAL before allowing another

# Keep a reference to the unpatched method so the patch can delegate to it.
_original_handle_partial_segment = SpeechStream._handle_partial_segment


async def _auto_finalize(stream: SpeechStream) -> None:
    """Wait _FINALIZE_DELAY, then promote stored partials to FINAL_TRANSCRIPT.

    This task is cancelled (and exits silently) whenever a new partial
    arrives, so only the debounce timer started after the *last* partial
    actually fires. A cooldown window suppresses back-to-back finals.
    """
    try:
        await asyncio.sleep(_FINALIZE_DELAY)
        stored = getattr(stream, "_sm_last_partial_segments", [])
        if not stored:
            return
        # Cooldown: skip if we recently emitted a FINAL
        last_final_ts = getattr(stream, "_sm_last_final_ts", 0.0)
        if time.monotonic() - last_final_ts < _COOLDOWN:
            return
        text = " | ".join(s.get("text", "") for s in stored)
        logger.info("[SM] auto-finalize: promoting %d segment(s) to FINAL: %s",
                    len(stored), text[:120])
        stream._send_frames(stored, is_final=True)
        stream._sm_last_partial_segments = []  # type: ignore[attr-defined]
        stream._sm_last_final_ts = time.monotonic()  # type: ignore[attr-defined]
    except asyncio.CancelledError:
        # Expected path: a newer partial restarted the debounce timer.
        pass


def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None:  # type: ignore[override]
    """Intercept partial segments, stash them, and reset the finalize timer."""
    segments = message.get("segments", [])
    if segments:
        self._sm_last_partial_segments = segments  # type: ignore[attr-defined]
        # Cancel previous timer and start a new one (debounce).
        timer = getattr(self, "_sm_finalize_timer", None)
        if timer and not timer.done():
            timer.cancel()
        self._sm_finalize_timer = asyncio.create_task(_auto_finalize(self))  # type: ignore[attr-defined]
    _original_handle_partial_segment(self, message)


SpeechStream._handle_partial_segment = _patched_handle_partial_segment  # type: ignore[assignment]

# Map Whisper language codes to Speechmatics language codes
_LANG_MAP = {
    "zh": "cmn",
    "en": "en",
    "ja": "ja",
    "ko": "ko",
    "de": "de",
    "fr": "fr",
}


def create_speechmatics_stt(language: str = "cmn") -> STT:
    """Create a Speechmatics STT instance for the voice pipeline.

    Args:
        language: Language code (Whisper or Speechmatics). Whisper codes like
            'zh' are automatically mapped to Speechmatics equivalents.

    Returns:
        Configured speechmatics.STT instance with speaker diarization enabled.
    """
    sm_lang = _LANG_MAP.get(language, language)
    # Named sm_stt (not `stt`) so it does not shadow the `livekit.agents.stt`
    # module imported at the top of this file.
    sm_stt = STT(
        language=sm_lang,
        include_partials=True,
        turn_detection_mode=TurnDetectionMode.EXTERNAL,
        enable_diarization=True,
    )
    # Workaround: LiveKit's LanguageCode normalizes ISO 639-3 "cmn" back to
    # ISO 639-1 "zh", but Speechmatics expects "cmn". Override the internal
    # option after construction so the raw Speechmatics code is sent.
    sm_stt._stt_options.language = sm_lang  # type: ignore[assignment]
    logger.info("Speechmatics STT created: language=%s (input=%s), diarization=True",
                sm_lang, language)
    return sm_stt