From 73fd56f30a449f3b7b8ba594a8aec1d01036f158 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 3 Mar 2026 02:00:42 -0800 Subject: [PATCH] fix: durable monkey-patch for Speechmatics finalize on flush Move the SpeechStream._process_audio patch from container runtime into our own source code so it survives Docker rebuilds. The patch adds client.finalize() on FlushSentinel so EXTERNAL mode produces final transcripts when LiveKit's VAD detects end of speech. Co-Authored-By: Claude Opus 4.6 --- .../src/plugins/speechmatics_stt.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/packages/services/voice-agent/src/plugins/speechmatics_stt.py b/packages/services/voice-agent/src/plugins/speechmatics_stt.py index fd0fdcd..9e632bd 100644 --- a/packages/services/voice-agent/src/plugins/speechmatics_stt.py +++ b/packages/services/voice-agent/src/plugins/speechmatics_stt.py @@ -2,17 +2,54 @@ Speechmatics STT factory for voice-agent. Creates a livekit-plugins-speechmatics STT instance configured for -Mandarin-English bilingual recognition with speaker diarization support. +Mandarin recognition with speaker diarization support. The SPEECHMATICS_API_KEY environment variable is read automatically by the livekit-plugins-speechmatics package. """ +import asyncio import logging +from livekit.agents import stt, utils from livekit.plugins.speechmatics import STT, TurnDetectionMode +from livekit.plugins.speechmatics.stt import SpeechStream logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Monkey-patch SpeechStream._process_audio so that a FlushSentinel triggers +# client.finalize(). In EXTERNAL turn-detection mode the Speechmatics server +# only emits AddSegment (FINAL_TRANSCRIPT) after an explicit finalize signal. +# The upstream plugin flushes the byte-stream but never calls finalize(), +# which means LiveKit's VAD-driven flush never produces a final transcript. 
# ---------------------------------------------------------------------------
# Reference to the upstream coroutine, captured before it is replaced below.
# It is not called by the patched version; it is kept so the original stays
# reachable (e.g. for restoring it in tests).
_original_process_audio = SpeechStream._process_audio


async def _patched_process_audio(self: SpeechStream) -> None:  # type: ignore[override]
    """Forward queued audio to Speechmatics and finalize the turn on flush.

    Drop-in replacement for ``SpeechStream._process_audio``. Every item read
    from ``self._input_ch`` is re-chunked through an ``AudioByteStream`` and
    the resulting frames are sent with ``self._client.send_audio``, adding
    each frame's duration to ``self._speech_duration``.

    When a ``_FlushSentinel`` arrives, the remainder buffered in the byte
    stream is sent *first* and ``self._client.finalize()`` is called
    afterwards, so the end-of-turn signal never precedes the tail of the
    audio it is supposed to close.

    Frames are dropped when ``self._client`` is falsy. Task cancellation is
    absorbed and simply ends the loop.
    """
    try:
        audio_bstream = utils.audio.AudioByteStream(
            sample_rate=self._stt._sample_rate,
            num_channels=1,
        )
        async for data in self._input_ch:
            is_flush = isinstance(data, self._FlushSentinel)
            if is_flush:
                # Drain whatever partial chunk is still buffered.
                frames = audio_bstream.flush()
            else:
                frames = audio_bstream.write(data.data.tobytes())

            if self._client:
                for frame in frames:
                    self._speech_duration += frame.duration
                    await self._client.send_audio(frame.data.tobytes())

            # >>> patch: tell Speechmatics the turn ended <<<
            # Done only after the flushed tail has been sent; finalizing
            # earlier would signal end-of-turn ahead of the last audio.
            # ``self._client`` is re-read here because the awaits above may
            # have let other tasks change it.
            if is_flush and self._client and self._client._is_connected:
                self._client.finalize()
    except asyncio.CancelledError:
        # Cancellation is swallowed on purpose: the coroutine just stops.
        pass


# Install the patched coroutine on the plugin class at import time.
SpeechStream._process_audio = _patched_process_audio  # type: ignore[assignment]

# Map Whisper language codes to Speechmatics language codes
# NOTE(review): the `_LANG_MAP = {` definition that follows is cut off at the
# end of this patch hunk, so it is left out here rather than guessed at.