fix: text-based dedup to prevent duplicate FINAL_TRANSCRIPT emissions
Speechmatics re-sends identical partial segments during silence, causing the debounce timer to fire multiple times with the same text. Each duplicate FINAL aborts the in-flight LLM request and restarts it. Replace time-based cooldown with text comparison: skip finalization if the segment text matches the last finalized text. Also skip starting new timers when partial text hasn't changed from last finalized. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3b0119fe09
commit
de99990c4d
|
|
@ -9,7 +9,6 @@ by the livekit-plugins-speechmatics package.
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
|
|
||||||
from livekit.agents import stt, utils
|
from livekit.agents import stt, utils
|
||||||
from livekit.plugins.speechmatics import STT, TurnDetectionMode
|
from livekit.plugins.speechmatics import STT, TurnDetectionMode
|
||||||
|
|
@ -23,20 +22,25 @@ logger = logging.getLogger(__name__)
|
||||||
# In EXTERNAL turn-detection mode, the Speechmatics server never emits
|
# In EXTERNAL turn-detection mode, the Speechmatics server never emits
|
||||||
# AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework
|
# AddSegment (FINAL_TRANSCRIPT) on its own. The LiveKit agents framework
|
||||||
# does NOT call stream.flush() — it only pushes silence audio and waits
|
# does NOT call stream.flush() — it only pushes silence audio and waits
|
||||||
# for FINAL_TRANSCRIPT events. So no FlushSentinel ever reaches the
|
# for FINAL_TRANSCRIPT events.
|
||||||
# stream's _process_audio loop.
|
|
||||||
#
|
#
|
||||||
# Fix: each partial transcript restarts a debounce timer. When partials
|
# Fix: each partial transcript restarts a debounce timer. When partials
|
||||||
# stop arriving (user stops speaking), the timer fires and promotes the
|
# stop arriving (user stops speaking), the timer fires and promotes the
|
||||||
# last partial to FINAL_TRANSCRIPT. A cooldown prevents duplicate finals.
|
# last partial to FINAL_TRANSCRIPT. Text-based deduplication prevents
|
||||||
|
# the same transcript from being finalized multiple times (Speechmatics
|
||||||
|
# re-sends identical partials during silence).
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
_FINALIZE_DELAY = 0.4 # seconds after last partial before emitting FINAL
|
_FINALIZE_DELAY = 0.4 # seconds after last partial before emitting FINAL
|
||||||
_COOLDOWN = 1.5 # seconds after a FINAL before allowing another
|
|
||||||
|
|
||||||
_original_handle_partial_segment = SpeechStream._handle_partial_segment
|
_original_handle_partial_segment = SpeechStream._handle_partial_segment
|
||||||
|
|
||||||
|
|
||||||
|
def _segments_text(segments: list) -> str:
|
||||||
|
"""Extract combined text from segment dicts."""
|
||||||
|
return " | ".join(s.get("text", "") for s in segments)
|
||||||
|
|
||||||
|
|
||||||
async def _auto_finalize(stream: SpeechStream) -> None:
|
async def _auto_finalize(stream: SpeechStream) -> None:
|
||||||
"""Wait, then promote stored partials to FINAL_TRANSCRIPT."""
|
"""Wait, then promote stored partials to FINAL_TRANSCRIPT."""
|
||||||
try:
|
try:
|
||||||
|
|
@ -44,15 +48,15 @@ async def _auto_finalize(stream: SpeechStream) -> None:
|
||||||
stored = getattr(stream, "_sm_last_partial_segments", [])
|
stored = getattr(stream, "_sm_last_partial_segments", [])
|
||||||
if not stored:
|
if not stored:
|
||||||
return
|
return
|
||||||
# Cooldown: skip if we recently emitted a FINAL
|
# Text dedup: skip if this exact text was already finalized
|
||||||
last_final_ts = getattr(stream, "_sm_last_final_ts", 0.0)
|
text = _segments_text(stored)
|
||||||
if time.monotonic() - last_final_ts < _COOLDOWN:
|
last_final_text = getattr(stream, "_sm_last_final_text", "")
|
||||||
|
if text == last_final_text:
|
||||||
return
|
return
|
||||||
text = " | ".join(s.get("text", "") for s in stored)
|
logger.info("[SM] auto-finalize → FINAL: %s", text[:120])
|
||||||
logger.info("[SM] auto-finalize: promoting %d segment(s) to FINAL: %s", len(stored), text[:120])
|
|
||||||
stream._send_frames(stored, is_final=True)
|
stream._send_frames(stored, is_final=True)
|
||||||
stream._sm_last_partial_segments = [] # type: ignore[attr-defined]
|
stream._sm_last_partial_segments = [] # type: ignore[attr-defined]
|
||||||
stream._sm_last_final_ts = time.monotonic() # type: ignore[attr-defined]
|
stream._sm_last_final_text = text # type: ignore[attr-defined]
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -63,7 +67,10 @@ def _patched_handle_partial_segment(self: SpeechStream, message: dict) -> None:
|
||||||
if segments:
|
if segments:
|
||||||
self._sm_last_partial_segments = segments # type: ignore[attr-defined]
|
self._sm_last_partial_segments = segments # type: ignore[attr-defined]
|
||||||
|
|
||||||
# Cancel previous timer and start a new one
|
# Only start a timer if text actually changed from last finalized
|
||||||
|
text = _segments_text(segments)
|
||||||
|
last_final_text = getattr(self, "_sm_last_final_text", "")
|
||||||
|
if text != last_final_text:
|
||||||
timer = getattr(self, "_sm_finalize_timer", None)
|
timer = getattr(self, "_sm_finalize_timer", None)
|
||||||
if timer and not timer.done():
|
if timer and not timer.done():
|
||||||
timer.cancel()
|
timer.cancel()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue