feat: add detailed logging to voice pipeline (STT/Agent/TTS timing)

Log timestamps, content, and event details at each pipeline stage
to help diagnose voice-agent integration issues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-02-24 02:47:21 -08:00
parent 1d5c834dfe
commit 0dbe711ed3
1 changed files with 53 additions and 17 deletions

View File

@ -173,19 +173,20 @@ class VoicePipelineTask:
"""Transcribe speech, generate LLM response, synthesize and send TTS."""
session_id = self.session_context.get("session_id", "?")
audio_secs = len(audio_data) / (_SAMPLE_RATE * _BYTES_PER_SAMPLE)
print(f"[pipeline] Processing speech: {audio_secs:.1f}s of audio", flush=True)
print(f"[pipeline] ===== Processing speech: {audio_secs:.1f}s of audio, session={session_id} =====", flush=True)
# 1. STT
t0 = time.time()
text = await self._transcribe(audio_data)
stt_ms = int((time.time() - t0) * 1000)
if not text or not text.strip():
print(f"[pipeline] STT returned empty text, skipping", flush=True)
print(f"[pipeline] STT returned empty text (took {stt_ms}ms), skipping", flush=True)
return
print(f"[pipeline] User said: {text.strip()}", flush=True)
print(f"[pipeline] [STT] ({stt_ms}ms) User said: \"{text.strip()}\"", flush=True)
# Notify client that we heard them
try:
import json
await self.websocket.send_text(
json.dumps({"type": "transcript", "text": text.strip(), "role": "user"})
)
@ -193,12 +194,16 @@ class VoicePipelineTask:
pass
# 2. Agent service — create task + subscribe for response
print(f"[pipeline] [AGENT] Sending to agent: \"{text.strip()}\"", flush=True)
t1 = time.time()
response_text = await self._agent_generate(text.strip())
agent_ms = int((time.time() - t1) * 1000)
if not response_text:
print(f"[pipeline] Agent returned empty response", flush=True)
print(f"[pipeline] [AGENT] ({agent_ms}ms) Agent returned empty response!", flush=True)
return
print(f"[pipeline] Agent says: {response_text[:100]}", flush=True)
print(f"[pipeline] [AGENT] ({agent_ms}ms) Agent response ({len(response_text)} chars): \"{response_text[:200]}\"", flush=True)
# Notify client of the response text
try:
@ -209,7 +214,12 @@ class VoicePipelineTask:
pass
# 3. TTS → send audio back
print(f"[pipeline] [TTS] Synthesizing {len(response_text)} chars...", flush=True)
t2 = time.time()
await self._synthesize_and_send(response_text)
tts_ms = int((time.time() - t2) * 1000)
print(f"[pipeline] [TTS] ({tts_ms}ms) Audio sent to client", flush=True)
print(f"[pipeline] ===== Turn complete: STT={stt_ms}ms + Agent={agent_ms}ms + TTS={tts_ms}ms = {stt_ms+agent_ms+tts_ms}ms =====", flush=True)
async def _transcribe(self, audio_data: bytes) -> str:
"""Transcribe audio using STT service."""
@ -240,11 +250,14 @@ class VoicePipelineTask:
try:
collected_text = []
event_count = 0
timeout_secs = 120 # Max wait for agent response
print(f"[pipeline] [AGENT] Connecting WS: {ws_url}", flush=True)
async with websockets.connect(ws_url) as ws:
print(f"[pipeline] [AGENT] WS connected", flush=True)
# 1. Subscribe FIRST (before creating task to avoid missing events)
# Use existing session ID for subscription; if none, we'll re-subscribe after task creation
pre_session_id = self._agent_session_id or ""
if pre_session_id:
@ -253,29 +266,30 @@ class VoicePipelineTask:
"data": {"sessionId": pre_session_id},
})
await ws.send(subscribe_msg)
print(f"[pipeline] Pre-subscribed to agent WS session={pre_session_id}", flush=True)
print(f"[pipeline] [AGENT] Pre-subscribed session={pre_session_id}", flush=True)
# 2. Create agent task
body = {"prompt": user_text}
if self._agent_session_id:
body["sessionId"] = self._agent_session_id
print(f"[pipeline] Creating agent task: {user_text[:60]}", flush=True)
print(f"[pipeline] [AGENT] POST /tasks prompt=\"{user_text[:80]}\"", flush=True)
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
f"{agent_url}/api/v1/agent/tasks",
json=body,
headers=headers,
)
print(f"[pipeline] [AGENT] POST response: {resp.status_code}", flush=True)
if resp.status_code != 200 and resp.status_code != 201:
print(f"[pipeline] Agent task creation failed: {resp.status_code} {resp.text}", flush=True)
print(f"[pipeline] [AGENT] Task creation FAILED: {resp.status_code} {resp.text[:200]}", flush=True)
return "抱歉Agent服务暂时不可用。"
data = resp.json()
session_id = data.get("sessionId", "")
task_id = data.get("taskId", "")
self._agent_session_id = session_id
print(f"[pipeline] Agent task created: session={session_id}, task={task_id}", flush=True)
print(f"[pipeline] [AGENT] Task created: session={session_id}, task={task_id}", flush=True)
# 3. Subscribe with actual session/task IDs (covers first-time case)
subscribe_msg = json.dumps({
@ -283,26 +297,35 @@ class VoicePipelineTask:
"data": {"sessionId": session_id, "taskId": task_id},
})
await ws.send(subscribe_msg)
print(f"[pipeline] Subscribed to agent WS session={session_id}, task={task_id}", flush=True)
print(f"[pipeline] [AGENT] Subscribed session={session_id}, task={task_id}", flush=True)
# 4. Collect events until completed
deadline = time.time() + timeout_secs
while time.time() < deadline:
remaining = deadline - time.time()
try:
raw = await asyncio.wait_for(ws.recv(), timeout=5.0)
raw = await asyncio.wait_for(ws.recv(), timeout=min(5.0, remaining))
except asyncio.TimeoutError:
if time.time() >= deadline:
print(f"[pipeline] [AGENT] TIMEOUT after {timeout_secs}s waiting for events", flush=True)
continue
except Exception:
except Exception as ws_err:
print(f"[pipeline] [AGENT] WS recv error: {type(ws_err).__name__}: {ws_err}", flush=True)
break
try:
msg = json.loads(raw)
except (json.JSONDecodeError, TypeError):
print(f"[pipeline] [AGENT] Non-JSON WS message: {str(raw)[:100]}", flush=True)
continue
event_type = msg.get("event", "")
event_count += 1
if event_type == "stream_event":
if event_type == "subscribed":
print(f"[pipeline] [AGENT] Subscription confirmed: {msg.get('data', {})}", flush=True)
elif event_type == "stream_event":
evt_data = msg.get("data", {})
evt_type = evt_data.get("type", "")
payload = evt_data.get("data", {})
@ -311,21 +334,34 @@ class VoicePipelineTask:
content = payload.get("content", "")
if content:
collected_text.append(content)
# Log first and periodic text events
if len(collected_text) <= 3 or len(collected_text) % 10 == 0:
total_len = sum(len(t) for t in collected_text)
print(f"[pipeline] [AGENT] Text event #{len(collected_text)}: +{len(content)} chars (total: {total_len})", flush=True)
elif evt_type == "completed":
summary = payload.get("summary", "")
if summary and not collected_text:
collected_text.append(summary)
print(f"[pipeline] Agent completed", flush=True)
print(f"[pipeline] [AGENT] Using summary as response: \"{summary[:100]}\"", flush=True)
total_chars = sum(len(t) for t in collected_text)
print(f"[pipeline] [AGENT] Completed! {len(collected_text)} text events, {total_chars} chars total, {event_count} WS events received", flush=True)
break
elif evt_type == "error":
err_msg = payload.get("message", "Unknown error")
print(f"[pipeline] Agent error: {err_msg}", flush=True)
print(f"[pipeline] [AGENT] ERROR event: {err_msg}", flush=True)
return f"Agent 错误: {err_msg}"
else:
print(f"[pipeline] [AGENT] Stream event type={evt_type}", flush=True)
else:
print(f"[pipeline] [AGENT] WS event: {event_type}", flush=True)
result = "".join(collected_text).strip()
if not result:
print(f"[pipeline] [AGENT] WARNING: No text collected after {event_count} events!", flush=True)
return "Agent 未返回回复。"
return result