diff --git a/packages/services/voice-agent/src/plugins/speechmatics_stt.py b/packages/services/voice-agent/src/plugins/speechmatics_stt.py index 80785c2..dd788f8 100644 --- a/packages/services/voice-agent/src/plugins/speechmatics_stt.py +++ b/packages/services/voice-agent/src/plugins/speechmatics_stt.py @@ -17,76 +17,54 @@ from livekit.plugins.speechmatics.stt import SpeechStream logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Monkey-patch SpeechStream so that a FlushSentinel (VAD end-of-speech) -# directly emits the last partial transcript as a FINAL_TRANSCRIPT. +# Monkey-patch: auto-finalize partial transcripts after silence. # -# In EXTERNAL turn-detection mode the Speechmatics server never sends -# AddSegment on its own — it relies on the client calling finalize(). -# The upstream plugin handles FlushSentinel by flushing the audio byte -# stream but never calls finalize(), so LiveKit never gets a final. +# In EXTERNAL turn-detection mode, the Speechmatics server never emits +# AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework +# does NOT call stream.flush() — it only pushes silence audio and waits +# for FINAL_TRANSCRIPT events. So no FlushSentinel ever reaches the +# stream's _process_audio loop. # -# VoiceAgentClient.finalize() IS the right call, but it schedules an -# async task chain that often loses the race against session teardown -# (user disconnects right after speaking). The reliable fix is to -# directly promote the latest partial segments to FINAL_TRANSCRIPT -# synchronously on the event channel. +# Fix: each partial transcript restarts a debounce timer. When partials +# stop arriving (user stops speaking), the timer fires and promotes the +# last partial to FINAL_TRANSCRIPT. The 700ms delay balances latency +# vs. avoiding mid-sentence finals. # --------------------------------------------------------------------------- +_FINALIZE_DELAY = 0.7 # seconds after last partial before emitting FINAL + _original_handle_partial_segment = SpeechStream._handle_partial_segment +async def _auto_finalize(stream: SpeechStream) -> None: + """Wait, then promote stored partials to FINAL_TRANSCRIPT.""" + try: + await asyncio.sleep(_FINALIZE_DELAY) + stored = getattr(stream, "_sm_last_partial_segments", []) + if stored: + text = " | ".join(s.get("text", "") for s in stored) + logger.info("[SM] auto-finalize: promoting %d segment(s) to FINAL: %s", len(stored), text[:120]) + stream._send_frames(stored, is_final=True) + stream._sm_last_partial_segments = [] # type: ignore[attr-defined] + except asyncio.CancelledError: + pass + + def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None: # type: ignore[override] - """Intercept partial segments and stash them for flush-time promotion.""" + """Intercept partial segments, stash them, and reset the finalize timer.""" segments = message.get("segments", []) if segments: - text = " | ".join(s.get("text", "") for s in segments) - logger.info("[SM-PATCH] stashing %d partial segment(s): %s", len(segments), text[:100]) self._sm_last_partial_segments = segments # type: ignore[attr-defined] + + # Cancel previous timer and start a new one + timer = getattr(self, "_sm_finalize_timer", None) + if timer and not timer.done(): + timer.cancel() + self._sm_finalize_timer = asyncio.create_task(_auto_finalize(self)) # type: ignore[attr-defined] + _original_handle_partial_segment(self, message) -_original_process_audio = SpeechStream._process_audio - - -async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[override] - logger.info("[SM-PATCH] _patched_process_audio STARTED") - self._sm_last_partial_segments: list = [] # type: ignore[attr-defined] - try: - audio_bstream = utils.audio.AudioByteStream( - sample_rate=self._stt._sample_rate, - num_channels=1, - ) - frame_count = 0 - async for data in self._input_ch: - if isinstance(data, self._FlushSentinel): - logger.info("[SM-PATCH] FlushSentinel received (after %d frames)", frame_count) - frames = audio_bstream.flush() - # Promote stored partials → FINAL_TRANSCRIPT immediately - stored = getattr(self, "_sm_last_partial_segments", []) - if stored: - logger.info( - "[SM-PATCH] promoting %d partial segment(s) to FINAL", - len(stored), - ) - self._send_frames(stored, is_final=True) - self._sm_last_partial_segments = [] # type: ignore[attr-defined] - else: - logger.warning("[SM-PATCH] FlushSentinel but no partial segments stored") - else: - frames = audio_bstream.write(data.data.tobytes()) - frame_count += 1 - - if self._client: - for frame in frames: - self._speech_duration += frame.duration - await self._client.send_audio(frame.data.tobytes()) - except asyncio.CancelledError: - logger.info("[SM-PATCH] _patched_process_audio cancelled (processed %d frames)", frame_count if 'frame_count' in dir() else -1) - except Exception as e: - logger.error("[SM-PATCH] _patched_process_audio ERROR: %s", e, exc_info=True) - - -SpeechStream._process_audio = _patched_process_audio # type: ignore[assignment] SpeechStream._handle_partial_segment = _patched_handle_partial_segment # type: ignore[assignment] # Map Whisper language codes to Speechmatics language codes