fix: use debounce timer to auto-finalize Speechmatics partial transcripts
The LiveKit framework never sends FlushSentinel to the STT stream. Instead it pushes silence frames and waits for FINAL_TRANSCRIPT events. In EXTERNAL turn-detection mode, Speechmatics only emits partials. New approach: each partial transcript restarts a 700ms debounce timer. When partials stop (user stops speaking), the timer fires and promotes the last partial to FINAL_TRANSCRIPT, unblocking the pipeline. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
de3eccafd0
commit
8ac1884ab4
|
|
@ -17,76 +17,54 @@ from livekit.plugins.speechmatics.stt import SpeechStream
|
||||||
# Module-level logger, namespaced to this module per the stdlib convention.
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Monkey-patch SpeechStream so that a FlushSentinel (VAD end-of-speech)
|
# Monkey-patch: auto-finalize partial transcripts after silence.
|
||||||
# directly emits the last partial transcript as a FINAL_TRANSCRIPT.
|
|
||||||
#
|
#
|
||||||
# In EXTERNAL turn-detection mode the Speechmatics server never sends
|
# In EXTERNAL turn-detection mode, the Speechmatics server never emits
|
||||||
# AddSegment on its own — it relies on the client calling finalize().
|
# AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework
|
||||||
# The upstream plugin handles FlushSentinel by flushing the audio byte
|
# does NOT call stream.flush() — it only pushes silence audio and waits
|
||||||
# stream but never calls finalize(), so LiveKit never gets a final.
|
# for FINAL_TRANSCRIPT events. So no FlushSentinel ever reaches the
|
||||||
|
# stream's _process_audio loop.
|
||||||
#
|
#
|
||||||
# VoiceAgentClient.finalize() IS the right call, but it schedules an
|
# Fix: each partial transcript restarts a debounce timer. When partials
|
||||||
# async task chain that often loses the race against session teardown
|
# stop arriving (user stops speaking), the timer fires and promotes the
|
||||||
# (user disconnects right after speaking). The reliable fix is to
|
# last partial to FINAL_TRANSCRIPT. The 700ms delay balances latency
|
||||||
# directly promote the latest partial segments to FINAL_TRANSCRIPT
|
# vs. avoiding mid-sentence finals.
|
||||||
# synchronously on the event channel.
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Debounce window: how long after the last partial transcript we wait before
# promoting it to a FINAL_TRANSCRIPT. 700 ms balances turn latency against
# the risk of finalizing mid-sentence.
_FINALIZE_DELAY = 0.7  # seconds

# Keep a reference to the unpatched handler so the patched version can
# delegate to the stock behavior after doing its own bookkeeping.
_original_handle_partial_segment = SpeechStream._handle_partial_segment
|
||||||
|
|
||||||
|
|
||||||
|
async def _auto_finalize(stream: SpeechStream) -> None:
    """Debounce timer body: wait, then promote stashed partials to FINAL.

    Sleeps for ``_FINALIZE_DELAY`` seconds. If the task is not cancelled in
    the meantime (i.e. no newer partial transcript arrived), the most
    recently stashed partial segments are re-emitted as a FINAL_TRANSCRIPT
    so the LiveKit pipeline can advance the turn.
    """
    try:
        await asyncio.sleep(_FINALIZE_DELAY)
    except asyncio.CancelledError:
        # A newer partial restarted the debounce. Re-raise instead of
        # swallowing: asyncio task code should propagate cancellation so
        # the task is marked cancelled rather than silently "done".
        raise
    stored = getattr(stream, "_sm_last_partial_segments", [])
    if stored:
        text = " | ".join(s.get("text", "") for s in stored)
        logger.info("[SM] auto-finalize: promoting %d segment(s) to FINAL: %s", len(stored), text[:120])
        stream._send_frames(stored, is_final=True)
        stream._sm_last_partial_segments = []  # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
|
||||||
def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None:  # type: ignore[override]
    """Intercept partial segments, stash them, and reset the finalize timer."""
    segments = message.get("segments", [])
    if segments:
        preview = " | ".join(seg.get("text", "") for seg in segments)
        logger.info("[SM-PATCH] stashing %d partial segment(s): %s", len(segments), preview[:100])
        self._sm_last_partial_segments = segments  # type: ignore[attr-defined]

        # Restart the debounce: drop any pending finalize task, then
        # schedule a fresh one for this batch of partials.
        pending = getattr(self, "_sm_finalize_timer", None)
        if pending and not pending.done():
            pending.cancel()
        self._sm_finalize_timer = asyncio.create_task(_auto_finalize(self))  # type: ignore[attr-defined]

    _original_handle_partial_segment(self, message)
|
||||||
|
|
||||||
|
|
||||||
# Preserved reference to the stock audio loop so the original implementation
# stays reachable after the patch below replaces it on the class.
_original_process_audio = SpeechStream._process_audio
|
|
||||||
async def _patched_process_audio(self: SpeechStream) -> None:  # type: ignore[override]
    """Replacement audio loop: forward audio and finalize on FlushSentinel.

    Mirrors the stock ``SpeechStream._process_audio`` but, when a
    ``FlushSentinel`` arrives, synchronously promotes the last stashed
    partial segments to FINAL_TRANSCRIPT on the event channel instead of
    relying on the server to emit one.
    """
    logger.info("[SM-PATCH] _patched_process_audio STARTED")
    # Stash for the most recent partial segments; consumed on flush.
    self._sm_last_partial_segments: list = []  # type: ignore[attr-defined]
    # Bound before `try` so the except handlers below can always log it
    # (the original used a fragile `'frame_count' in dir()` guard instead).
    frame_count = 0
    try:
        audio_bstream = utils.audio.AudioByteStream(
            sample_rate=self._stt._sample_rate,
            num_channels=1,
        )
        async for data in self._input_ch:
            if isinstance(data, self._FlushSentinel):
                logger.info("[SM-PATCH] FlushSentinel received (after %d frames)", frame_count)
                frames = audio_bstream.flush()
                # Promote stored partials → FINAL_TRANSCRIPT immediately
                stored = getattr(self, "_sm_last_partial_segments", [])
                if stored:
                    logger.info(
                        "[SM-PATCH] promoting %d partial segment(s) to FINAL",
                        len(stored),
                    )
                    self._send_frames(stored, is_final=True)
                    self._sm_last_partial_segments = []  # type: ignore[attr-defined]
                else:
                    logger.warning("[SM-PATCH] FlushSentinel but no partial segments stored")
            else:
                frames = audio_bstream.write(data.data.tobytes())
                frame_count += 1

            if self._client:
                for frame in frames:
                    self._speech_duration += frame.duration
                    await self._client.send_audio(frame.data.tobytes())
    except asyncio.CancelledError:
        # Preserved behavior from the stock loop: log and exit cleanly on
        # cancellation (session teardown cancels this task).
        logger.info("[SM-PATCH] _patched_process_audio cancelled (processed %d frames)", frame_count)
    except Exception as e:
        logger.error("[SM-PATCH] _patched_process_audio ERROR: %s", e, exc_info=True)
|
|
||||||
|
|
||||||
|
|
||||||
# Install the patched methods; importing this module applies the monkey-patch.
SpeechStream._process_audio = _patched_process_audio  # type: ignore[assignment]
SpeechStream._handle_partial_segment = _patched_handle_partial_segment  # type: ignore[assignment]


# Map Whisper language codes to Speechmatics language codes
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue