From 216f2fe6a025528fafc2066c651aa104a4bc995a Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 6 Apr 2026 05:05:24 -0700 Subject: [PATCH] feat: add voice relay (Plan B) - ESP32 audio passthrough to Antaf MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - voice_bridge_v7.js: audio injection support (type=3 frames) - relay.py: WebSocket↔TCP bridge with Opus↔PCM + resampling - test_inject.py: injection verification script - Injection verified: 1454 frames stable, no crash Co-Authored-By: Claude Opus 4.6 (1M context) --- antaf/test_inject.py | 130 +++++++++++++++++++ antaf/voice_bridge_v7.js | 209 ++++++++++++++++++++++++++++++ relay.py | 270 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 609 insertions(+) create mode 100644 antaf/test_inject.py create mode 100644 antaf/voice_bridge_v7.js create mode 100644 relay.py diff --git a/antaf/test_inject.py b/antaf/test_inject.py new file mode 100644 index 0000000..c56a031 --- /dev/null +++ b/antaf/test_inject.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python3 +"""Test voice_bridge_v7 audio injection. +Connect to voice_bridge, open voice chat, enable inject mode, +send silence frames, and print any speaker output received. + +Usage: python test_inject.py [host] [port] +""" +import socket +import struct +import json +import time +import sys +import threading + +HOST = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1" +PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 18901 + +FRAME_SIZE = 960 # 960 bytes per frame (480 samples * 16bit) + + +def send_cmd(sock, cmd): + data = json.dumps(cmd).encode("utf-8") + header = struct.pack(">IB", len(data), 1) # type=1 text + sock.sendall(header + data) + + +def send_inject(sock, pcm_frame): + header = struct.pack(">IB", len(pcm_frame), 3) # type=3 inject + sock.sendall(header + pcm_frame) + + +def recv_exact(sock, n): + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + return None + buf += chunk + return buf + + +def recv_frame(sock): + header = recv_exact(sock, 5) + if header is None: + return None, None + length = struct.unpack(">I", header[:4])[0] + ftype = header[4] + if length > 1048576: + return None, None + data = recv_exact(sock, length) + if data is None: + return None, None + return ftype, data + + +def receiver(sock): + """Background thread to print received frames.""" + spk_count = 0 + while True: + try: + ftype, data = recv_frame(sock) + if ftype is None: + print("[RECV] Connection closed") + break + if ftype == 1: # text/json + msg = json.loads(data.decode("utf-8")) + print(f"[RECV] {msg}") + elif ftype == 0: # speaker audio + spk_count += 1 + # Check if audio is non-silent + samples = struct.unpack(f"<{len(data)//2}h", data) + max_amp = max(abs(s) for s in samples) + if spk_count <= 5 or spk_count % 100 == 0 or max_amp > 500: + print(f"[SPK] frame={spk_count} size={len(data)} max_amp={max_amp}") + elif ftype == 2: # mic audio + pass # ignore mic echo + except Exception as e: + print(f"[RECV] Error: {e}") + break + + +def main(): + print(f"Connecting to {HOST}:{PORT}...") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((HOST, PORT)) + print("Connected") + + # Start receiver thread + t = threading.Thread(target=receiver, args=(sock,), daemon=True) + t.start() + + time.sleep(1) + + # Open voice chat + print("Opening voice chat...") + send_cmd(sock, {"cmd": "open_voice"}) + time.sleep(3) + + # Start capture + print("Starting capture...") + send_cmd(sock, {"cmd": "start"}) + time.sleep(1) + + # Enable inject mode + print("Enabling inject mode...") + send_cmd(sock, {"cmd": "inject_on"}) + time.sleep(0.5) + + # Send silence frames for 3 seconds (48kHz, 960 bytes/frame = 20ms) + # 3 seconds = 150 frames + print("Sending 150 silence frames (3 seconds)...") + silence = b"\x00" * FRAME_SIZE + for i in range(150): + send_inject(sock, silence) + time.sleep(0.02) # 20ms per frame + + print("Done sending. Waiting for speaker output...") + time.sleep(10) + + # Stop + send_cmd(sock, {"cmd": "inject_off"}) + send_cmd(sock, {"cmd": "stop"}) + time.sleep(1) + + print("Test complete") + sock.close() + + +if __name__ == "__main__": + main() diff --git a/antaf/voice_bridge_v7.js b/antaf/voice_bridge_v7.js new file mode 100644 index 0000000..5cf95a5 --- /dev/null +++ b/antaf/voice_bridge_v7.js @@ -0,0 +1,209 @@ +// voice_bridge_v7.js — Voice Bridge with Audio Injection +// Hook point: libantaudio.so MFAntAudio3AV2Filter::process(micIn, spkRef, out, size, &result) +// TCP :18901 +// Frame: 4-byte len + 1-byte type + payload +// type 0: speaker/AI audio (spkRef, downstream to client) +// type 1: text/JSON command +// type 2: mic audio (micIn, downstream to client) +// type 3: inject audio (upstream from client, replaces micIn) + +var voiceActive = false; +var clientOS = null; +var capturedSpk = 0, capturedMic = 0, spkBytes = 0, micBytes = 0; +var injectMode = false; // true = replace mic with injected audio +var injectQueue = []; // queue of PCM frames to inject + +function wf(os, type, jArr) { + try { + var len = jArr.length; + var h = Java.array("byte", [(len>>24)&0xFF,(len>>16)&0xFF,(len>>8)&0xFF,len&0xFF, type]); + os.write(h); os.write(jArr); os.flush(); + } catch(e) {} +} +function wt(os, text) { + wf(os, 1, Java.use("java.lang.String").$new(text).getBytes("UTF-8")); +} + +// === Hook libantaudio.so === +var hooked = false; +function tryHook() { + if (hooked) return; + var m = Process.findModuleByName("libantaudio.so"); + if (!m) return; + var addr = m.findExportByName("_ZN8antaudio20MFAntAudio3AV2Filter7processEPhS1_S1_iRi"); + if (!addr) return; + hooked = true; + + Interceptor.attach(addr, { + onEnter: function(args) { + if (!voiceActive || !clientOS) return; + var size = args[4].toInt32(); + if (size <= 0) return; + + try { + // If inject mode, replace micIn with queued or silence + if (injectMode) { + if (injectQueue.length > 0) { + var frame = injectQueue.shift(); + // Only write if frame size matches expected size + if (frame.byteLength === size) { + args[1].writeByteArray(frame); + } else if (frame.byteLength > 0) { + // Size mismatch — pad or truncate + var buf = new ArrayBuffer(size); + var dst = new Uint8Array(buf); + var src = new Uint8Array(frame); + var copyLen = Math.min(size, frame.byteLength); + for (var k = 0; k < copyLen; k++) dst[k] = src[k]; + args[1].writeByteArray(buf); + } + } else { + // No data queued — inject silence to avoid mic leak + var silence = new ArrayBuffer(size); + args[1].writeByteArray(silence); + } + } + + // Always capture speaker/AI output (type 0) + var spkPcm = args[2].readByteArray(size); + var spkArr = Java.array("byte", Array.from(new Uint8Array(spkPcm))); + wf(clientOS, 0, spkArr); + capturedSpk++; spkBytes += size; + + // Capture mic (type 2) only when not injecting + if (!injectMode) { + var micPcm = args[1].readByteArray(size); + var micArr = Java.array("byte", Array.from(new Uint8Array(micPcm))); + wf(clientOS, 2, micArr); + } + capturedMic++; micBytes += size; + + if (capturedMic <= 3 || capturedMic % 500 === 0) + console.log("[VOICE] mic=" + capturedMic + " spk=" + capturedSpk + " inject=" + injectQueue.length); + } catch(e) {} + } + }); + console.log("[VOICE] 3AV2Filter.process hooked @ " + addr); +} + +[0, 1000, 3000, 5000, 10000, 15000, 20000].forEach(function(ms) { setTimeout(tryHook, ms); }); +try { + new ApiResolver("module").enumerateMatches("exports:linker*!*dlopen*").forEach(function(d) { + Interceptor.attach(d.address, { onLeave: function() { setTimeout(tryHook, 500); } }); + }); +} catch(e) {} + +// === TCP Server === +Java.perform(function() { + var SS = Java.use("java.net.ServerSocket"); + var JS = Java.use("java.lang.String"); + var server = SS.$new(18901); + console.log("[VOICE] Listening :18901"); + + function openVoice(os) { + Java.scheduleOnMainThread(function() { + try { + Java.choose("com.antgroup.aijk.android.ijklauncher.biz.activity.IJKActivity", { + onMatch: function(a) { + var fm = a.getSupportFragmentManager(); + var f = Java.use("com.antgroup.aijk.android.ijkchat.biz.voicechat.IjkVoiceChatFragment").$new(); + f.show(fm, "v"); + console.log("[VOICE] Opened"); + }, onComplete: function() {} + }); + setTimeout(function() { wt(os, JSON.stringify({event:"voice_opened"})); }, 2000); + } catch(e) { wt(os, JSON.stringify({event:"error",msg:""+e})); } + }); + } + + function closeVoice(os) { + Java.scheduleOnMainThread(function() { + try { + Java.choose("com.antgroup.aijk.android.ijkchat.biz.voicechat.IjkVoiceChatFragment", { + onMatch: function(f) { f.dismiss(); console.log("[VOICE] Closed"); }, + onComplete: function() {} + }); + setTimeout(function() { wt(os, JSON.stringify({event:"voice_closed"})); }, 1000); + } catch(e) { wt(os, JSON.stringify({event:"error",msg:""+e})); } + }); + } + + var Srv = Java.registerClass({ + name: "com.antaf.voice.S7", + implements: [Java.use("java.lang.Runnable")], + methods: { + run: function() { + while (true) { + try { + console.log("[VOICE] Waiting..."); + var c = server.accept(); + var is = c.getInputStream(); + var os = c.getOutputStream(); + clientOS = os; + console.log("[VOICE] Connected"); + wt(os, JSON.stringify({ + event:"connected", protocol:"antaf-voice-v8", + commands:["open_voice","close_voice","start","stop","status","inject_on","inject_off"], + audio:"pcm-16bit-960b-frames", + frameTypes:{0:"spk_ai",1:"text",2:"mic",3:"inject"} + })); + + while (true) { + var hb = []; + for (var i=0;i<5;i++) { var b=is.read(); if(b<0) throw "EOF"; hb.push(b); } + var fl=(hb[0]<<24)|(hb[1]<<16)|(hb[2]<<8)|hb[3], ft=hb[4]; + if (fl>1048576) break; + var pb = []; + for (var i=0;iI", header[:4])[0] + ftype = header[4] + data = await self.reader.readexactly(length) + return ftype, data + + def _send_frame(self, ftype, data): + header = struct.pack(">IB", len(data), ftype) + self.writer.write(header + data) + # Note: no await drain() here — voice frames are time-sensitive, + # TCP buffer handles backpressure + + def send_cmd(self, cmd): + self._send_frame(1, json.dumps(cmd).encode()) + + def send_inject(self, pcm_bytes): + self._send_frame(3, pcm_bytes) + + async def start_recv_loop(self): + """Background task: receive frames from bridge.""" + try: + while True: + ftype, data = await self._recv_frame() + if ftype == 0 and self.on_speaker_frame: + # Speaker audio + await self.on_speaker_frame(data) + elif ftype == 1: + msg = json.loads(data.decode()) + log.info(f"Bridge event: {msg}") + except asyncio.IncompleteReadError: + log.warning("Bridge connection closed") + except Exception as e: + log.error(f"Bridge recv error: {e}") + + async def setup_voice(self): + """Open voice chat, start capture, enable inject.""" + self.send_cmd({"cmd": "open_voice"}) + await asyncio.sleep(3) + self.send_cmd({"cmd": "start"}) + await asyncio.sleep(1) + self.send_cmd({"cmd": "inject_on"}) + await asyncio.sleep(0.5) + log.info("Voice bridge ready (inject mode)") + + async def close(self): + self.send_cmd({"cmd": "inject_off"}) + self.send_cmd({"cmd": "stop"}) + self.send_cmd({"cmd": "close_voice"}) + await asyncio.sleep(1) + if self.writer: + self.writer.close() + + +class Relay: + """Main relay: ESP32 WebSocket ↔ Antaf voice_bridge TCP.""" + + def __init__(self, ws_port, bridge_host, bridge_port): + self.ws_port = ws_port + self.bridge_host = bridge_host + self.bridge_port = bridge_port + self.bridge = None + self.ws = None + self.opus_decoder = None + self.opus_encoder = None + # Buffer for resampled PCM to split into bridge frames + self._inject_buf = np.array([], dtype=np.int16) + # Buffer for speaker PCM to accumulate before encoding + self._speaker_buf = np.array([], dtype=np.int16) + + async def handle_esp32(self, websocket): + """Handle one ESP32 WebSocket connection.""" + log.info(f"ESP32 connected from {websocket.remote_address}") + self.ws = websocket + + # Init Opus codec + self.opus_decoder = opuslib.Decoder(ESP_SAMPLE_RATE, 1) + self.opus_encoder = opuslib.Encoder(ESP_SAMPLE_RATE, 1, opuslib.APPLICATION_AUDIO) + + # Connect to voice bridge + self.bridge = BridgeClient(self.bridge_host, self.bridge_port) + await self.bridge.connect() + self.bridge.on_speaker_frame = self._on_speaker_frame + recv_task = asyncio.create_task(self.bridge.start_recv_loop()) + + # Setup voice chat + await self.bridge.setup_voice() + + try: + async for message in websocket: + if isinstance(message, str): + # Text message from ESP32 (hello, listen, etc.) + await self._handle_text(message) + elif isinstance(message, bytes): + # Opus audio from ESP32 + await self._handle_audio(message) + except websockets.exceptions.ConnectionClosed: + log.info("ESP32 disconnected") + finally: + recv_task.cancel() + await self.bridge.close() + self.ws = None + log.info("Session ended") + + async def _handle_text(self, message): + """Handle text messages from ESP32.""" + try: + msg = json.loads(message) + msg_type = msg.get("type") + + if msg_type == "hello": + # Respond with hello ack + resp = { + "type": "hello", + "session_id": "relay-session", + "transport": "websocket", + } + await self.ws.send(json.dumps(resp)) + log.info(f"ESP32 hello: {msg.get('audio_params')}") + + elif msg_type == "listen": + state = msg.get("state") + log.debug(f"ESP32 listen: {state}") + if state == "detect": + # Wake word detected — acknowledge + text = msg.get("text", "") + log.info(f"Wake word: {text}") + # Send TTS start to keep ESP32 happy + await self.ws.send(json.dumps({ + "type": "tts", "state": "start", + "session_id": msg.get("session_id", "") + })) + + elif msg_type == "abort": + log.info("ESP32 abort") + + except json.JSONDecodeError: + log.warning(f"Invalid JSON from ESP32: {message[:100]}") + + async def _handle_audio(self, opus_data): + """Decode Opus from ESP32, resample, inject into voice_bridge.""" + try: + # Decode Opus → PCM 16kHz mono + pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SIZE) + samples = np.frombuffer(pcm, dtype=np.int16) + + # Resample 16kHz → 48kHz + upsampled = resample_poly(samples, UP_RATIO[0], UP_RATIO[1]).astype(np.int16) + + # Append to inject buffer and send in bridge frame sizes + self._inject_buf = np.concatenate([self._inject_buf, upsampled]) + while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES: + frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES] + self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:] + self.bridge.send_inject(frame.tobytes()) + + except Exception as e: + log.error(f"Audio inject error: {e}") + + async def _on_speaker_frame(self, pcm_bytes): + """Receive speaker PCM from bridge, resample, encode Opus, send to ESP32.""" + if not self.ws: + return + try: + samples = np.frombuffer(pcm_bytes, dtype=np.int16) + + # Resample 48kHz → 16kHz + downsampled = resample_poly(samples, DOWN_RATIO[0], DOWN_RATIO[1]).astype(np.int16) + + # Accumulate into speaker buffer, encode when we have enough + self._speaker_buf = np.concatenate([self._speaker_buf, downsampled]) + while len(self._speaker_buf) >= ESP_FRAME_SIZE: + frame = self._speaker_buf[:ESP_FRAME_SIZE] + self._speaker_buf = self._speaker_buf[ESP_FRAME_SIZE:] + # Encode PCM → Opus + opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SIZE) + await self.ws.send(opus_data) + + except Exception as e: + log.error(f"Speaker send error: {e}") + + async def run(self): + log.info(f"Relay starting on ws://0.0.0.0:{self.ws_port}/xiaozhi/v1/") + async with websockets.serve( + self.handle_esp32, "0.0.0.0", self.ws_port, + ping_interval=30, ping_timeout=10, + ): + await asyncio.Future() # run forever + + +def main(): + parser = argparse.ArgumentParser(description="ESP32-Antaf Voice Relay") + parser.add_argument("--ws-port", type=int, default=8010, help="WebSocket port for ESP32") + parser.add_argument("--bridge-host", default="127.0.0.1", help="voice_bridge host") + parser.add_argument("--bridge-port", type=int, default=18901, help="voice_bridge port") + args = parser.parse_args() + + relay = Relay(args.ws_port, args.bridge_host, args.bridge_port) + asyncio.run(relay.run()) + + +if __name__ == "__main__": + main()