#!/usr/bin/env python3
"""
ESP32 ↔ Antaf Voice Relay

Bridges ESP32 (WebSocket/Opus) with Antaf voice_bridge (TCP/PCM).

    ESP32 → Opus decode → resample 16kHz→48kHz → voice_bridge inject (type=3)
    ESP32 ← Opus encode ← resample 48kHz→16kHz ← voice_bridge speaker (type=0)

Usage:
    python relay.py [--ws-port 8010] [--bridge-host 127.0.0.1] [--bridge-port 18901]
"""

import asyncio
import json
import struct
import argparse
import logging

import numpy as np
from scipy.signal import resample_poly
from math import gcd

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("relay")

try:
    import opuslib_next as opuslib
except ImportError:
    import opuslib

import websockets

# Audio parameters
ESP_SAMPLE_RATE = 16000  # ESP32 Opus sample rate
ESP_FRAME_MS = 60  # ESP32 frame duration
ESP_FRAME_SIZE = ESP_SAMPLE_RATE * ESP_FRAME_MS // 1000  # 960 samples
BRIDGE_SAMPLE_RATE = 48000  # voice_bridge micIn sample rate
BRIDGE_FRAME_BYTES = 960  # 480 samples * 2 bytes
BRIDGE_FRAME_SAMPLES = 480

