fix: durable monkey-patch for Speechmatics finalize on flush

Move the SpeechStream._process_audio patch from container runtime
into our own source code so it survives Docker rebuilds. The patch
adds client.finalize() on FlushSentinel so EXTERNAL mode produces
final transcripts when LiveKit's VAD detects end of speech.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-03 02:00:42 -08:00
parent 6707c5048d
commit 73fd56f30a
1 changed files with 38 additions and 1 deletions

View File

@ -2,17 +2,54 @@
Speechmatics STT factory for voice-agent.
Creates a livekit-plugins-speechmatics STT instance configured for
Mandarin-English bilingual recognition with speaker diarization support.
Mandarin recognition with speaker diarization support.
The SPEECHMATICS_API_KEY environment variable is read automatically
by the livekit-plugins-speechmatics package.
"""
import asyncio
import logging
from livekit.agents import stt, utils
from livekit.plugins.speechmatics import STT, TurnDetectionMode
from livekit.plugins.speechmatics.stt import SpeechStream
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Monkey-patch SpeechStream._process_audio so that a FlushSentinel triggers
# client.finalize(). In EXTERNAL turn-detection mode the Speechmatics server
# only emits AddSegment (FINAL_TRANSCRIPT) after an explicit finalize signal.
# The upstream plugin flushes the byte-stream but never calls finalize(),
# which means LiveKit's VAD-driven flush never produces a final transcript.
# ---------------------------------------------------------------------------
_original_process_audio = SpeechStream._process_audio
async def _patched_process_audio(self: SpeechStream) -> None: # type: ignore[override]
try:
audio_bstream = utils.audio.AudioByteStream(
sample_rate=self._stt._sample_rate,
num_channels=1,
)
async for data in self._input_ch:
if isinstance(data, self._FlushSentinel):
frames = audio_bstream.flush()
# >>> patch: tell Speechmatics the turn ended <<<
if self._client and self._client._is_connected:
self._client.finalize()
else:
frames = audio_bstream.write(data.data.tobytes())
if self._client:
for frame in frames:
self._speech_duration += frame.duration
await self._client.send_audio(frame.data.tobytes())
except asyncio.CancelledError:
pass
SpeechStream._process_audio = _patched_process_audio # type: ignore[assignment]
# Map Whisper language codes to Speechmatics language codes
_LANG_MAP = {