From 16d8a744ac3401a8607bc813e8d270e35db11205 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 6 Apr 2026 08:44:55 -0700 Subject: [PATCH] feat: system idle command - return ESP32 to wake-word state on error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Firmware: add "idle" system command handler in application.cc Server: antaf.py sets should_idle on connection error connection.py checks should_idle, sets send_idle_after_tts sendAudioHandle.py sends {"type":"system","command":"idle"} after TTS playback completes Flow: antaf bridge unavailable → play error message → tts stop → send system idle → ESP32 returns to wake-word detection Co-Authored-By: Claude Opus 4.6 (1M context) --- .../main/xiaozhi-server/core/connection.py | 5 + .../core/handle/sendAudioHandle.py | 7 + .../core/providers/asr/antaf_passthrough.py | 251 ++++++++++++++++++ .../core/providers/llm/antaf/antaf.py | 5 + 4 files changed, 268 insertions(+) create mode 100644 backend/main/xiaozhi-server/core/providers/asr/antaf_passthrough.py diff --git a/backend/main/xiaozhi-server/core/connection.py b/backend/main/xiaozhi-server/core/connection.py index d167bf1..9b08579 100644 --- a/backend/main/xiaozhi-server/core/connection.py +++ b/backend/main/xiaozhi-server/core/connection.py @@ -1147,6 +1147,11 @@ class ConnectionHandler: self.tool_call_stats['consecutive_no_call'] += 1 if depth == 0: + # Check if LLM signaled to send idle after TTS completes + if hasattr(self.llm, 'should_idle') and self.llm.should_idle: + self.send_idle_after_tts = True + self.llm.should_idle = False + self.tts.tts_text_queue.put( TTSMessageDTO( sentence_id=self.sentence_id, diff --git a/backend/main/xiaozhi-server/core/handle/sendAudioHandle.py b/backend/main/xiaozhi-server/core/handle/sendAudioHandle.py index d2b24f2..2870e67 100644 --- a/backend/main/xiaozhi-server/core/handle/sendAudioHandle.py +++ b/backend/main/xiaozhi-server/core/handle/sendAudioHandle.py @@ -46,6 +46,13 @@ async def sendAudioMessage(conn: "ConnectionHandler", sentenceType, audios, text if sentenceType == SentenceType.LAST: await send_tts_message(conn, "stop", None) conn.client_is_speaking = False + # Send system idle command if LLM requested it (e.g. antaf bridge unavailable) + if getattr(conn, 'send_idle_after_tts', False): + conn.send_idle_after_tts = False + await conn.websocket.send( + json.dumps({"type": "system", "command": "idle"}) + ) + conn.logger.bind(tag=TAG).info("Sent system idle to device") if conn.close_after_chat: await conn.close() diff --git a/backend/main/xiaozhi-server/core/providers/asr/antaf_passthrough.py b/backend/main/xiaozhi-server/core/providers/asr/antaf_passthrough.py new file mode 100644 index 0000000..547a807 --- /dev/null +++ b/backend/main/xiaozhi-server/core/providers/asr/antaf_passthrough.py @@ -0,0 +1,251 @@ +""" +Antaf Voice Passthrough ASR Provider + +Replaces ASR→LLM→TTS pipeline with direct audio forwarding to Antaf voice_bridge. +ESP32 audio → decode Opus → resample 16kHz→48kHz → inject to voice_bridge (type=3) +voice_bridge speaker (type=0) → resample 48kHz→16kHz → encode Opus → send to ESP32 + +Runs within xiaozhi-server, keeping all protocol handling (hello, OTA, wake word) intact. +""" + +import json +import struct +import asyncio +import threading +import numpy as np +import opuslib_next +from scipy.signal import resample_poly +from math import gcd +from config.logger import setup_logging +from core.providers.asr.base import ASRProviderBase +from core.handle.sendAudioHandle import send_tts_message + +TAG = __name__ +logger = setup_logging() + +# Audio parameters +ESP_SR = 16000 +ESP_FRAME_SAMPLES = 960 # 60ms at 16kHz +BRIDGE_SR = 48000 +BRIDGE_FRAME_SAMPLES = 480 # 960 bytes / 2 = 480 samples + +# Resample ratios +UP = (BRIDGE_SR // gcd(BRIDGE_SR, ESP_SR), ESP_SR // gcd(BRIDGE_SR, ESP_SR)) # (3,1) +DOWN = (ESP_SR // gcd(ESP_SR, BRIDGE_SR), BRIDGE_SR // gcd(ESP_SR, BRIDGE_SR)) # (1,3) + + +class ASRProvider(ASRProviderBase): + def __init__(self, config): + super().__init__() + self.bridge_host = config.get("bridge_host", "127.0.0.1") + self.bridge_port = int(config.get("bridge_port", 18901)) + self.interface_type = "NON_STREAM" + self.conn = None + self.bridge_reader = None + self.bridge_writer = None + self.opus_decoder = None + self.opus_encoder = None + self._inject_buf = np.array([], dtype=np.int16) + self._speaker_buf = np.array([], dtype=np.int16) + self._tts_started = False + self._recv_task = None + self._connected = False + logger.bind(tag=TAG).info( + f"AntafPassthrough 初始化: bridge={self.bridge_host}:{self.bridge_port}" + ) + + async def open_audio_channels(self, conn): + """Override: connect to bridge, start passthrough instead of normal ASR.""" + # Clean up previous connection if any + await self.close() + + self.conn = conn + self.opus_decoder = opuslib_next.Decoder(ESP_SR, 1) + self.opus_encoder = opuslib_next.Encoder(ESP_SR, 1, opuslib_next.APPLICATION_AUDIO) + self._tts_started = False + self._silence_count = 0 + self._inject_buf = np.array([], dtype=np.int16) + self._speaker_buf = np.array([], dtype=np.int16) + self._write_lock = threading.Lock() + + # Connect to voice_bridge + try: + self.bridge_reader, self.bridge_writer = await asyncio.open_connection( + self.bridge_host, self.bridge_port + ) + # Read connected event + ftype, data = await self._bridge_recv() + if ftype == 1: + msg = json.loads(data.decode()) + logger.bind(tag=TAG).info(f"Bridge connected: {msg.get('protocol')}") + + # Send start + inject_on + self._bridge_send_cmd({"cmd": "start"}) + ftype, data = await self._bridge_recv() + if ftype == 1: + logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}") + + self._bridge_send_cmd({"cmd": "inject_on"}) + ftype, data = await self._bridge_recv() + if ftype == 1: + logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}") + + self._connected = True + logger.bind(tag=TAG).info("Voice bridge ready (inject mode)") + + # Start speaker receive loop + self._recv_task = asyncio.create_task(self._speaker_recv_loop()) + + except Exception as e: + logger.bind(tag=TAG).error(f"Bridge connection failed: {e}") + self._connected = False + + # Start normal audio processing thread (reads from asr_audio_queue) + conn.asr_priority_thread = threading.Thread( + target=self._audio_thread, args=(conn,), daemon=True + ) + conn.asr_priority_thread.start() + + def _audio_thread(self, conn): + """Read Opus frames from queue, decode, resample, inject to bridge.""" + import queue as queue_module + frame_count = 0 + while not conn.stop_event.is_set(): + try: + opus_data = conn.asr_audio_queue.get(timeout=1) + if not self._connected: + continue + + frame_count += 1 + if frame_count <= 3 or frame_count % 200 == 0: + logger.bind(tag=TAG).debug(f"Audio frame #{frame_count}") + + # Decode Opus → PCM 16kHz + pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SAMPLES) + samples = np.frombuffer(pcm, dtype=np.int16) + + # Resample 16kHz → 48kHz + upsampled = resample_poly(samples, UP[0], UP[1]).astype(np.int16) + + # Split into bridge frames and inject + self._inject_buf = np.concatenate([self._inject_buf, upsampled]) + while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES: + frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES] + self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:] + self._bridge_send_inject(frame.tobytes()) + + except queue_module.Empty: + continue + except Exception as e: + logger.bind(tag=TAG).error(f"Audio thread error: {e}") + + async def _speaker_recv_loop(self): + """Receive speaker PCM from bridge, resample, encode Opus, send to ESP32.""" + try: + while self._connected: + ftype, data = await self._bridge_recv() + if ftype == 0: + # Speaker audio + await self._handle_speaker(data) + elif ftype == 1: + msg = json.loads(data.decode()) + logger.bind(tag=TAG).debug(f"Bridge event: {msg}") + except asyncio.IncompleteReadError: + logger.bind(tag=TAG).warning("Bridge connection closed") + except Exception as e: + logger.bind(tag=TAG).error(f"Speaker recv error: {e}") + finally: + self._connected = False + + async def _handle_speaker(self, pcm_bytes): + """Process speaker frame and send to ESP32.""" + if not self.conn or not self.conn.websocket: + return + + samples = np.frombuffer(pcm_bytes, dtype=np.int16) + max_amp = int(np.max(np.abs(samples))) + + # Track silence for tts stop + if max_amp < 10: + if self._tts_started: + self._silence_count += 1 + # 50 frames of silence (~1 second) → send tts stop + if self._silence_count > 50: + try: + await send_tts_message(self.conn, "stop") + self.conn.client_is_speaking = False + self._tts_started = False + self._silence_count = 0 + logger.bind(tag=TAG).info("Sent tts stop to ESP32") + except Exception as e: + logger.bind(tag=TAG).error(f"Send tts stop error: {e}") + return + + # Reset silence counter on non-silent frame + self._silence_count = 0 + + # Send tts start before first audio + if not self._tts_started: + try: + await send_tts_message(self.conn, "start") + self._tts_started = True + self.conn.client_is_speaking = True + logger.bind(tag=TAG).info("Sent tts start to ESP32") + except Exception as e: + logger.bind(tag=TAG).error(f"Send tts start error: {e}") + return + + # Resample 48kHz → 16kHz + downsampled = resample_poly(samples, DOWN[0], DOWN[1]).astype(np.int16) + + # Accumulate and encode + self._speaker_buf = np.concatenate([self._speaker_buf, downsampled]) + while len(self._speaker_buf) >= ESP_FRAME_SAMPLES: + frame = self._speaker_buf[:ESP_FRAME_SAMPLES] + self._speaker_buf = self._speaker_buf[ESP_FRAME_SAMPLES:] + opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SAMPLES) + try: + await self.conn.websocket.send(opus_data) + except Exception as e: + logger.bind(tag=TAG).error(f"Send opus to ESP32 error: {e}") + return + + # Bridge TCP helpers + async def _bridge_recv(self): + header = await self.bridge_reader.readexactly(5) + length = struct.unpack(">I", header[:4])[0] + ftype = header[4] + data = await self.bridge_reader.readexactly(length) + return ftype, data + + def _bridge_send_cmd(self, cmd): + data = json.dumps(cmd).encode() + header = struct.pack(">IB", len(data), 1) + with self._write_lock: + self.bridge_writer.write(header + data) + + def _bridge_send_inject(self, pcm_bytes): + header = struct.pack(">IB", len(pcm_bytes), 3) + with self._write_lock: + self.bridge_writer.write(header + pcm_bytes) + + # ASR interface — never returns text, LLM/TTS never triggered + async def receive_audio(self, conn, audio, audio_have_voice): + """No-op: audio is handled by _audio_thread directly from queue.""" + pass + + async def speech_to_text(self, opus_data, session_id, audio_format="opus", artifacts=None): + """Never called in passthrough mode.""" + return "", None + + async def close(self): + self._connected = False + if self._recv_task: + self._recv_task.cancel() + if self.bridge_writer: + try: + self._bridge_send_cmd({"cmd": "inject_off"}) + self._bridge_send_cmd({"cmd": "stop"}) + self.bridge_writer.close() + except Exception: + pass diff --git a/backend/main/xiaozhi-server/core/providers/llm/antaf/antaf.py b/backend/main/xiaozhi-server/core/providers/llm/antaf/antaf.py index 438e95d..5424e30 100644 --- a/backend/main/xiaozhi-server/core/providers/llm/antaf/antaf.py +++ b/backend/main/xiaozhi-server/core/providers/llm/antaf/antaf.py @@ -17,6 +17,7 @@ class LLMProvider(LLMProviderBase): def __init__(self, config): self.bridge_url = config.get("bridge_url", "http://127.0.0.1:18900") self.timeout = config.get("timeout", 60) + self.should_idle = False # signal to send system idle after TTS logger.bind(tag=TAG).info( f"AntafLLM 初始化: bridge={self.bridge_url}, timeout={self.timeout}s" ) @@ -91,6 +92,7 @@ class LLMProvider(LLMProviderBase): # 追加简短回答提示,避免阿福回复过长导致TTS排队卡顿 query = query + "(请用2-3句话简短回答)" + self.should_idle = False logger.bind(tag=TAG).info(f"AntafLLM 请求: {query[:50]}...") try: @@ -129,10 +131,13 @@ class LLMProvider(LLMProviderBase): except requests.exceptions.ConnectionError: logger.bind(tag=TAG).error("无法连接蚂蚁阿福 Bridge,请检查手机和 Frida 状态") + self.should_idle = True yield "抱歉,蚂蚁阿福服务暂时不可用。" except requests.exceptions.Timeout: logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)") + self.should_idle = True yield "抱歉,回答超时了。" except Exception as e: logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}") + self.should_idle = True yield "抱歉,发生了错误。"