diff --git a/packages/services/voice-agent/src/plugins/speechmatics_stt.py b/packages/services/voice-agent/src/plugins/speechmatics_stt.py index c50d32b..bd5cff3 100644 --- a/packages/services/voice-agent/src/plugins/speechmatics_stt.py +++ b/packages/services/voice-agent/src/plugins/speechmatics_stt.py @@ -9,7 +9,6 @@ by the livekit-plugins-speechmatics package. """ import asyncio import logging -import time from livekit.agents import stt, utils from livekit.plugins.speechmatics import STT, TurnDetectionMode @@ -23,20 +22,25 @@ logger = logging.getLogger(__name__) # In EXTERNAL turn-detection mode, the Speechmatics server never emits # AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework # does NOT call stream.flush() — it only pushes silence audio and waits -# for FINAL_TRANSCRIPT events. So no FlushSentinel ever reaches the -# stream's _process_audio loop. +# for FINAL_TRANSCRIPT events. # # Fix: each partial transcript restarts a debounce timer. When partials # stop arriving (user stops speaking), the timer fires and promotes the -# last partial to FINAL_TRANSCRIPT. A cooldown prevents duplicate finals. +# last partial to FINAL_TRANSCRIPT. Text-based deduplication prevents +# the same transcript from being finalized multiple times (Speechmatics +# re-sends identical partials during silence). # --------------------------------------------------------------------------- _FINALIZE_DELAY = 0.4 # seconds after last partial before emitting FINAL -_COOLDOWN = 1.5 # seconds after a FINAL before allowing another _original_handle_partial_segment = SpeechStream._handle_partial_segment +def _segments_text(segments: list) -> str: + """Extract combined text from segment dicts.""" + return " | ".join(s.get("text", "") for s in segments) + + async def _auto_finalize(stream: SpeechStream) -> None: """Wait, then promote stored partials to FINAL_TRANSCRIPT.""" try: @@ -44,15 +48,15 @@ async def _auto_finalize(stream: SpeechStream) -> None: stored = getattr(stream, "_sm_last_partial_segments", []) if not stored: return - # Cooldown: skip if we recently emitted a FINAL - last_final_ts = getattr(stream, "_sm_last_final_ts", 0.0) - if time.monotonic() - last_final_ts < _COOLDOWN: + # Text dedup: skip if this exact text was already finalized + text = _segments_text(stored) + last_final_text = getattr(stream, "_sm_last_final_text", "") + if text == last_final_text: return - text = " | ".join(s.get("text", "") for s in stored) - logger.info("[SM] auto-finalize: promoting %d segment(s) to FINAL: %s", len(stored), text[:120]) + logger.info("[SM] auto-finalize → FINAL: %s", text[:120]) stream._send_frames(stored, is_final=True) stream._sm_last_partial_segments = [] # type: ignore[attr-defined] - stream._sm_last_final_ts = time.monotonic() # type: ignore[attr-defined] + stream._sm_last_final_text = text # type: ignore[attr-defined] except asyncio.CancelledError: pass @@ -63,11 +67,14 @@ def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None: if segments: self._sm_last_partial_segments = segments # type: ignore[attr-defined] - # Cancel previous timer and start a new one - timer = getattr(self, "_sm_finalize_timer", None) - if timer and not timer.done(): - timer.cancel() - self._sm_finalize_timer = asyncio.create_task(_auto_finalize(self)) # type: ignore[attr-defined] + # Only start a timer if text actually changed from last finalized + text = _segments_text(segments) + last_final_text = getattr(self, "_sm_last_final_text", "") + if text != last_final_text: + timer = getattr(self, "_sm_finalize_timer", None) + if timer and not timer.done(): + timer.cancel() + self._sm_finalize_timer = asyncio.create_task(_auto_finalize(self)) # type: ignore[attr-defined] _original_handle_partial_segment(self, message)