diff --git a/packages/services/voice-agent/src/plugins/speechmatics_stt.py b/packages/services/voice-agent/src/plugins/speechmatics_stt.py index 9e632bd..b9a5aef 100644 --- a/packages/services/voice-agent/src/plugins/speechmatics_stt.py +++ b/packages/services/voice-agent/src/plugins/speechmatics_stt.py @@ -17,16 +17,37 @@ from livekit.plugins.speechmatics.stt import SpeechStream logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Monkey-patch SpeechStream._process_audio so that a FlushSentinel triggers -# client.finalize(). In EXTERNAL turn-detection mode the Speechmatics server -# only emits AddSegment (FINAL_TRANSCRIPT) after an explicit finalize signal. -# The upstream plugin flushes the byte-stream but never calls finalize(), -# which means LiveKit's VAD-driven flush never produces a final transcript. +# Monkey-patch SpeechStream so that a FlushSentinel (VAD end-of-speech) +# directly emits the last partial transcript as a FINAL_TRANSCRIPT. +# +# In EXTERNAL turn-detection mode the Speechmatics server never sends +# AddSegment on its own — it relies on the client calling finalize(). +# The upstream plugin handles FlushSentinel by flushing the audio byte +# stream but never calls finalize(), so LiveKit never gets a final. +# +# VoiceAgentClient.finalize() IS the right call, but it schedules an +# async task chain that often loses the race against session teardown +# (user disconnects right after speaking). The reliable fix is to +# directly promote the latest partial segments to FINAL_TRANSCRIPT +# synchronously on the event channel. # --------------------------------------------------------------------------- + +_original_handle_partial_segment = SpeechStream._handle_partial_segment + + +def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None: # type: ignore[override] + """Intercept partial segments and stash them for flush-time promotion.""" + segments = message.get("segments", []) + if segments: + self._sm_last_partial_segments = segments # type: ignore[attr-defined] + _original_handle_partial_segment(self, message) + + _original_process_audio = SpeechStream._process_audio async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[override] + self._sm_last_partial_segments: list = [] # type: ignore[attr-defined] try: audio_bstream = utils.audio.AudioByteStream( sample_rate=self._stt._sample_rate, @@ -35,9 +56,17 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov async for data in self._input_ch: if isinstance(data, self._FlushSentinel): frames = audio_bstream.flush() - # >>> patch: tell Speechmatics the turn ended <<< - if self._client and self._client._is_connected: - self._client.finalize() + # Promote stored partials → FINAL_TRANSCRIPT immediately + stored = getattr(self, "_sm_last_partial_segments", []) + if stored: + logger.info( + "FlushSentinel: promoting %d partial segment(s) to FINAL", + len(stored), + ) + self._send_frames(stored, is_final=True) + self._sm_last_partial_segments = [] # type: ignore[attr-defined] + else: + logger.warning("FlushSentinel received but no partial segments stored") else: frames = audio_bstream.write(data.data.tobytes()) @@ -50,6 +79,7 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov SpeechStream._process_audio = _patched_process_audio # type: ignore[assignment] +SpeechStream._handle_partial_segment = _patched_handle_partial_segment # type: ignore[assignment] # Map Whisper language codes to Speechmatics language codes _LANG_MAP = {