taixf/antaf/test_inject.py

131 lines
3.4 KiB
Python

#!/usr/bin/env python3
"""Test voice_bridge_v7 audio injection.
Connect to voice_bridge, open voice chat, enable inject mode,
send silence frames, and print any speaker output received.
Usage: python test_inject.py [host] [port]
"""
import socket
import struct
import json
import time
import sys
import threading
HOST = sys.argv[1] if len(sys.argv) > 1 else "127.0.0.1"
PORT = int(sys.argv[2]) if len(sys.argv) > 2 else 18901
FRAME_SIZE = 960 # 960 bytes per frame (480 samples * 16bit)
def send_cmd(sock, cmd):
data = json.dumps(cmd).encode("utf-8")
header = struct.pack(">IB", len(data), 1) # type=1 text
sock.sendall(header + data)
def send_inject(sock, pcm_frame):
header = struct.pack(">IB", len(pcm_frame), 3) # type=3 inject
sock.sendall(header + pcm_frame)
def recv_exact(sock, n):
buf = b""
while len(buf) < n:
chunk = sock.recv(n - len(buf))
if not chunk:
return None
buf += chunk
return buf
def recv_frame(sock):
header = recv_exact(sock, 5)
if header is None:
return None, None
length = struct.unpack(">I", header[:4])[0]
ftype = header[4]
if length > 1048576:
return None, None
data = recv_exact(sock, length)
if data is None:
return None, None
return ftype, data
def receiver(sock):
"""Background thread to print received frames."""
spk_count = 0
while True:
try:
ftype, data = recv_frame(sock)
if ftype is None:
print("[RECV] Connection closed")
break
if ftype == 1: # text/json
msg = json.loads(data.decode("utf-8"))
print(f"[RECV] {msg}")
elif ftype == 0: # speaker audio
spk_count += 1
# Check if audio is non-silent
samples = struct.unpack(f"<{len(data)//2}h", data)
max_amp = max(abs(s) for s in samples)
if spk_count <= 5 or spk_count % 100 == 0 or max_amp > 500:
print(f"[SPK] frame={spk_count} size={len(data)} max_amp={max_amp}")
elif ftype == 2: # mic audio
pass # ignore mic echo
except Exception as e:
print(f"[RECV] Error: {e}")
break
def main():
print(f"Connecting to {HOST}:{PORT}...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
print("Connected")
# Start receiver thread
t = threading.Thread(target=receiver, args=(sock,), daemon=True)
t.start()
time.sleep(1)
# Open voice chat
print("Opening voice chat...")
send_cmd(sock, {"cmd": "open_voice"})
time.sleep(3)
# Start capture
print("Starting capture...")
send_cmd(sock, {"cmd": "start"})
time.sleep(1)
# Enable inject mode
print("Enabling inject mode...")
send_cmd(sock, {"cmd": "inject_on"})
time.sleep(0.5)
# Send silence frames for 3 seconds (48kHz, 960 bytes/frame = 20ms)
# 3 seconds = 150 frames
print("Sending 150 silence frames (3 seconds)...")
silence = b"\x00" * FRAME_SIZE
for i in range(150):
send_inject(sock, silence)
time.sleep(0.02) # 20ms per frame
print("Done sending. Waiting for speaker output...")
time.sleep(10)
# Stop
send_cmd(sock, {"cmd": "inject_off"})
send_cmd(sock, {"cmd": "stop"})
time.sleep(1)
print("Test complete")
sock.close()
if __name__ == "__main__":
main()