From 1431dc0c832dce279e70bec85ebc62e5ae3c8ad1 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 3 Mar 2026 02:16:46 -0800 Subject: [PATCH] fix: directly promote partial transcripts to FINAL on FlushSentinel VoiceAgentClient.finalize() schedules an async task chain that often loses the race against session teardown. Instead, intercept partial segments as they arrive, stash them, and synchronously emit them as FINAL_TRANSCRIPT when FlushSentinel fires. Co-Authored-By: Claude Opus 4.6 --- .../src/plugins/speechmatics_stt.py | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/packages/services/voice-agent/src/plugins/speechmatics_stt.py b/packages/services/voice-agent/src/plugins/speechmatics_stt.py index 9e632bd..b9a5aef 100644 --- a/packages/services/voice-agent/src/plugins/speechmatics_stt.py +++ b/packages/services/voice-agent/src/plugins/speechmatics_stt.py @@ -17,16 +17,37 @@ from livekit.plugins.speechmatics.stt import SpeechStream logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Monkey-patch SpeechStream._process_audio so that a FlushSentinel triggers -# client.finalize(). In EXTERNAL turn-detection mode the Speechmatics server -# only emits AddSegment (FINAL_TRANSCRIPT) after an explicit finalize signal. -# The upstream plugin flushes the byte-stream but never calls finalize(), -# which means LiveKit's VAD-driven flush never produces a final transcript. +# Monkey-patch SpeechStream so that a FlushSentinel (VAD end-of-speech) +# directly emits the last partial transcript as a FINAL_TRANSCRIPT. +# +# In EXTERNAL turn-detection mode the Speechmatics server never sends +# AddSegment on its own — it relies on the client calling finalize(). +# The upstream plugin handles FlushSentinel by flushing the audio byte +# stream but never calls finalize(), so LiveKit never gets a final. +# +# VoiceAgentClient.finalize() IS the right call, but it schedules an +# async task chain that often loses the race against session teardown +# (user disconnects right after speaking). The reliable fix is to +# directly promote the latest partial segments to FINAL_TRANSCRIPT +# synchronously on the event channel. # --------------------------------------------------------------------------- + +_original_handle_partial_segment = SpeechStream._handle_partial_segment + + +def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None: # type: ignore[override] + """Intercept partial segments and stash them for flush-time promotion.""" + segments = message.get("segments", []) + if segments: + self._sm_last_partial_segments = segments # type: ignore[attr-defined] + _original_handle_partial_segment(self, message) + + _original_process_audio = SpeechStream._process_audio async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[override] + self._sm_last_partial_segments: list = [] # type: ignore[attr-defined] try: audio_bstream = utils.audio.AudioByteStream( sample_rate=self._stt._sample_rate, @@ -35,9 +56,17 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov async for data in self._input_ch: if isinstance(data, self._FlushSentinel): frames = audio_bstream.flush() - # >>> patch: tell Speechmatics the turn ended <<< - if self._client and self._client._is_connected: - self._client.finalize() + # Promote stored partials → FINAL_TRANSCRIPT immediately + stored = getattr(self, "_sm_last_partial_segments", []) + if stored: + logger.info( + "FlushSentinel: promoting %d partial segment(s) to FINAL", + len(stored), + ) + self._send_frames(stored, is_final=True) + self._sm_last_partial_segments = [] # type: ignore[attr-defined] + else: + logger.warning("FlushSentinel received but no partial segments stored") else: frames = audio_bstream.write(data.data.tobytes()) @@ -50,6 +79,7 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov SpeechStream._process_audio = _patched_process_audio # type: ignore[assignment] +SpeechStream._handle_partial_segment = _patched_handle_partial_segment # type: ignore[assignment] # Map Whisper language codes to Speechmatics language codes _LANG_MAP = {