feat: add temporary TTS test page at /api/v1/test/tts

Browser-accessible page to test text-to-speech synthesis without
going through the full voice pipeline.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-24 05:06:02 -08:00
parent 740f8f5f88
commit 0aa20cbc73
2 changed files with 112 additions and 0 deletions

View File

@ -10,6 +10,7 @@ from ..config.settings import settings
from .health import router as health_router
from .session_router import router as session_router
from .twilio_webhook import router as twilio_router
from .test_tts import router as test_tts_router
logger = logging.getLogger(__name__)
@ -29,6 +30,7 @@ app.add_middleware(
app.include_router(health_router, tags=["health"])
app.include_router(session_router, prefix="/api/v1/voice", tags=["sessions"])
app.include_router(twilio_router, prefix="/api/v1/twilio", tags=["twilio"])
app.include_router(test_tts_router, prefix="/api/v1/test", tags=["test"])
# ---------------------------------------------------------------------------

View File

@ -0,0 +1,110 @@
"""Temporary TTS test endpoint — hit /api/v1/test/tts in a browser to verify."""
import asyncio
import io
import struct
import wave

import numpy as np
from fastapi import APIRouter, Request, Query
from fastapi.responses import HTMLResponse, Response
# Router is mounted under /api/v1/test by the main app (see app setup diff).
router = APIRouter()

# Kokoro native output rate (Hz); audio is resampled to 16 kHz before serving.
_SAMPLE_RATE = 24000
def _make_wav(pcm_bytes: bytes, sample_rate: int = 16000) -> bytes:
"""Wrap raw 16-bit PCM into a WAV container."""
buf = io.BytesIO()
num_samples = len(pcm_bytes) // 2
data_size = num_samples * 2
# WAV header
buf.write(b"RIFF")
buf.write(struct.pack("<I", 36 + data_size))
buf.write(b"WAVE")
buf.write(b"fmt ")
buf.write(struct.pack("<I", 16)) # chunk size
buf.write(struct.pack("<H", 1)) # PCM
buf.write(struct.pack("<H", 1)) # mono
buf.write(struct.pack("<I", sample_rate)) # sample rate
buf.write(struct.pack("<I", sample_rate * 2)) # byte rate
buf.write(struct.pack("<H", 2)) # block align
buf.write(struct.pack("<H", 16)) # bits per sample
buf.write(b"data")
buf.write(struct.pack("<I", data_size))
buf.write(pcm_bytes)
return buf.getvalue()
@router.get("/tts", response_class=HTMLResponse)
async def tts_test_page():
    """Serve a self-contained HTML page for manually testing TTS.

    The inline script POSTs nothing and stores nothing: it GETs
    ``/api/v1/test/tts/synthesize?text=...`` with the textarea contents,
    reports round-trip latency and payload size, then plays the returned
    WAV through an ``<audio>`` element. UI strings are in Chinese.
    """
    # NOTE: everything below is a runtime string served to the browser —
    # kept flush-left and byte-identical on purpose.
    return """<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>TTS Test</title>
<style>
body { font-family: sans-serif; max-width: 600px; margin: 40px auto; padding: 0 20px; }
textarea { width: 100%; height: 80px; font-size: 16px; }
button { font-size: 18px; padding: 10px 30px; margin-top: 10px; cursor: pointer; }
#status { margin-top: 15px; color: #666; }
audio { margin-top: 15px; width: 100%; }
</style></head>
<body>
<h2>TTS Test</h2>
<textarea id="text" placeholder="输入要合成的文本...">你好我是IT0运维助手很高兴为您服务</textarea>
<br><button onclick="doTTS()">合成语音</button>
<div id="status"></div>
<audio id="player" controls style="display:none"></audio>
<script>
async function doTTS() {
const text = document.getElementById('text').value.trim();
if (!text) return;
const status = document.getElementById('status');
const player = document.getElementById('player');
status.textContent = '合成中...';
player.style.display = 'none';
const t0 = Date.now();
try {
const resp = await fetch('/api/v1/test/tts/synthesize?text=' + encodeURIComponent(text));
if (!resp.ok) { status.textContent = 'Error: ' + resp.status; return; }
const blob = await resp.blob();
const ms = Date.now() - t0;
status.textContent = '完成!耗时 ' + ms + 'ms大小 ' + (blob.size/1024).toFixed(1) + 'KB';
player.src = URL.createObjectURL(blob);
player.style.display = 'block';
player.play();
} catch(e) { status.textContent = 'Error: ' + e.message; }
}
</script>
</body></html>"""
@router.get("/tts/synthesize")
async def tts_synthesize(request: Request, text: str = Query(..., min_length=1, max_length=500)):
    """Synthesize *text* and return it as a mono 16 kHz ``audio/wav`` response.

    Responses:
        200: WAV bytes.
        503: TTS engine not loaded on ``app.state``.
        500: pipeline yielded no audio for the input text.
    """
    tts = getattr(request.app.state, "tts", None)
    # NOTE(review): reaches into the private `_pipeline` attribute of the TTS
    # wrapper — tolerable for a temporary test endpoint, but brittle; confirm
    # whether the wrapper exposes a public "is loaded" check.
    if tts is None or tts._pipeline is None:
        return Response(content="TTS model not loaded", status_code=503)

    def _synth() -> bytes:
        """Run the CPU-bound Kokoro pipeline; return full WAV bytes (or b'')."""
        samples = []
        for _, _, audio in tts._pipeline(text, voice=tts.voice):
            # Pipeline chunks may be tensors (have .numpy()) or ndarrays.
            samples.append(audio.numpy() if hasattr(audio, "numpy") else audio)
        if not samples:
            return b""
        audio_np = np.concatenate(samples)
        # Naive linear-interpolation resample 24 kHz -> 16 kHz; adequate for
        # a manual listening test (no anti-alias filtering).
        target_len = int(len(audio_np) / _SAMPLE_RATE * 16000)
        indices = np.linspace(0, len(audio_np) - 1, target_len)
        resampled = np.interp(indices, np.arange(len(audio_np)), audio_np)
        # Scale float [-1, 1] to int16, clipping +1.0 overflow at 32767.
        pcm = (resampled * 32768).clip(-32768, 32767).astype(np.int16).tobytes()
        return _make_wav(pcm, 16000)

    # Fix: get_event_loop() is deprecated inside a coroutine since Python 3.10;
    # get_running_loop() is the supported call here.
    loop = asyncio.get_running_loop()
    wav_bytes = await loop.run_in_executor(None, _synth)
    if not wav_bytes:
        return Response(content="TTS produced no audio", status_code=500)
    return Response(content=wav_bytes, media_type="audio/wav")