diff --git a/modules/antaf/antaf_llm.py b/modules/antaf/antaf_llm.py new file mode 100644 index 0000000..5424e30 --- /dev/null +++ b/modules/antaf/antaf_llm.py @@ -0,0 +1,143 @@ +import json +import requests +from config.logger import setup_logging +from core.providers.llm.base import LLMProviderBase + +TAG = __name__ +logger = setup_logging() + + +class LLMProvider(LLMProviderBase): + """ + 蚂蚁阿福 LLM Provider + 通过 Frida HTTP Bridge (port 18900) 对接蚂蚁阿福 App 的文字对话 API。 + Bridge 运行在手机上,通过 adb forward 或网络暴露 SSE 流式接口。 + """ + + def __init__(self, config): + self.bridge_url = config.get("bridge_url", "http://127.0.0.1:18900") + self.timeout = config.get("timeout", 60) + self.should_idle = False # signal to send system idle after TTS + logger.bind(tag=TAG).info( + f"AntafLLM 初始化: bridge={self.bridge_url}, timeout={self.timeout}s" + ) + + @staticmethod + def _is_thinking(text): + """检测蚂蚁阿福的内心思考/推理过程,这些不应该发给用户""" + thinking_patterns = [ + "用户问", "用户说", "用户的", "用户可能", "用户真正", + "我得", "我会", "我在想", "我决定", "我要", + "语气比较", "感觉他", "让他知道", "让他觉得", + "先安抚", "得先", "不想表现", + "整体语气", "这样能", "这样他", + "所以我", "还带了个", + ] + for p in thinking_patterns: + if p in text: + return True + return False + + @staticmethod + def _clean_text(text): + """清理阿福返回文本中的脏数据""" + # 去掉阿福内部状态文本 + junk = [ + "完成资料引用", "内容生成", "正在思考", "正在搜索", + ] + for j in junk: + text = text.replace(j, "") + return text.strip() + + @staticmethod + def _is_system_injected(content): + """检测是否为系统注入的消息(非用户真实输入)""" + if not content: + return True + markers = [ + "[系统提示]", "tool_call", "", "TOOL USE", + "系统提示", "工具调用", "function_call", + "handle_exit_intent", "你有以下工具", "You have access", + ] + for m in markers: + if m in content: + return True + # 超过200字的 user 消息大概率是系统注入的 + if len(content) > 200: + return True + return False + + def response(self, session_id, dialogue, **kwargs): + # 从 dialogue 中提取真正的用户消息(跳过系统注入的 user 消息) + query = "" + for msg in reversed(dialogue): + if msg.get("role") == "user": + content = msg.get("content", "") + if not self._is_system_injected(content): + # ASR 结果可能是 JSON: {"content":"...", "language":"zh", "emotion":"..."} + try: + parsed = json.loads(content) + if isinstance(parsed, dict) and "content" in parsed: + query = parsed["content"] + else: + query = content + except (json.JSONDecodeError, TypeError): + query = content + break + + if not query: + logger.bind(tag=TAG).warning("对话中没有用户消息") + yield "抱歉,我没有收到您的问题。" + return + + # 追加简短回答提示,避免阿福回复过长导致TTS排队卡顿 + query = query + "(请用2-3句话简短回答)" + self.should_idle = False + logger.bind(tag=TAG).info(f"AntafLLM 请求: {query[:50]}...") + + try: + url = f"{self.bridge_url}/chat" + resp = requests.get( + url, + params={"q": query}, + stream=True, + timeout=self.timeout, + ) + resp.encoding = "utf-8" + + seen_texts = set() + for line in resp.iter_lines(decode_unicode=True): + if not line: + continue + if line.startswith("data: "): + data = line[6:] + if data == "[DONE]": + break + if not data or len(data.strip()) == 0: + continue + # 去重:跳过完全相同的文本块 + if data in seen_texts: + continue + seen_texts.add(data) + # 过滤思考过程 + if self._is_thinking(data): + logger.bind(tag=TAG).debug(f"过滤思考内容: {data[:50]}...") + continue + # 清理脏数据 + data = self._clean_text(data) + if not data: + continue + yield data + + except requests.exceptions.ConnectionError: + logger.bind(tag=TAG).error("无法连接蚂蚁阿福 Bridge,请检查手机和 Frida 状态") + self.should_idle = True + yield "抱歉,蚂蚁阿福服务暂时不可用。" + except requests.exceptions.Timeout: + logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)") + self.should_idle = True + yield "抱歉,回答超时了。" + except Exception as e: + logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}") + self.should_idle = True + yield "抱歉,发生了错误。" diff --git a/modules/antaf/antaf_passthrough.py b/modules/antaf/antaf_passthrough.py new file mode 100644 index 0000000..547a807 --- /dev/null +++ b/modules/antaf/antaf_passthrough.py @@ -0,0 +1,251 @@ +""" +Antaf Voice Passthrough ASR Provider + +Replaces ASR→LLM→TTS pipeline with direct audio forwarding to Antaf voice_bridge. +ESP32 audio → decode Opus → resample 16kHz→48kHz → inject to voice_bridge (type=3) +voice_bridge speaker (type=0) → resample 48kHz→16kHz → encode Opus → send to ESP32 + +Runs within xiaozhi-server, keeping all protocol handling (hello, OTA, wake word) intact. +""" + +import json +import struct +import asyncio +import threading +import numpy as np +import opuslib_next +from scipy.signal import resample_poly +from math import gcd +from config.logger import setup_logging +from core.providers.asr.base import ASRProviderBase +from core.handle.sendAudioHandle import send_tts_message + +TAG = __name__ +logger = setup_logging() + +# Audio parameters +ESP_SR = 16000 +ESP_FRAME_SAMPLES = 960 # 60ms at 16kHz +BRIDGE_SR = 48000 +BRIDGE_FRAME_SAMPLES = 480 # 960 bytes / 2 = 480 samples + +# Resample ratios +UP = (BRIDGE_SR // gcd(BRIDGE_SR, ESP_SR), ESP_SR // gcd(BRIDGE_SR, ESP_SR)) # (3,1) +DOWN = (ESP_SR // gcd(ESP_SR, BRIDGE_SR), BRIDGE_SR // gcd(ESP_SR, BRIDGE_SR)) # (1,3) + + +class ASRProvider(ASRProviderBase): + def __init__(self, config): + super().__init__() + self.bridge_host = config.get("bridge_host", "127.0.0.1") + self.bridge_port = int(config.get("bridge_port", 18901)) + self.interface_type = "NON_STREAM" + self.conn = None + self.bridge_reader = None + self.bridge_writer = None + self.opus_decoder = None + self.opus_encoder = None + self._inject_buf = np.array([], dtype=np.int16) + self._speaker_buf = np.array([], dtype=np.int16) + self._tts_started = False + self._recv_task = None + self._connected = False + logger.bind(tag=TAG).info( + f"AntafPassthrough 初始化: bridge={self.bridge_host}:{self.bridge_port}" + ) + + async def open_audio_channels(self, conn): + """Override: connect to bridge, start passthrough instead of normal ASR.""" + # Clean up previous connection if any + await self.close() + + self.conn = conn + self.opus_decoder = opuslib_next.Decoder(ESP_SR, 1) + self.opus_encoder = opuslib_next.Encoder(ESP_SR, 1, opuslib_next.APPLICATION_AUDIO) + self._tts_started = False + self._silence_count = 0 + self._inject_buf = np.array([], dtype=np.int16) + self._speaker_buf = np.array([], dtype=np.int16) + self._write_lock = threading.Lock() + + # Connect to voice_bridge + try: + self.bridge_reader, self.bridge_writer = await asyncio.open_connection( + self.bridge_host, self.bridge_port + ) + # Read connected event + ftype, data = await self._bridge_recv() + if ftype == 1: + msg = json.loads(data.decode()) + logger.bind(tag=TAG).info(f"Bridge connected: {msg.get('protocol')}") + + # Send start + inject_on + self._bridge_send_cmd({"cmd": "start"}) + ftype, data = await self._bridge_recv() + if ftype == 1: + logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}") + + self._bridge_send_cmd({"cmd": "inject_on"}) + ftype, data = await self._bridge_recv() + if ftype == 1: + logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}") + + self._connected = True + logger.bind(tag=TAG).info("Voice bridge ready (inject mode)") + + # Start speaker receive loop + self._recv_task = asyncio.create_task(self._speaker_recv_loop()) + + except Exception as e: + logger.bind(tag=TAG).error(f"Bridge connection failed: {e}") + self._connected = False + + # Start normal audio processing thread (reads from asr_audio_queue) + conn.asr_priority_thread = threading.Thread( + target=self._audio_thread, args=(conn,), daemon=True + ) + conn.asr_priority_thread.start() + + def _audio_thread(self, conn): + """Read Opus frames from queue, decode, resample, inject to bridge.""" + import queue as queue_module + frame_count = 0 + while not conn.stop_event.is_set(): + try: + opus_data = conn.asr_audio_queue.get(timeout=1) + if not self._connected: + continue + + frame_count += 1 + if frame_count <= 3 or frame_count % 200 == 0: + logger.bind(tag=TAG).debug(f"Audio frame #{frame_count}") + + # Decode Opus → PCM 16kHz + pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SAMPLES) + samples = np.frombuffer(pcm, dtype=np.int16) + + # Resample 16kHz → 48kHz + upsampled = resample_poly(samples, UP[0], UP[1]).astype(np.int16) + + # Split into bridge frames and inject + self._inject_buf = np.concatenate([self._inject_buf, upsampled]) + while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES: + frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES] + self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:] + self._bridge_send_inject(frame.tobytes()) + + except queue_module.Empty: + continue + except Exception as e: + logger.bind(tag=TAG).error(f"Audio thread error: {e}") + + async def _speaker_recv_loop(self): + """Receive speaker PCM from bridge, resample, encode Opus, send to ESP32.""" + try: + while self._connected: + ftype, data = await self._bridge_recv() + if ftype == 0: + # Speaker audio + await self._handle_speaker(data) + elif ftype == 1: + msg = json.loads(data.decode()) + logger.bind(tag=TAG).debug(f"Bridge event: {msg}") + except asyncio.IncompleteReadError: + logger.bind(tag=TAG).warning("Bridge connection closed") + except Exception as e: + logger.bind(tag=TAG).error(f"Speaker recv error: {e}") + finally: + self._connected = False + + async def _handle_speaker(self, pcm_bytes): + """Process speaker frame and send to ESP32.""" + if not self.conn or not self.conn.websocket: + return + + samples = np.frombuffer(pcm_bytes, dtype=np.int16) + max_amp = int(np.max(np.abs(samples))) + + # Track silence for tts stop + if max_amp < 10: + if self._tts_started: + self._silence_count += 1 + # 50 frames of silence (~1 second) → send tts stop + if self._silence_count > 50: + try: + await send_tts_message(self.conn, "stop") + self.conn.client_is_speaking = False + self._tts_started = False + self._silence_count = 0 + logger.bind(tag=TAG).info("Sent tts stop to ESP32") + except Exception as e: + logger.bind(tag=TAG).error(f"Send tts stop error: {e}") + return + + # Reset silence counter on non-silent frame + self._silence_count = 0 + + # Send tts start before first audio + if not self._tts_started: + try: + await send_tts_message(self.conn, "start") + self._tts_started = True + self.conn.client_is_speaking = True + logger.bind(tag=TAG).info("Sent tts start to ESP32") + except Exception as e: + logger.bind(tag=TAG).error(f"Send tts start error: {e}") + return + + # Resample 48kHz → 16kHz + downsampled = resample_poly(samples, DOWN[0], DOWN[1]).astype(np.int16) + + # Accumulate and encode + self._speaker_buf = np.concatenate([self._speaker_buf, downsampled]) + while len(self._speaker_buf) >= ESP_FRAME_SAMPLES: + frame = self._speaker_buf[:ESP_FRAME_SAMPLES] + self._speaker_buf = self._speaker_buf[ESP_FRAME_SAMPLES:] + opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SAMPLES) + try: + await self.conn.websocket.send(opus_data) + except Exception as e: + logger.bind(tag=TAG).error(f"Send opus to ESP32 error: {e}") + return + + # Bridge TCP helpers + async def _bridge_recv(self): + header = await self.bridge_reader.readexactly(5) + length = struct.unpack(">I", header[:4])[0] + ftype = header[4] + data = await self.bridge_reader.readexactly(length) + return ftype, data + + def _bridge_send_cmd(self, cmd): + data = json.dumps(cmd).encode() + header = struct.pack(">IB", len(data), 1) + with self._write_lock: + self.bridge_writer.write(header + data) + + def _bridge_send_inject(self, pcm_bytes): + header = struct.pack(">IB", len(pcm_bytes), 3) + with self._write_lock: + self.bridge_writer.write(header + pcm_bytes) + + # ASR interface — never returns text, LLM/TTS never triggered + async def receive_audio(self, conn, audio, audio_have_voice): + """No-op: audio is handled by _audio_thread directly from queue.""" + pass + + async def speech_to_text(self, opus_data, session_id, audio_format="opus", artifacts=None): + """Never called in passthrough mode.""" + return "", None + + async def close(self): + self._connected = False + if self._recv_task: + self._recv_task.cancel() + if self.bridge_writer: + try: + self._bridge_send_cmd({"cmd": "inject_off"}) + self._bridge_send_cmd({"cmd": "stop"}) + self.bridge_writer.close() + except Exception: + pass diff --git a/modules/antaf/relay.py b/modules/antaf/relay.py new file mode 100644 index 0000000..cb8009e --- /dev/null +++ b/modules/antaf/relay.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +""" +ESP32 ↔ Antaf Voice Relay +Bridges ESP32 (WebSocket/Opus) with Antaf voice_bridge (TCP/PCM). + +ESP32 → Opus decode → resample 16kHz→48kHz → voice_bridge inject (type=3) +ESP32 ← Opus encode ← resample 48kHz→16kHz ← voice_bridge speaker (type=0) + +Usage: python relay.py [--ws-port 8010] [--bridge-host 127.0.0.1] [--bridge-port 18901] +""" + +import asyncio +import json +import struct +import argparse +import logging +import numpy as np +from scipy.signal import resample_poly +from math import gcd + +logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +log = logging.getLogger("relay") + +try: + import opuslib_next as opuslib +except ImportError: + import opuslib + +import websockets + +# Audio parameters +ESP_SAMPLE_RATE = 16000 # ESP32 Opus sample rate +ESP_FRAME_MS = 60 # ESP32 frame duration +ESP_FRAME_SIZE = ESP_SAMPLE_RATE * ESP_FRAME_MS // 1000 # 960 samples + +BRIDGE_SAMPLE_RATE = 48000 # voice_bridge micIn sample rate +BRIDGE_FRAME_BYTES = 960 # 480 samples * 2 bytes +BRIDGE_FRAME_SAMPLES = 480 + +# Resampling ratios +UP_GCD = gcd(BRIDGE_SAMPLE_RATE, ESP_SAMPLE_RATE) # 16000 → 48000 +UP_RATIO = (BRIDGE_SAMPLE_RATE // UP_GCD, ESP_SAMPLE_RATE // UP_GCD) # (3, 1) +DOWN_GCD = gcd(ESP_SAMPLE_RATE, BRIDGE_SAMPLE_RATE) # 48000 → 16000 +DOWN_RATIO = (ESP_SAMPLE_RATE // DOWN_GCD, BRIDGE_SAMPLE_RATE // DOWN_GCD) # (1, 3) + + +class BridgeClient: + """TCP client for voice_bridge_v7.""" + + def __init__(self, host, port): + self.host = host + self.port = port + self.reader = None + self.writer = None + self.on_speaker_frame = None # callback(pcm_bytes) + self._recv_task = None + + async def connect(self): + self.reader, self.writer = await asyncio.open_connection(self.host, self.port) + log.info(f"Connected to voice_bridge {self.host}:{self.port}") + + # Read connected event + ftype, data = await self._recv_frame() + if ftype == 1: + msg = json.loads(data.decode()) + log.info(f"Bridge: {msg.get('protocol')}") + + async def _recv_frame(self): + header = await self.reader.readexactly(5) + length = struct.unpack(">I", header[:4])[0] + ftype = header[4] + data = await self.reader.readexactly(length) + return ftype, data + + def _send_frame(self, ftype, data): + header = struct.pack(">IB", len(data), ftype) + self.writer.write(header + data) + # Note: no await drain() here — voice frames are time-sensitive, + # TCP buffer handles backpressure + + def send_cmd(self, cmd): + self._send_frame(1, json.dumps(cmd).encode()) + + def send_inject(self, pcm_bytes): + self._send_frame(3, pcm_bytes) + + async def start_recv_loop(self): + """Background task: receive frames from bridge.""" + try: + while True: + ftype, data = await self._recv_frame() + if ftype == 0 and self.on_speaker_frame: + # Speaker audio + await self.on_speaker_frame(data) + elif ftype == 1: + msg = json.loads(data.decode()) + log.info(f"Bridge event: {msg}") + except asyncio.IncompleteReadError: + log.warning("Bridge connection closed") + except Exception as e: + log.error(f"Bridge recv error: {e}") + + async def setup_voice(self): + """Start capture and enable inject. Voice chat must already be open.""" + self.send_cmd({"cmd": "start"}) + ftype, data = await self._recv_frame() + if ftype == 1: + msg = json.loads(data.decode()) + log.info(f"Bridge: {msg}") + self.send_cmd({"cmd": "inject_on"}) + ftype, data = await self._recv_frame() + if ftype == 1: + msg = json.loads(data.decode()) + log.info(f"Bridge: {msg}") + log.info("Voice bridge ready (inject mode)") + + async def close(self): + try: + self.send_cmd({"cmd": "inject_off"}) + self.send_cmd({"cmd": "stop"}) + except Exception: + pass + if self.writer: + self.writer.close() + + +class Relay: + """Main relay: ESP32 WebSocket ↔ Antaf voice_bridge TCP.""" + + def __init__(self, ws_port, bridge_host, bridge_port): + self.ws_port = ws_port + self.bridge_host = bridge_host + self.bridge_port = bridge_port + self.bridge = None + self.ws = None + self.opus_decoder = None + self.opus_encoder = None + # Buffer for resampled PCM to split into bridge frames + self._inject_buf = np.array([], dtype=np.int16) + # Buffer for speaker PCM to accumulate before encoding + self._speaker_buf = np.array([], dtype=np.int16) + self._audio_in_count = 0 + self._audio_out_count = 0 + self._tts_started = False # track if we sent tts start to ESP32 + + async def handle_esp32(self, websocket): + """Handle one ESP32 WebSocket connection.""" + log.info(f"ESP32 connected from {websocket.remote_address}") + self.ws = websocket + + # Init Opus codec + self.opus_decoder = opuslib.Decoder(ESP_SAMPLE_RATE, 1) + self.opus_encoder = opuslib.Encoder(ESP_SAMPLE_RATE, 1, opuslib.APPLICATION_AUDIO) + + # Connect to voice bridge and setup voice chat first + self.bridge = BridgeClient(self.bridge_host, self.bridge_port) + await self.bridge.connect() + await self.bridge.setup_voice() + + # Now start receiving speaker audio + self.bridge.on_speaker_frame = self._on_speaker_frame + recv_task = asyncio.create_task(self.bridge.start_recv_loop()) + + try: + async for message in websocket: + if isinstance(message, str): + # Text message from ESP32 (hello, listen, etc.) + await self._handle_text(message) + elif isinstance(message, bytes): + # Opus audio from ESP32 + await self._handle_audio(message) + except websockets.exceptions.ConnectionClosed: + log.info("ESP32 disconnected") + finally: + recv_task.cancel() + await self.bridge.close() + self.ws = None + log.info("Session ended") + + async def _handle_text(self, message): + """Handle text messages from ESP32.""" + try: + msg = json.loads(message) + msg_type = msg.get("type") + + if msg_type == "hello": + # Respond with proper hello — must match xiaozhi protocol + resp = { + "type": "hello", + "version": 1, + "transport": "websocket", + "session_id": "relay-session", + "audio_params": { + "format": "opus", + "sample_rate": ESP_SAMPLE_RATE, + "channels": 1, + "frame_duration": ESP_FRAME_MS, + }, + } + await self.ws.send(json.dumps(resp)) + log.info(f"ESP32 hello: {msg.get('audio_params')}") + + elif msg_type == "listen": + state = msg.get("state") + log.info(f"ESP32 listen: {state}") + if state == "detect": + text = msg.get("text", "") + log.info(f"Wake word: {text}") + # Don't send tts start — let ESP32 continue recording + + elif msg_type == "abort": + log.info("ESP32 abort") + + except json.JSONDecodeError: + log.warning(f"Invalid JSON from ESP32: {message[:100]}") + + async def _handle_audio(self, opus_data): + """Decode Opus from ESP32, resample, inject into voice_bridge.""" + try: + self._audio_in_count += 1 + if self._audio_in_count <= 3 or self._audio_in_count % 100 == 0: + log.info(f"ESP32 audio frame #{self._audio_in_count}, size={len(opus_data)}") + + # Decode Opus → PCM 16kHz mono + pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SIZE) + samples = np.frombuffer(pcm, dtype=np.int16) + + # Resample 16kHz → 48kHz + upsampled = resample_poly(samples, UP_RATIO[0], UP_RATIO[1]).astype(np.int16) + + # Append to inject buffer and send in bridge frame sizes + self._inject_buf = np.concatenate([self._inject_buf, upsampled]) + while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES: + frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES] + self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:] + self.bridge.send_inject(frame.tobytes()) + + except Exception as e: + log.error(f"Audio inject error: {e}") + + async def _on_speaker_frame(self, pcm_bytes): + """Receive speaker PCM from bridge, resample, encode Opus, send to ESP32.""" + if not self.ws or getattr(self.ws, 'closed', False): + return + try: + self._audio_out_count += 1 + + samples = np.frombuffer(pcm_bytes, dtype=np.int16) + max_amp = int(np.max(np.abs(samples))) + + if self._audio_out_count <= 3 or self._audio_out_count % 100 == 0: + log.info(f"Speaker frame #{self._audio_out_count}, size={len(pcm_bytes)}, max_amp={max_amp}") + + # Only send non-silent frames to ESP32 + if max_amp < 10: + # If we were playing and now silent for a while, send tts stop + if self._tts_started and self._audio_out_count % 50 == 0: + # Check later — don't stop immediately, silence gaps are normal + pass + return + + # Send tts start before first audio frame + if not self._tts_started: + await self.ws.send(json.dumps({ + "type": "tts", "state": "start", + "session_id": "relay-session" + })) + self._tts_started = True + log.info("Sent tts start to ESP32") + + # Resample 48kHz → 16kHz + downsampled = resample_poly(samples, DOWN_RATIO[0], DOWN_RATIO[1]).astype(np.int16) + + # Accumulate into speaker buffer, encode when we have enough + self._speaker_buf = np.concatenate([self._speaker_buf, downsampled]) + while len(self._speaker_buf) >= ESP_FRAME_SIZE: + frame = self._speaker_buf[:ESP_FRAME_SIZE] + self._speaker_buf = self._speaker_buf[ESP_FRAME_SIZE:] + # Encode PCM → Opus + opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SIZE) + await self.ws.send(opus_data) + + except Exception as e: + log.error(f"Speaker send error: {e}") + + async def run(self): + log.info(f"Relay starting on ws://0.0.0.0:{self.ws_port}/xiaozhi/v1/") + async with websockets.serve( + self.handle_esp32, "0.0.0.0", self.ws_port, + ping_interval=30, ping_timeout=10, + ): + await asyncio.Future() # run forever + + +def main(): + parser = argparse.ArgumentParser(description="ESP32-Antaf Voice Relay") + parser.add_argument("--ws-port", type=int, default=8010, help="WebSocket port for ESP32") + parser.add_argument("--bridge-host", default="127.0.0.1", help="voice_bridge host") + parser.add_argument("--bridge-port", type=int, default=18901, help="voice_bridge port") + args = parser.parse_args() + + relay = Relay(args.ws_port, args.bridge_host, args.bridge_port) + asyncio.run(relay.run()) + + +if __name__ == "__main__": + main() diff --git a/modules/antaf/test_inject.py b/modules/antaf/test_inject.py new file mode 100644 index 0000000..c56a031 --- /dev/null +++ b/modules/antaf/test_inject.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +"""Test voice_bridge_v7 audio injection. +Connect to voice_bridge, open voice chat, enable inject mode, +send silence frames, and print any speaker output received. + +Usage: python test_inject.py [host] [port] +""" +import socket +import struct +import json +import time +import sys +import threading + +HOST = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1" +PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 18901 + +FRAME_SIZE = 960 # 960 bytes per frame (480 samples * 16bit) + + +def send_cmd(sock, cmd): + data = json.dumps(cmd).encode("utf-8") + header = struct.pack(">IB", len(data), 1) # type=1 text + sock.sendall(header + data) + + +def send_inject(sock, pcm_frame): + header = struct.pack(">IB", len(pcm_frame), 3) # type=3 inject + sock.sendall(header + pcm_frame) + + +def recv_exact(sock, n): + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf + + +def recv_frame(sock): + header = recv_exact(sock, 5) + if header is None: + return None, None + length = struct.unpack(">I", header[:4])[0] + ftype = header[4] + if length > 1048576: + return None, None + data = recv_exact(sock, length) + if data is None: + return None, None + return ftype, data + + +def receiver(sock): + """Background thread to print received frames.""" + spk_count = 0 + while True: + try: + ftype, data = recv_frame(sock) + if ftype is None: + print("[RECV] Connection closed") + break + if ftype == 1: # text/json + msg = json.loads(data.decode("utf-8")) + print(f"[RECV] {msg}") + elif ftype == 0: # speaker audio + spk_count += 1 + # Check if audio is non-silent + samples = struct.unpack(f"<{len(data)//2}h", data) + max_amp = max(abs(s) for s in samples) + if spk_count <= 5 or spk_count % 100 == 0 or max_amp > 500: + print(f"[SPK] frame={spk_count} size={len(data)} max_amp={max_amp}") + elif ftype == 2: # mic audio + pass # ignore mic echo + except Exception as e: + print(f"[RECV] Error: {e}") + break + + +def main(): + print(f"Connecting to {HOST}:{PORT}...") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((HOST, PORT)) + print("Connected") + + # Start receiver thread + t = threading.Thread(target=receiver, args=(sock,), daemon=True) + t.start() + + time.sleep(1) + + # Open voice chat + print("Opening voice chat...") + send_cmd(sock, {"cmd": "open_voice"}) + time.sleep(3) + + # Start capture + print("Starting capture...") + send_cmd(sock, {"cmd": "start"}) + time.sleep(1) + + # Enable inject mode + print("Enabling inject mode...") + send_cmd(sock, {"cmd": "inject_on"}) + time.sleep(0.5) + + # Send silence frames for 3 seconds (48kHz, 960 bytes/frame = 20ms) + # 3 seconds = 150 frames + print("Sending 150 silence frames (3 seconds)...") + silence = b"\x00" * FRAME_SIZE + for i in range(150): + send_inject(sock, silence) + time.sleep(0.02) # 20ms per frame + + print("Done sending. Waiting for speaker output...") + time.sleep(10) + + # Stop + send_cmd(sock, {"cmd": "inject_off"}) + send_cmd(sock, {"cmd": "stop"}) + time.sleep(1) + + print("Test complete") + sock.close() + + +if __name__ == "__main__": + main() diff --git a/modules/antaf/voice_bridge_v7.js b/modules/antaf/voice_bridge_v7.js new file mode 100644 index 0000000..5cf95a5 --- /dev/null +++ b/modules/antaf/voice_bridge_v7.js @@ -0,0 +1,209 @@ +// voice_bridge_v7.js — Voice Bridge with Audio Injection +// Hook point: libantaudio.so MFAntAudio3AV2Filter::process(micIn, spkRef, out, size, &result) +// TCP :18901 +// Frame: 4-byte len + 1-byte type + payload +// type 0: speaker/AI audio (spkRef, downstream to client) +// type 1: text/JSON command +// type 2: mic audio (micIn, downstream to client) +// type 3: inject audio (upstream from client, replaces micIn) + +var voiceActive = false; +var clientOS = null; +var capturedSpk = 0, capturedMic = 0, spkBytes = 0, micBytes = 0; +var injectMode = false; // true = replace mic with injected audio +var injectQueue = []; // queue of PCM frames to inject + +function wf(os, type, jArr) { + try { + var len = jArr.length; + var h = Java.array("byte", [(len>>24)&0xFF,(len>>16)&0xFF,(len>>8)&0xFF,len&0xFF, type]); + os.write(h); os.write(jArr); os.flush(); + } catch(e) {} +} +function wt(os, text) { + wf(os, 1, Java.use("java.lang.String").$new(text).getBytes("UTF-8")); +} + +// === Hook libantaudio.so === +var hooked = false; +function tryHook() { + if (hooked) return; + var m = Process.findModuleByName("libantaudio.so"); + if (!m) return; + var addr = m.findExportByName("_ZN8antaudio20MFAntAudio3AV2Filter7processEPhS1_S1_iRi"); + if (!addr) return; + hooked = true; + + Interceptor.attach(addr, { + onEnter: function(args) { + if (!voiceActive || !clientOS) return; + var size = args[4].toInt32(); + if (size <= 0) return; + + try { + // If inject mode, replace micIn with queued or silence + if (injectMode) { + if (injectQueue.length > 0) { + var frame = injectQueue.shift(); + // Only write if frame size matches expected size + if (frame.byteLength === size) { + args[1].writeByteArray(frame); + } else if (frame.byteLength > 0) { + // Size mismatch — pad or truncate + var buf = new ArrayBuffer(size); + var dst = new Uint8Array(buf); + var src = new Uint8Array(frame); + var copyLen = Math.min(size, frame.byteLength); + for (var k = 0; k < copyLen; k++) dst[k] = src[k]; + args[1].writeByteArray(buf); + } + } else { + // No data queued — inject silence to avoid mic leak + var silence = new ArrayBuffer(size); + args[1].writeByteArray(silence); + } + } + + // Always capture speaker/AI output (type 0) + var spkPcm = args[2].readByteArray(size); + var spkArr = Java.array("byte", Array.from(new Uint8Array(spkPcm))); + wf(clientOS, 0, spkArr); + capturedSpk++; spkBytes += size; + + // Capture mic (type 2) only when not injecting + if (!injectMode) { + var micPcm = args[1].readByteArray(size); + var micArr = Java.array("byte", Array.from(new Uint8Array(micPcm))); + wf(clientOS, 2, micArr); + } + capturedMic++; micBytes += size; + + if (capturedMic <= 3 || capturedMic % 500 === 0) + console.log("[VOICE] mic=" + capturedMic + " spk=" + capturedSpk + " inject=" + injectQueue.length); + } catch(e) {} + } + }); + console.log("[VOICE] 3AV2Filter.process hooked @ " + addr); +} + +[0, 1000, 3000, 5000, 10000, 15000, 20000].forEach(function(ms) { setTimeout(tryHook, ms); }); +try { + new ApiResolver("module").enumerateMatches("exports:linker*!*dlopen*").forEach(function(d) { + Interceptor.attach(d.address, { onLeave: function() { setTimeout(tryHook, 500); } }); + }); +} catch(e) {} + +// === TCP Server === +Java.perform(function() { + var SS = Java.use("java.net.ServerSocket"); + var JS = Java.use("java.lang.String"); + var server = SS.$new(18901); + console.log("[VOICE] Listening :18901"); + + function openVoice(os) { + Java.scheduleOnMainThread(function() { + try { + Java.choose("com.antgroup.aijk.android.ijklauncher.biz.activity.IJKActivity", { + onMatch: function(a) { + var fm = a.getSupportFragmentManager(); + var f = Java.use("com.antgroup.aijk.android.ijkchat.biz.voicechat.IjkVoiceChatFragment").$new(); + f.show(fm, "v"); + console.log("[VOICE] Opened"); + }, onComplete: function() {} + }); + setTimeout(function() { wt(os, JSON.stringify({event:"voice_opened"})); }, 2000); + } catch(e) { wt(os, JSON.stringify({event:"error",msg:""+e})); } + }); + } + + function closeVoice(os) { + Java.scheduleOnMainThread(function() { + try { + Java.choose("com.antgroup.aijk.android.ijkchat.biz.voicechat.IjkVoiceChatFragment", { + onMatch: function(f) { f.dismiss(); console.log("[VOICE] Closed"); }, + onComplete: function() {} + }); + setTimeout(function() { wt(os, JSON.stringify({event:"voice_closed"})); }, 1000); + } catch(e) { wt(os, JSON.stringify({event:"error",msg:""+e})); } + }); + } + + var Srv = Java.registerClass({ + name: "com.antaf.voice.S7", + implements: [Java.use("java.lang.Runnable")], + methods: { + run: function() { + while (true) { + try { + console.log("[VOICE] Waiting..."); + var c = server.accept(); + var is = c.getInputStream(); + var os = c.getOutputStream(); + clientOS = os; + console.log("[VOICE] Connected"); + wt(os, JSON.stringify({ + event:"connected", protocol:"antaf-voice-v8", + commands:["open_voice","close_voice","start","stop","status","inject_on","inject_off"], + audio:"pcm-16bit-960b-frames", + frameTypes:{0:"spk_ai",1:"text",2:"mic",3:"inject"} + })); + + while (true) { + var hb = []; + for (var i=0;i<5;i++) { var b=is.read(); if(b<0) throw "EOF"; hb.push(b); } + var fl=(hb[0]<<24)|(hb[1]<<16)|(hb[2]<<8)|hb[3], ft=hb[4]; + if (fl>1048576) break; + var pb = []; + for (var i=0;i>24)&0xFF,(len>>16)&0xFF,(len>>8)&0xFF,len&0xFF, type]); + os.write(h); os.write(jArr); os.flush(); + } catch(e) {} +} +function wt(os, text) { + wf(os, 1, Java.use("java.lang.String").$new(text).getBytes("UTF-8")); +} + +// === Hook libantaudio.so (should already be loaded) === +var hooked = false; +function tryHook() { + if (hooked) return; + var m = Process.findModuleByName("libantaudio.so"); + if (!m) return; + var addr = m.findExportByName("_ZN8antaudio20MFAntAudio3AV2Filter7processEPhS1_S1_iRi"); + if (!addr) return; + hooked = true; + + Interceptor.attach(addr, { + onEnter: function(args) { + if (!voiceActive || !clientOS) return; + var size = args[4].toInt32(); + if (size <= 0) return; + + try { + if (injectMode) { + if (injectQueue.length > 0) { + var frame = injectQueue.shift(); + if (frame.byteLength === size) { + args[1].writeByteArray(frame); + } else { + var buf = new ArrayBuffer(size); + var dst = new Uint8Array(buf); + var src = new Uint8Array(frame); + var copyLen = Math.min(size, frame.byteLength); + for (var k = 0; k < copyLen; k++) dst[k] = src[k]; + args[1].writeByteArray(buf); + } + } else { + var silence = new ArrayBuffer(size); + args[1].writeByteArray(silence); + } + } + + // Always capture speaker/AI output (type 0) + var spkPcm = args[2].readByteArray(size); + var spkArr = Java.array("byte", Array.from(new Uint8Array(spkPcm))); + wf(clientOS, 0, spkArr); + capturedSpk++; spkBytes += size; + + if (!injectMode) { + var micPcm = args[1].readByteArray(size); + var micArr = Java.array("byte", Array.from(new Uint8Array(micPcm))); + wf(clientOS, 2, micArr); + } + capturedMic++; micBytes += size; + + if (capturedMic <= 3 || capturedMic % 500 === 0) + console.log("[VOICE] mic=" + capturedMic + " spk=" + capturedSpk + " inject=" + injectQueue.length); + } catch(e) {} + } + }); + console.log("[VOICE] process hooked @ " + addr); +} + +// Hook immediately — lib should already be loaded since voice chat is open +tryHook(); +if (!hooked) { + // Retry a few times in case of timing + [500, 1000, 2000, 5000].forEach(function(ms) { setTimeout(tryHook, ms); }); +} + +// === TCP Server === +Java.perform(function() { + var SS = Java.use("java.net.ServerSocket"); + var JS = Java.use("java.lang.String"); + var server = SS.$new(18901); + console.log("[VOICE] Listening :18901"); + + var Srv = Java.registerClass({ + name: "com.antaf.voice.S8", + implements: [Java.use("java.lang.Runnable")], + methods: { + run: function() { + while (true) { + try { + console.log("[VOICE] Waiting for client..."); + var c = server.accept(); + var is = c.getInputStream(); + var os = c.getOutputStream(); + clientOS = os; + console.log("[VOICE] Client connected"); + wt(os, JSON.stringify({ + event:"connected", protocol:"antaf-voice-v8", + hooked: hooked, + commands:["start","stop","status","inject_on","inject_off"], + audio:"pcm-16bit-960b-frames", + frameTypes:{0:"spk_ai",1:"text",2:"mic",3:"inject"} + })); + + while (true) { + var hb = []; + for (var i=0;i<5;i++) { var b=is.read(); if(b<0) throw "EOF"; hb.push(b); } + var fl=(hb[0]<<24)|(hb[1]<<16)|(hb[2]<<8)|hb[3], ft=hb[4]; + if (fl>1048576) break; + var pb = []; + for (var i=0;i 0: + yield data + + except requests.exceptions.ConnectionError: + logger.bind(tag=TAG).error("无法连接蚂��阿福 Bridge,请检查手机和 Frida 状态") + yield "抱歉,蚂蚁阿福服务暂时不可用。" + except requests.exceptions.Timeout: + logger.bind(tag=TAG).error(f"蚂蚁阿福 Bridge 超时 ({self.timeout}s)") + yield "抱歉,回答超时了。" + except Exception as e: + logger.bind(tag=TAG).error(f"AntafLLM 异常: {e}") + yield "抱歉,发生了错误。" +``` + +### 第2步:修改服务端配置 + +编辑 `backend/main/xiaozhi-server/data/.config.yaml`: + +```yaml +selected_module: + LLM: antaf # 改为���蚁阿福 + +LLM: + antaf: + type: antaf + bridge_url: http://<手机IP>:18900 # 手机的 HTTP Bridge 地址 + timeout: 60 # SSE 流超时时间 +``` + +也可以保留原来的 Qwen3 配置,方便切换: + +```yaml +LLM: + antaf: + type: antaf + bridge_url: http://<手机IP>:18900 + timeout: 60 + Qwen3: + type: openai + model_name: Qwen3-32B + url: http://127.0.0.1:30000/v1 + api_key: EMPTY +``` + +### 第3步:网络打通 + +手机的 Frida Bridge 端口需要让 PlugAI 服���器能访问到。有两种方式: + +#### 方式1:手机直连局域网(推荐) + +如果手机和 PlugAI 服务器在同一网络(或手机有公网可达 IP): +```bash +# 手机上启动 bridge 后,服务端直接访问 +# bridge_url: http://<手机内网IP>:18900 +curl http://<手机IP>:18900/chat?q=hello +``` + +#### 方式2:adb forward + SSH 隧道 + +手机通过 USB 连接一台中间机器,再通过 SSH 隧道暴露��� +```bash +# 中间机器上 +adb forward tcp:18900 tcp:18900 + +# PlugAI 上建 SSH 隧道 +ssh -L 18900:127.0.0.1:18900 user@中间机器IP +# bridge_url: http://127.0.0.1:18900 +``` + +### 第4步:启动与测试 + +```bash +# 1. 手机端:启动 Frida + HTTP Bridge +frida -U -p -l http_bridge_stream.js + +# 2. 先测 bridge 连通性 +curl -N 'http://<手机IP>:18900/chat?q=你好' + +# 3. PlugAI 服务端:重启小智服务 +cd /home/ZeroStack/xiaozhi/xiaozhi-esp32-server/main/xiaozhi-server +source /home/ZeroStack/xiaozhi/venv/bin/activate +python app.py + +# 4. ESP32 设备:唤醒测试 +# 说 "你好小智" → 提问 → 应该听到蚂蚁阿福的回答(EdgeTTS 合成的语音) +``` + +--- + +## 方案B 详细实施(后续) + +### 核心改造:voice_bridge.js 支持音频注入 + +当前 voice_bridge.js 的 `MFAntAudio3AV2Filter::process` hook 只**读取** micIn 缓冲区。 +需要改造为可以从外部**写入** micIn 缓冲区,替换真实麦克风输入。 + +#### 改造要点 + +```javascript +// voice_bridge.js 新增功能 +var injectBuffer = null; // 外部注入的 PCM 数据 + +// 新增 inject 命令:接收外部 PCM 音频帧 +// 客户端发送: [4字节长度][type=3][960字节PCM数据] +// type 3 = inject audio + +Interceptor.attach(processAddr, { + onEnter: function(args) { + var micIn = args[1]; // 麦克风输入缓冲区 (960 bytes) + var frameSize = args[4]; // 960 + + if (injectBuffer !== null) { + // 用注入数据覆盖真实麦克风输入 + micIn.writeByteArray(injectBuffer); + injectBuffer = null; + } + } +}); +``` + +#### 采样率转换 + +| 来源 | 格式 | 需转换为 | +|------|------|---------| +| ESP32 → 服务端 | Opus 24kHz mono | PCM 48kHz mono (阿福 mic) | +| 阿福 speaker 输出 | PCM 24kHz stereo | Opus 24kHz mono (ESP32) | + +服务端需要: +- libopus 解码/编码 +- resampy 或 scipy 做采样率转换 +- 960字节帧对齐(20ms @ 48kHz) + +#### 新增音频转发模块 + +文件��径:`backend/main/xiaozhi-server/core/providers/asr/antaf_voice/antaf_voice.py` + +这是一个特殊的 ASR Provider,它不做语音识别,而是: +1. 接收 ESP32 的 Opus 音频流 +2. 解码为 PCM,重采样 24k→48k +3. 通过 TCP 发送到 voice_bridge (port 18901) 的 inject 命令 +4. 接收 voice_bridge 的 speaker 输出 +5. 重采样 24k stereo → 24k mono,Opus 编码 +6. 直接发回 ESP32(跳过 LLM 和 TTS) + +#### 方案B 风险点 + +1. **帧时序同步**: ESP32 音频帧和阿福 process() 调用频率可能不一致 +2. **延迟累积**: 网络传输 + 两次重采样 + 注入延迟 +3. **VAD 冲突**: 阿福自带 VAD 可能与注入音频不匹配 +4. **回声消除失效**: 注入 mic 数据后,阿��的 AEC 参考信号(spkRef)对不上 +5. **对话控制**: 何时 open_voice / close_voice 需要与 ESP32 唤醒状态同步 + +--- + +## 依赖清单 + +### 方案A(新增依赖) +- `requests` — Python HTTP 库(服务端 venv 中应已有) + +### 方案B(新增��赖) +- `opuslib` 或 `pyogg` — Opus 编解码 +- `resampy` 或 `scipy.signal` — 采样率转换 +- `numpy` — 音频数据处理 + +--- + +## 文件清单 + +### 方案A +| 操�� | 文件 | +|------|------| +| 新增 | `backend/main/xiaozhi-server/core/providers/llm/antaf/__init__.py` | +| 新增 | `backend/main/xiaozhi-server/core/providers/llm/antaf/antaf.py` | +| 修改 | `backend/main/xiaozhi-server/data/.config.yaml` | + +### 方案B(额外) +| 操作 | 文件 | +|------|------| +| 修改 | `antaf/voice_bridge.js` (新增 inject 命令) | +| 新增 | `backend/main/xiaozhi-server/core/providers/asr/antaf_voice/antaf_voice.py` | +| 新增 | `backend/main/xiaozhi-server/core/utils/audio_resample.py` | + +--- + +## 里程碑 + +| 阶段 | 目标 | 预期产出 | +|------|------|---------| +| M1 | 方案A 代码实现 | AntafLLM Provider + 配置 | +| M2 | 网络打通 | PlugAI ↔ 手机 Bridge 连通 | +| M3 | 端到端测试 | ESP32 唤醒→阿福回答→语音播报 | +| M4 | 方案B 原型 | voice_bridge 音频注入验证 | +| M5 | 方案B 集成 | 全语音直通链路 | diff --git a/modules/tts/sherpa_tts.py b/modules/tts/sherpa_tts.py new file mode 100644 index 0000000..de0efc8 --- /dev/null +++ b/modules/tts/sherpa_tts.py @@ -0,0 +1,79 @@ +import io +import os +import wave +import asyncio +import numpy as np +import sherpa_onnx +from config.logger import setup_logging +from core.providers.tts.base import TTSProviderBase + +TAG = __name__ +logger = setup_logging() + + +class TTSProvider(TTSProviderBase): + def __init__(self, config, delete_audio_file): + super().__init__(config, delete_audio_file) + model_dir = config.get("model_dir", "models/vits-melo-tts-zh_en") + speed = config.get("speed", 1.0) + self.speed = float(speed) if speed else 1.0 + self.sid = int(config.get("sid", 0)) + + # 优先使用 int8 量化模型(更快) + model_file = f"{model_dir}/model.int8.onnx" + if not os.path.exists(model_file) or os.path.getsize(model_file) < 1024: + model_file = f"{model_dir}/model.onnx" + + num_threads = int(config.get("num_threads", 8)) + + tts_config = sherpa_onnx.OfflineTtsConfig( + model=sherpa_onnx.OfflineTtsModelConfig( + vits=sherpa_onnx.OfflineTtsVitsModelConfig( + model=model_file, + lexicon=f"{model_dir}/lexicon.txt", + tokens=f"{model_dir}/tokens.txt", + dict_dir=f"{model_dir}/dict", + ), + num_threads=num_threads, + ), + rule_fsts=f"{model_dir}/date.fst,{model_dir}/phone.fst,{model_dir}/number.fst,{model_dir}/new_heteronym.fst", + max_num_sentences=1, + ) + self.tts = sherpa_onnx.OfflineTts(tts_config) + self.sample_rate = self.tts.sample_rate + logger.bind(tag=TAG).info( + f"SherpaOnnxTTS 初始化完成: model_dir={model_dir}, sample_rate={self.sample_rate}, sid={self.sid}" + ) + + def _generate_wav(self, text): + """同步合成,在线程池中调用""" + from scipy.signal import resample_poly + from math import gcd + + audio = self.tts.generate(text, sid=self.sid, speed=self.speed) + samples = np.array(audio.samples, dtype=np.float32) + + # 重采样到目标采样率(设备要求 24000Hz,模型输出 44100Hz) + target_sr = 24000 + if self.sample_rate != target_sr: + g = gcd(self.sample_rate, target_sr) + samples = resample_poly(samples, target_sr // g, self.sample_rate // g) + + pcm = (samples * 32767).astype(np.int16) + + wav_io = io.BytesIO() + with wave.open(wav_io, "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(target_sr) + wf.writeframes(pcm.tobytes()) + return wav_io.getvalue() + + async def text_to_speak(self, text, output_file): + wav_data = self._generate_wav(text) + + if output_file: + with open(output_file, "wb") as f: + f.write(wav_data) + else: + return wav_data