fix: directly promote partial transcripts to FINAL on FlushSentinel

VoiceAgentClient.finalize() schedules an async task chain that often
loses the race against session teardown. Instead, intercept partial
segments as they arrive, stash them, and synchronously emit them as
FINAL_TRANSCRIPT when FlushSentinel fires.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-03 02:16:46 -08:00
parent 73fd56f30a
commit 1431dc0c83
1 changed files with 38 additions and 8 deletions

View File

@ -17,16 +17,37 @@ from livekit.plugins.speechmatics.stt import SpeechStream
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Monkey-patch SpeechStream._process_audio so that a FlushSentinel triggers
# client.finalize(). In EXTERNAL turn-detection mode the Speechmatics server
# only emits AddSegment (FINAL_TRANSCRIPT) after an explicit finalize signal.
# The upstream plugin flushes the byte-stream but never calls finalize(),
# which means LiveKit's VAD-driven flush never produces a final transcript.
# Monkey-patch SpeechStream so that a FlushSentinel (VAD end-of-speech)
# directly emits the last partial transcript as a FINAL_TRANSCRIPT.
#
# In EXTERNAL turn-detection mode the Speechmatics server never sends
# AddSegment on its own — it relies on the client calling finalize().
# The upstream plugin handles FlushSentinel by flushing the audio byte
# stream but never calls finalize(), so LiveKit never gets a final.
#
# VoiceAgentClient.finalize() IS the right call, but it schedules an
# async task chain that often loses the race against session teardown
# (user disconnects right after speaking). The reliable fix is to
# directly promote the latest partial segments to FINAL_TRANSCRIPT
# synchronously on the event channel.
# ---------------------------------------------------------------------------
# Keep a reference to the upstream handler so the patched version below can
# still delegate to it after recording the segments.
_original_handle_partial_segment = SpeechStream._handle_partial_segment
def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None: # type: ignore[override]
    """Remember the newest partial segments, then run the upstream handler.

    A non-empty ``message["segments"]`` list is stored on the stream as
    ``_sm_last_partial_segments`` so that the FlushSentinel branch of the
    patched ``_process_audio`` can later emit it as a final transcript.
    A missing or empty ``segments`` entry leaves the previous stash alone.

    Args:
        message: Partial-segment payload; only its ``"segments"`` key is
            read here. The whole dict is forwarded unchanged upstream.
    """
    latest_partials = message.get("segments", [])
    if latest_partials:
        # Stash by reference (no copy) -- same object the upstream handler sees.
        self._sm_last_partial_segments = latest_partials  # type: ignore[attr-defined]

    # Always delegate, whether or not anything was stashed.
    _original_handle_partial_segment(self, message)
# Reference to the upstream _process_audio, captured before the class
# attribute is replaced further down.
# NOTE(review): no use of this name is visible in this view -- confirm it is
# still needed.
_original_process_audio = SpeechStream._process_audio
async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[override]
self._sm_last_partial_segments: list = [] # type: ignore[attr-defined]
try:
audio_bstream = utils.audio.AudioByteStream(
sample_rate=self._stt._sample_rate,
@ -35,9 +56,17 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov
async for data in self._input_ch:
if isinstance(data, self._FlushSentinel):
frames = audio_bstream.flush()
# >>> patch: tell Speechmatics the turn ended <<<
if self._client and self._client._is_connected:
self._client.finalize()
# Promote stored partials → FINAL_TRANSCRIPT immediately
stored = getattr(self, "_sm_last_partial_segments", [])
if stored:
logger.info(
"FlushSentinel: promoting %d partial segment(s) to FINAL",
len(stored),
)
self._send_frames(stored, is_final=True)
self._sm_last_partial_segments = [] # type: ignore[attr-defined]
else:
logger.warning("FlushSentinel received but no partial segments stored")
else:
frames = audio_bstream.write(data.data.tobytes())
@ -50,6 +79,7 @@ async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[ov
# Install the patched implementations on the upstream class. This runs at
# import time and rebinds the class attributes, so it applies to every
# SpeechStream in the process, not just ones created by this module.
SpeechStream._process_audio = _patched_process_audio # type: ignore[assignment]
SpeechStream._handle_partial_segment = _patched_handle_partial_segment # type: ignore[assignment]
# Map Whisper language codes to Speechmatics language codes
_LANG_MAP = {