#!/usr/bin/env python3
"""Test voice_bridge_v7 audio injection.

Connect to voice_bridge, open voice chat, enable inject mode,
send silence frames, and print any speaker output received.

Wire format (both directions): a 5-byte header made of a big-endian
unsigned 32-bit payload length followed by a 1-byte frame type, then the
payload itself.

Usage: python test_inject.py [host] [port]
"""

import socket
import struct
import json
import time
import sys
import threading
from typing import Optional, Sequence, Tuple

# Defaults used when host/port are not given on the command line.  The
# command line is parsed in main() rather than at import time, so importing
# this module (for its helpers, or under a test runner) never touches argv.
HOST = "127.0.0.1"
PORT = 18901
FRAME_SIZE = 960  # 960 bytes per frame (480 samples * 16bit)

# Frame type byte carried in the header.
FRAME_SPEAKER = 0  # speaker audio (bridge -> client)
FRAME_TEXT = 1  # UTF-8 JSON text
FRAME_MIC = 2  # mic audio echo (ignored by this script)
FRAME_INJECT = 3  # injected PCM (client -> bridge)

MAX_FRAME_BYTES = 1048576  # refuse to read payloads larger than 1 MiB
SILENCE_FRAME_COUNT = 150
FRAME_INTERVAL_S = 0.02  # pacing between injected frames (20 ms)

_HEADER = struct.Struct(">IB")  # payload length, frame type


def parse_endpoint(argv: Sequence[str]) -> Tuple[str, int]:
    """Return ``(host, port)`` from an argv-style list.

    ``argv[1]`` is the host and ``argv[2]`` the port; either falls back to
    the module defaults ``HOST`` / ``PORT`` when absent.

    Raises:
        ValueError: if the port argument is not an integer.
    """
    host = argv[1] if len(argv) > 1 else HOST
    port = int(argv[2]) if len(argv) > 2 else PORT
    return host, port


def send_cmd(sock, cmd) -> None:
    """Send *cmd* (a JSON-serialisable object) as a text frame."""
    data = json.dumps(cmd).encode("utf-8")
    sock.sendall(_HEADER.pack(len(data), FRAME_TEXT) + data)


def send_inject(sock, pcm_frame: bytes) -> None:
    """Send *pcm_frame* as an inject frame."""
    sock.sendall(_HEADER.pack(len(pcm_frame), FRAME_INJECT) + pcm_frame)


def recv_exact(sock, n: int) -> Optional[bytes]:
    """Read exactly *n* bytes from *sock*.

    Returns the bytes read, or ``None`` if the peer closed the connection
    before *n* bytes arrived.  ``n == 0`` returns ``b""`` without reading.
    """
    buf = bytearray(n)
    view = memoryview(buf)
    got = 0
    while got < n:
        # recv_into fills the preallocated buffer in place, avoiding the
        # repeated copying that ``bytes +=`` would do on fragmented reads.
        read = sock.recv_into(view[got:])
        if read == 0:
            return None
        got += read
    return bytes(buf)


def recv_frame(sock) -> Tuple[Optional[int], Optional[bytes]]:
    """Read one frame and return ``(frame_type, payload)``.

    Returns ``(None, None)`` when the connection closes mid-frame or when
    the announced payload length exceeds ``MAX_FRAME_BYTES``.  In the
    oversize case the payload is left unread, so the stream is no longer
    aligned on a frame boundary and the caller should stop reading.
    """
    header = recv_exact(sock, _HEADER.size)
    if header is None:
        return None, None
    length, ftype = _HEADER.unpack(header)
    if length > MAX_FRAME_BYTES:
        return None, None
    data = recv_exact(sock, length)
    if data is None:
        return None, None
    return ftype, data


def peak_amplitude(data: bytes) -> int:
    """Return the largest absolute sample value in *data*.

    *data* is read as little-endian signed 16-bit samples.  An empty buffer
    yields 0, and a trailing odd byte is ignored, so a short or truncated
    frame cannot raise.
    """
    count = len(data) // 2
    if count == 0:
        return 0
    samples = struct.unpack_from(f"<{count}h", data)
    return max(abs(s) for s in samples)


def receiver(sock) -> None:
    """Background thread to print received frames.

    Runs until the connection closes or a socket error occurs.  A text
    frame that is not valid UTF-8 JSON is reported and skipped, because the
    framing itself is still intact at that point.
    """
    spk_count = 0
    while True:
        try:
            ftype, data = recv_frame(sock)
        except OSError as e:
            print(f"[RECV] Error: {e}")
            break
        if ftype is None:
            print("[RECV] Connection closed")
            break
        if ftype == FRAME_TEXT:
            try:
                msg = json.loads(data.decode("utf-8"))
            except ValueError as e:
                # Covers both UnicodeDecodeError and json.JSONDecodeError.
                print(f"[RECV] Error: {e}")
                continue
            print(f"[RECV] {msg}")
        elif ftype == FRAME_SPEAKER:
            spk_count += 1
            # Check if audio is non-silent
            max_amp = peak_amplitude(data)
            # Log the first few frames, a periodic heartbeat, and anything
            # that is audibly non-silent.
            if spk_count <= 5 or spk_count % 100 == 0 or max_amp > 500:
                print(f"[SPK] frame={spk_count} size={len(data)} max_amp={max_amp}")
        # FRAME_MIC (mic echo) and unknown frame types are ignored.


def main() -> None:
    """Run the injection test against the bridge given on the command line."""
    host, port = parse_endpoint(sys.argv)
    print(f"Connecting to {host}:{port}...")
    # The context manager closes the socket even if connect() or any later
    # step raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        print("Connected")

        # Start receiver thread
        t = threading.Thread(target=receiver, args=(sock,), daemon=True)
        t.start()
        time.sleep(1)

        # Open voice chat
        print("Opening voice chat...")
        send_cmd(sock, {"cmd": "open_voice"})
        time.sleep(3)

        # Start capture
        print("Starting capture...")
        send_cmd(sock, {"cmd": "start"})
        time.sleep(1)

        # Enable inject mode
        print("Enabling inject mode...")
        send_cmd(sock, {"cmd": "inject_on"})
        time.sleep(0.5)

        # Send 150 silence frames paced 20 ms apart (3 seconds in total).
        # NOTE(review): the original comment said "48kHz, 960 bytes/frame =
        # 20ms", but 960 bytes of 16-bit mono is 480 samples, i.e. 10 ms at
        # 48 kHz (20 ms at 24 kHz) - confirm the bridge's expected format.
        print("Sending 150 silence frames (3 seconds)...")
        silence = b"\x00" * FRAME_SIZE
        # Pace against a monotonic deadline so per-frame send time does not
        # accumulate into drift over the run.
        deadline = time.monotonic()
        for _ in range(SILENCE_FRAME_COUNT):
            send_inject(sock, silence)
            deadline += FRAME_INTERVAL_S
            delay = deadline - time.monotonic()
            if delay > 0:
                time.sleep(delay)

        print("Done sending. Waiting for speaker output...")
        time.sleep(10)

        # Stop
        send_cmd(sock, {"cmd": "inject_off"})
        send_cmd(sock, {"cmd": "stop"})
        time.sleep(1)
        print("Test complete")


if __name__ == "__main__":
    main()