taixf/modules/antaf/antaf_passthrough.py

252 lines
9.7 KiB
Python

"""
Antaf Voice Passthrough ASR Provider
Replaces ASR→LLM→TTS pipeline with direct audio forwarding to Antaf voice_bridge.
ESP32 audio → decode Opus → resample 16kHz→48kHz → inject to voice_bridge (type=3)
voice_bridge speaker (type=0) → resample 48kHz→16kHz → encode Opus → send to ESP32
Runs within xiaozhi-server, keeping all protocol handling (hello, OTA, wake word) intact.
"""
import json
import struct
import asyncio
import threading
import numpy as np
import opuslib_next
from scipy.signal import resample_poly
from math import gcd
from config.logger import setup_logging
from core.providers.asr.base import ASRProviderBase
from core.handle.sendAudioHandle import send_tts_message
TAG = __name__
logger = setup_logging()
# Audio parameters
ESP_SR = 16000
ESP_FRAME_SAMPLES = 960 # 60ms at 16kHz
BRIDGE_SR = 48000
BRIDGE_FRAME_SAMPLES = 480 # 960 bytes / 2 = 480 samples
# Resample ratios
UP = (BRIDGE_SR // gcd(BRIDGE_SR, ESP_SR), ESP_SR // gcd(BRIDGE_SR, ESP_SR)) # (3,1)
DOWN = (ESP_SR // gcd(ESP_SR, BRIDGE_SR), BRIDGE_SR // gcd(ESP_SR, BRIDGE_SR)) # (1,3)
class ASRProvider(ASRProviderBase):
def __init__(self, config):
super().__init__()
self.bridge_host = config.get("bridge_host", "127.0.0.1")
self.bridge_port = int(config.get("bridge_port", 18901))
self.interface_type = "NON_STREAM"
self.conn = None
self.bridge_reader = None
self.bridge_writer = None
self.opus_decoder = None
self.opus_encoder = None
self._inject_buf = np.array([], dtype=np.int16)
self._speaker_buf = np.array([], dtype=np.int16)
self._tts_started = False
self._recv_task = None
self._connected = False
logger.bind(tag=TAG).info(
f"AntafPassthrough 初始化: bridge={self.bridge_host}:{self.bridge_port}"
)
async def open_audio_channels(self, conn):
"""Override: connect to bridge, start passthrough instead of normal ASR."""
# Clean up previous connection if any
await self.close()
self.conn = conn
self.opus_decoder = opuslib_next.Decoder(ESP_SR, 1)
self.opus_encoder = opuslib_next.Encoder(ESP_SR, 1, opuslib_next.APPLICATION_AUDIO)
self._tts_started = False
self._silence_count = 0
self._inject_buf = np.array([], dtype=np.int16)
self._speaker_buf = np.array([], dtype=np.int16)
self._write_lock = threading.Lock()
# Connect to voice_bridge
try:
self.bridge_reader, self.bridge_writer = await asyncio.open_connection(
self.bridge_host, self.bridge_port
)
# Read connected event
ftype, data = await self._bridge_recv()
if ftype == 1:
msg = json.loads(data.decode())
logger.bind(tag=TAG).info(f"Bridge connected: {msg.get('protocol')}")
# Send start + inject_on
self._bridge_send_cmd({"cmd": "start"})
ftype, data = await self._bridge_recv()
if ftype == 1:
logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}")
self._bridge_send_cmd({"cmd": "inject_on"})
ftype, data = await self._bridge_recv()
if ftype == 1:
logger.bind(tag=TAG).info(f"Bridge: {json.loads(data.decode())}")
self._connected = True
logger.bind(tag=TAG).info("Voice bridge ready (inject mode)")
# Start speaker receive loop
self._recv_task = asyncio.create_task(self._speaker_recv_loop())
except Exception as e:
logger.bind(tag=TAG).error(f"Bridge connection failed: {e}")
self._connected = False
# Start normal audio processing thread (reads from asr_audio_queue)
conn.asr_priority_thread = threading.Thread(
target=self._audio_thread, args=(conn,), daemon=True
)
conn.asr_priority_thread.start()
def _audio_thread(self, conn):
"""Read Opus frames from queue, decode, resample, inject to bridge."""
import queue as queue_module
frame_count = 0
while not conn.stop_event.is_set():
try:
opus_data = conn.asr_audio_queue.get(timeout=1)
if not self._connected:
continue
frame_count += 1
if frame_count <= 3 or frame_count % 200 == 0:
logger.bind(tag=TAG).debug(f"Audio frame #{frame_count}")
# Decode Opus → PCM 16kHz
pcm = self.opus_decoder.decode(opus_data, ESP_FRAME_SAMPLES)
samples = np.frombuffer(pcm, dtype=np.int16)
# Resample 16kHz → 48kHz
upsampled = resample_poly(samples, UP[0], UP[1]).astype(np.int16)
# Split into bridge frames and inject
self._inject_buf = np.concatenate([self._inject_buf, upsampled])
while len(self._inject_buf) >= BRIDGE_FRAME_SAMPLES:
frame = self._inject_buf[:BRIDGE_FRAME_SAMPLES]
self._inject_buf = self._inject_buf[BRIDGE_FRAME_SAMPLES:]
self._bridge_send_inject(frame.tobytes())
except queue_module.Empty:
continue
except Exception as e:
logger.bind(tag=TAG).error(f"Audio thread error: {e}")
async def _speaker_recv_loop(self):
"""Receive speaker PCM from bridge, resample, encode Opus, send to ESP32."""
try:
while self._connected:
ftype, data = await self._bridge_recv()
if ftype == 0:
# Speaker audio
await self._handle_speaker(data)
elif ftype == 1:
msg = json.loads(data.decode())
logger.bind(tag=TAG).debug(f"Bridge event: {msg}")
except asyncio.IncompleteReadError:
logger.bind(tag=TAG).warning("Bridge connection closed")
except Exception as e:
logger.bind(tag=TAG).error(f"Speaker recv error: {e}")
finally:
self._connected = False
async def _handle_speaker(self, pcm_bytes):
"""Process speaker frame and send to ESP32."""
if not self.conn or not self.conn.websocket:
return
samples = np.frombuffer(pcm_bytes, dtype=np.int16)
max_amp = int(np.max(np.abs(samples)))
# Track silence for tts stop
if max_amp < 10:
if self._tts_started:
self._silence_count += 1
# 50 frames of silence (~1 second) → send tts stop
if self._silence_count > 50:
try:
await send_tts_message(self.conn, "stop")
self.conn.client_is_speaking = False
self._tts_started = False
self._silence_count = 0
logger.bind(tag=TAG).info("Sent tts stop to ESP32")
except Exception as e:
logger.bind(tag=TAG).error(f"Send tts stop error: {e}")
return
# Reset silence counter on non-silent frame
self._silence_count = 0
# Send tts start before first audio
if not self._tts_started:
try:
await send_tts_message(self.conn, "start")
self._tts_started = True
self.conn.client_is_speaking = True
logger.bind(tag=TAG).info("Sent tts start to ESP32")
except Exception as e:
logger.bind(tag=TAG).error(f"Send tts start error: {e}")
return
# Resample 48kHz → 16kHz
downsampled = resample_poly(samples, DOWN[0], DOWN[1]).astype(np.int16)
# Accumulate and encode
self._speaker_buf = np.concatenate([self._speaker_buf, downsampled])
while len(self._speaker_buf) >= ESP_FRAME_SAMPLES:
frame = self._speaker_buf[:ESP_FRAME_SAMPLES]
self._speaker_buf = self._speaker_buf[ESP_FRAME_SAMPLES:]
opus_data = self.opus_encoder.encode(frame.tobytes(), ESP_FRAME_SAMPLES)
try:
await self.conn.websocket.send(opus_data)
except Exception as e:
logger.bind(tag=TAG).error(f"Send opus to ESP32 error: {e}")
return
# Bridge TCP helpers
async def _bridge_recv(self):
header = await self.bridge_reader.readexactly(5)
length = struct.unpack(">I", header[:4])[0]
ftype = header[4]
data = await self.bridge_reader.readexactly(length)
return ftype, data
def _bridge_send_cmd(self, cmd):
data = json.dumps(cmd).encode()
header = struct.pack(">IB", len(data), 1)
with self._write_lock:
self.bridge_writer.write(header + data)
def _bridge_send_inject(self, pcm_bytes):
header = struct.pack(">IB", len(pcm_bytes), 3)
with self._write_lock:
self.bridge_writer.write(header + pcm_bytes)
# ASR interface — never returns text, LLM/TTS never triggered
async def receive_audio(self, conn, audio, audio_have_voice):
"""No-op: audio is handled by _audio_thread directly from queue."""
pass
async def speech_to_text(self, opus_data, session_id, audio_format="opus", artifacts=None):
"""Never called in passthrough mode."""
return "", None
async def close(self):
self._connected = False
if self._recv_task:
self._recv_task.cancel()
if self.bridge_writer:
try:
self._bridge_send_cmd({"cmd": "inject_off"})
self._bridge_send_cmd({"cmd": "stop"})
self.bridge_writer.close()
except Exception:
pass