# Resampling ratios as (up, down) factors for resample_poly
UP_GCD = gcd(BRIDGE_SAMPLE_RATE, ESP_SAMPLE_RATE)  # 16000 → 48000
UP_RATIO = (BRIDGE_SAMPLE_RATE // UP_GCD, ESP_SAMPLE_RATE // UP_GCD)  # (3, 1)
DOWN_GCD = gcd(ESP_SAMPLE_RATE, BRIDGE_SAMPLE_RATE)  # 48000 → 16000
DOWN_RATIO = (ESP_SAMPLE_RATE // DOWN_GCD, BRIDGE_SAMPLE_RATE // DOWN_GCD)  # (1, 3)

# Representable range of 16-bit signed PCM samples
_INT16_MIN = int(np.iinfo(np.int16).min)
_INT16_MAX = int(np.iinfo(np.int16).max)


def _resample_int16(samples, up, down):
    """Resample int16 PCM by the rational factor up/down and return int16.

    resample_poly returns float64, and its anti-aliasing filter can overshoot
    full scale on loud input. A bare ``.astype(np.int16)`` does not saturate
    out-of-range values (they wrap / become undefined, heard as loud clicks),
    so the result is rounded and clipped to the int16 range first.

    NOTE: each call is stateless (resample_poly pads the chunk edges), so
    chunk boundaries may show small discontinuities.
    """
    if samples.size == 0:
        return np.array([], dtype=np.int16)
    resampled = resample_poly(samples, up, down)
    return np.clip(np.rint(resampled), _INT16_MIN, _INT16_MAX).astype(np.int16)


def _peak_amplitude(samples):
    """Return max |sample| of an int16 array as a Python int (0 if empty).

    Computed in int32 because np.abs on int16 maps -32768 to itself, which
    would make a full-scale negative peak look like silence.
    """
    if samples.size == 0:
        return 0
    return int(np.max(np.abs(samples.astype(np.int32))))


class BridgeClient:
    """TCP client for voice_bridge_v7.

    Frames in both directions are: 4-byte big-endian payload length, 1-byte
    frame type, then the payload. Types used here: 0 = speaker PCM received
    from the bridge, 1 = JSON command/event, 3 = PCM sent for injection.
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.reader = None
        self.writer = None
        self.on_speaker_frame = None  # async callback(pcm_bytes)
        self._recv_task = None

    async def connect(self):
        """Open the TCP connection and consume the bridge's first frame."""
        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
        log.info(f"Connected to voice_bridge {self.host}:{self.port}")
        # Read connected event
        ftype, data = await self._recv_frame()
        if ftype == 1:
            msg = json.loads(data.decode())
            log.info(f"Bridge: {msg.get('protocol')}")

    async def _recv_frame(self):
        """Read one frame and return ``(ftype, payload_bytes)``.

        Raises asyncio.IncompleteReadError if the bridge closes mid-frame.
        """
        header = await self.reader.readexactly(5)
        length = struct.unpack(">I", header[:4])[0]
        ftype = header[4]
        data = await self.reader.readexactly(length)
        return ftype, data

    def _send_frame(self, ftype, data):
        """Write one length-prefixed frame of the given type."""
        header = struct.pack(">IB", len(data), ftype)
        self.writer.write(header + data)
        # Note: no await drain() here — voice frames are time-sensitive,
        # TCP buffer handles backpressure

    def send_cmd(self, cmd):
        """Send a JSON command (frame type 1)."""
        self._send_frame(1, json.dumps(cmd).encode())

    def send_inject(self, pcm_bytes):
        """Send raw PCM to be injected by the bridge (frame type 3)."""
        self._send_frame(3, pcm_bytes)

    async def start_recv_loop(self):
        """Background task: receive frames from bridge until it disconnects."""
        try:
            while True:
                ftype, data = await self._recv_frame()
                if ftype == 0 and self.on_speaker_frame:
                    # Speaker audio
                    await self.on_speaker_frame(data)
                elif ftype == 1:
                    msg = json.loads(data.decode())
                    log.info(f"Bridge event: {msg}")
        except asyncio.IncompleteReadError:
            log.warning("Bridge connection closed")
        except Exception as e:
            log.error(f"Bridge recv error: {e}")

    async def _log_reply(self):
        """Read one frame and log it when it is a JSON event (type 1).

        NOTE: assumes the next frame is the reply to the command just sent;
        a non-JSON frame arriving first is read and dropped.
        """
        ftype, data = await self._recv_frame()
        if ftype == 1:
            msg = json.loads(data.decode())
            log.info(f"Bridge: {msg}")

    async def setup_voice(self):
        """Start capture and enable inject. Voice chat must already be open."""
        self.send_cmd({"cmd": "start"})
        await self._log_reply()
        self.send_cmd({"cmd": "inject_on"})
        await self._log_reply()
        log.info("Voice bridge ready (inject mode)")

    async def close(self):
        """Best-effort shutdown: disable inject, stop capture, close socket."""
        if self.writer is None:
            return
        try:
            self.send_cmd({"cmd": "inject_off"})
            self.send_cmd({"cmd": "stop"})
        except Exception:
            pass  # connection may already be gone; closing anyway
        self.writer.close()
        try:
            await self.writer.wait_closed()
        except Exception:
            pass  # peer reset during close is not actionable here


class Relay:
    """Main relay: ESP32 WebSocket ↔ Antaf voice_bridge TCP."""

    def __init__(self, ws_port, bridge_host, bridge_port):
        self.ws_port = ws_port
        self.bridge_host = bridge_host
        self.bridge_port = bridge_port
        self.bridge = None
        self.ws = None
        self.opus_decoder = None
        self.opus_encoder = None
        self._reset_session_state()

    def _reset_session_state(self):
        """Clear per-connection audio buffers, counters and the TTS flag."""
        # Buffer for resampled PCM to split into bridge frames
        self._inject_buf = np.array([], dtype=np.int16)
        # Buffer for speaker PCM to accumulate before encoding
        self._speaker_buf = np.array([], dtype=np.int16)
        self._audio_in_count = 0
        self._audio_out_count = 0
        self._tts_started = False  # track if we sent tts start to ESP32

    async def handle_esp32(self, websocket):
        """Handle one ESP32 WebSocket connection for its whole lifetime."""
        log.info(f"ESP32 connected from {websocket.remote_address}")
        self.ws = websocket
        # Buffered audio and the tts-start flag must not leak from a
        # previous connection into this one.
        self._reset_session_state()

        # Init Opus codec
        self.opus_decoder = opuslib.Decoder(ESP_SAMPLE_RATE, 1)
        self.opus_encoder = opuslib.Encoder(ESP_SAMPLE_RATE, 1, opuslib.APPLICATION_AUDIO)

        # Keep a local reference so cleanup closes *this* session's bridge.
        bridge = self.bridge = BridgeClient(self.bridge_host, self.bridge_port)
        recv_task = None
        try:
            # Connect to voice bridge and setup voice chat first (inside the
            # try so a setup failure still closes the bridge socket)
            await bridge.connect()
            await bridge.setup_voice()

            # Now start receiving speaker audio
            bridge.on_speaker_frame = self._on_speaker_frame
            recv_task = asyncio.create_task(bridge.start_recv_loop())

            async for message in websocket:
                if isinstance(message, str):
                    # Text message from ESP32 (hello, listen, etc.)
                    await self._handle_text(message)
                elif isinstance(message, bytes):
                    # Opus audio from ESP32
                    await self._handle_audio(message)
        except websockets.exceptions.ConnectionClosed:
            log.info("ESP32 disconnected")
        finally:
            if recv_task is not None:
                recv_task.cancel()
                # Wait for the task to finish; its CancelledError is returned,
                # not raised, so our own cancellation still propagates.
                await asyncio.gather(recv_task, return_exceptions=True)
            await bridge.close()
            if self.ws is websocket:
                self.ws = None
            log.info("Session ended")

    async def _handle_text(self, message):
        """Handle text (JSON) control messages from ESP32."""
        try:
            msg = json.loads(message)
        except json.JSONDecodeError:
            log.warning(f"Invalid JSON from ESP32: {message[:100]}")
            return
        if not isinstance(msg, dict):
            # Valid JSON but not an object; .get() below would raise
            log.warning(f"Invalid JSON from ESP32: {message[:100]}")
            return

        msg_type = msg.get("type")
        if msg_type == "hello":
            # Respond with proper hello — must match xiaozhi protocol
            resp = {
                "type": "hello",
                "version": 1,
                "transport": "websocket",
                "session_id": "relay-session",
                "audio_params": {
                    "format": "opus",
                    "sample_rate": ESP_SAMPLE_RATE,
                    "channels": 1,
                    "frame_duration": ESP_FRAME_MS,
                },
            }
            await self.ws.send(json.dumps(resp))
            log.info(f"ESP32 hello: {msg.get('audio_params')}")
        elif msg_type == "listen":
            state = msg.get("state")
            log.info(f"ESP32 listen: {state}")
            if state == "detect":
                text = msg.get("text", "")
                log.info(f"Wake word: {text}")
                # Don't send tts start — let ESP32 continue recording
        elif msg_type == "abort":
            log.info("ESP32 abort")

    async def _handle_audio(self, opus_data):
        """Decode Opus from ESP32, resample, inject into voice_bridge."""
        try:
            self._audio_in_count += 1
            if self._audio_in_count <= 3 or self._audio_in_count % 100 == 0:
                log.info(f"ESP32 audio frame #{self._audio_in_count}, size={len(opus_data)}")

            # Decode Opus → PCM 16kHz mono
            pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SIZE)
            samples = np.frombuffer(pcm, dtype=np.int16)

            # Resample 16kHz → 48kHz (clipped to int16)
            upsampled = _resample_int16(samples, UP_RATIO[0], UP_RATIO[1])

            # Append to inject buffer and send in bridge frame sizes; any
            # remainder stays buffered for the next ESP32 frame
            self._inject_buf = np.concatenate([self._inject_buf, upsampled])
            while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES:
                frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES]
                self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:]
                self.bridge.send_inject(frame.tobytes())
        except Exception as e:
            log.error(f"Audio inject error: {e}")

    async def _on_speaker_frame(self, pcm_bytes):
        """Receive speaker PCM from bridge, resample, encode Opus, send to ESP32."""
        if not self.ws or getattr(self.ws, 'closed', False):
            return
        try:
            self._audio_out_count += 1
            samples = np.frombuffer(pcm_bytes, dtype=np.int16)
            max_amp = _peak_amplitude(samples)

            if self._audio_out_count <= 3 or self._audio_out_count % 100 == 0:
                log.info(f"Speaker frame #{self._audio_out_count}, size={len(pcm_bytes)}, max_amp={max_amp}")

            # Only send non-silent frames to ESP32. Silence gaps are normal
            # mid-utterance; NOTE: this relay never sends a "tts stop".
            if max_amp < 10:
                return

            # Send tts start before first audio frame
            if not self._tts_started:
                await self.ws.send(json.dumps({
                    "type": "tts", "state": "start", "session_id": "relay-session"
                }))
                self._tts_started = True
                log.info("Sent tts start to ESP32")

            # Resample 48kHz → 16kHz (clipped to int16)
            downsampled = _resample_int16(samples, DOWN_RATIO[0], DOWN_RATIO[1])

            # Accumulate into speaker buffer, encode when we have enough
            self._speaker_buf = np.concatenate([self._speaker_buf, downsampled])
            while len(self._speaker_buf) >= ESP_FRAME_SIZE:
                frame = self._speaker_buf[:ESP_FRAME_SIZE]
                self._speaker_buf = self._speaker_buf[ESP_FRAME_SIZE:]
                # Encode PCM → Opus
                opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SIZE)
                await self.ws.send(opus_data)
        except Exception as e:
            log.error(f"Speaker send error: {e}")

    async def run(self):
        """Serve ESP32 WebSocket connections forever."""
        log.info(f"Relay starting on ws://0.0.0.0:{self.ws_port}/xiaozhi/v1/")
        async with websockets.serve(
            self.handle_esp32,
            "0.0.0.0",
            self.ws_port,
            ping_interval=30,
            ping_timeout=10,
        ):
            await asyncio.Future()  # run forever


def main():
    parser = argparse.ArgumentParser(description="ESP32-Antaf Voice Relay")
    parser.add_argument("--ws-port", type=int, default=8010, help="WebSocket port for ESP32")
    parser.add_argument("--bridge-host", default="127.0.0.1", help="voice_bridge host")
    parser.add_argument("--bridge-port", type=int, default=18901, help="voice_bridge port")
    args = parser.parse_args()

    relay = Relay(args.ws_port, args.bridge_host, args.bridge_port)
    asyncio.run(relay.run())


if __name__ == "__main__":
    main()