From 0dbe711ed3bc0a62f1dc61f076b37556d9dfb163 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 24 Feb 2026 02:47:21 -0800 Subject: [PATCH] feat: add detailed logging to voice pipeline (STT/Agent/TTS timing) Log timestamps, content, and event details at each pipeline stage to help diagnose voice-agent integration issues. Co-Authored-By: Claude Opus 4.6 --- .../src/pipeline/base_pipeline.py | 70 ++++++++++++++----- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/packages/services/voice-service/src/pipeline/base_pipeline.py b/packages/services/voice-service/src/pipeline/base_pipeline.py index 720b8f0..a8e9144 100644 --- a/packages/services/voice-service/src/pipeline/base_pipeline.py +++ b/packages/services/voice-service/src/pipeline/base_pipeline.py @@ -173,19 +173,20 @@ class VoicePipelineTask: """Transcribe speech, generate LLM response, synthesize and send TTS.""" session_id = self.session_context.get("session_id", "?") audio_secs = len(audio_data) / (_SAMPLE_RATE * _BYTES_PER_SAMPLE) - print(f"[pipeline] Processing speech: {audio_secs:.1f}s of audio", flush=True) + print(f"[pipeline] ===== Processing speech: {audio_secs:.1f}s of audio, session={session_id} =====", flush=True) # 1. STT + t0 = time.time() text = await self._transcribe(audio_data) + stt_ms = int((time.time() - t0) * 1000) if not text or not text.strip(): - print(f"[pipeline] STT returned empty text, skipping", flush=True) + print(f"[pipeline] STT returned empty text (took {stt_ms}ms), skipping", flush=True) return - print(f"[pipeline] User said: {text.strip()}", flush=True) + print(f"[pipeline] [STT] ({stt_ms}ms) User said: \"{text.strip()}\"", flush=True) # Notify client that we heard them try: - import json await self.websocket.send_text( json.dumps({"type": "transcript", "text": text.strip(), "role": "user"}) ) @@ -193,12 +194,16 @@ class VoicePipelineTask: pass # 2. Agent service — create task + subscribe for response + print(f"[pipeline] [AGENT] Sending to agent: \"{text.strip()}\"", flush=True) + t1 = time.time() response_text = await self._agent_generate(text.strip()) + agent_ms = int((time.time() - t1) * 1000) + if not response_text: - print(f"[pipeline] Agent returned empty response", flush=True) + print(f"[pipeline] [AGENT] ({agent_ms}ms) Agent returned empty response!", flush=True) return - print(f"[pipeline] Agent says: {response_text[:100]}", flush=True) + print(f"[pipeline] [AGENT] ({agent_ms}ms) Agent response ({len(response_text)} chars): \"{response_text[:200]}\"", flush=True) # Notify client of the response text try: @@ -209,7 +214,12 @@ class VoicePipelineTask: pass # 3. TTS → send audio back + print(f"[pipeline] [TTS] Synthesizing {len(response_text)} chars...", flush=True) + t2 = time.time() await self._synthesize_and_send(response_text) + tts_ms = int((time.time() - t2) * 1000) + print(f"[pipeline] [TTS] ({tts_ms}ms) Audio sent to client", flush=True) + print(f"[pipeline] ===== Turn complete: STT={stt_ms}ms + Agent={agent_ms}ms + TTS={tts_ms}ms = {stt_ms+agent_ms+tts_ms}ms =====", flush=True) async def _transcribe(self, audio_data: bytes) -> str: """Transcribe audio using STT service.""" @@ -240,11 +250,14 @@ class VoicePipelineTask: try: collected_text = [] + event_count = 0 timeout_secs = 120 # Max wait for agent response + print(f"[pipeline] [AGENT] Connecting WS: {ws_url}", flush=True) async with websockets.connect(ws_url) as ws: + print(f"[pipeline] [AGENT] WS connected", flush=True) + # 1. Subscribe FIRST (before creating task to avoid missing events) - # Use existing session ID for subscription; if none, we'll re-subscribe after task creation pre_session_id = self._agent_session_id or "" if pre_session_id: @@ -253,29 +266,30 @@ class VoicePipelineTask: "data": {"sessionId": pre_session_id}, }) await ws.send(subscribe_msg) - print(f"[pipeline] Pre-subscribed to agent WS session={pre_session_id}", flush=True) + print(f"[pipeline] [AGENT] Pre-subscribed session={pre_session_id}", flush=True) # 2. Create agent task body = {"prompt": user_text} if self._agent_session_id: body["sessionId"] = self._agent_session_id - print(f"[pipeline] Creating agent task: {user_text[:60]}", flush=True) + print(f"[pipeline] [AGENT] POST /tasks prompt=\"{user_text[:80]}\"", flush=True) async with httpx.AsyncClient(timeout=30) as client: resp = await client.post( f"{agent_url}/api/v1/agent/tasks", json=body, headers=headers, ) + print(f"[pipeline] [AGENT] POST response: {resp.status_code}", flush=True) if resp.status_code != 200 and resp.status_code != 201: - print(f"[pipeline] Agent task creation failed: {resp.status_code} {resp.text}", flush=True) + print(f"[pipeline] [AGENT] Task creation FAILED: {resp.status_code} {resp.text[:200]}", flush=True) return "抱歉,Agent服务暂时不可用。" data = resp.json() session_id = data.get("sessionId", "") task_id = data.get("taskId", "") self._agent_session_id = session_id - print(f"[pipeline] Agent task created: session={session_id}, task={task_id}", flush=True) + print(f"[pipeline] [AGENT] Task created: session={session_id}, task={task_id}", flush=True) # 3. Subscribe with actual session/task IDs (covers first-time case) subscribe_msg = json.dumps({ @@ -283,26 +297,35 @@ class VoicePipelineTask: "data": {"sessionId": session_id, "taskId": task_id}, }) await ws.send(subscribe_msg) - print(f"[pipeline] Subscribed to agent WS session={session_id}, task={task_id}", flush=True) + print(f"[pipeline] [AGENT] Subscribed session={session_id}, task={task_id}", flush=True) # 4. Collect events until completed deadline = time.time() + timeout_secs while time.time() < deadline: + remaining = deadline - time.time() try: - raw = await asyncio.wait_for(ws.recv(), timeout=5.0) + raw = await asyncio.wait_for(ws.recv(), timeout=min(5.0, remaining)) except asyncio.TimeoutError: + if time.time() >= deadline: + print(f"[pipeline] [AGENT] TIMEOUT after {timeout_secs}s waiting for events", flush=True) continue - except Exception: + except Exception as ws_err: + print(f"[pipeline] [AGENT] WS recv error: {type(ws_err).__name__}: {ws_err}", flush=True) break try: msg = json.loads(raw) except (json.JSONDecodeError, TypeError): + print(f"[pipeline] [AGENT] Non-JSON WS message: {str(raw)[:100]}", flush=True) continue event_type = msg.get("event", "") + event_count += 1 - if event_type == "stream_event": + if event_type == "subscribed": + print(f"[pipeline] [AGENT] Subscription confirmed: {msg.get('data', {})}", flush=True) + + elif event_type == "stream_event": evt_data = msg.get("data", {}) evt_type = evt_data.get("type", "") payload = evt_data.get("data", {}) @@ -311,21 +334,34 @@ class VoicePipelineTask: content = payload.get("content", "") if content: collected_text.append(content) + # Log first and periodic text events + if len(collected_text) <= 3 or len(collected_text) % 10 == 0: + total_len = sum(len(t) for t in collected_text) + print(f"[pipeline] [AGENT] Text event #{len(collected_text)}: +{len(content)} chars (total: {total_len})", flush=True) elif evt_type == "completed": summary = payload.get("summary", "") if summary and not collected_text: collected_text.append(summary) - print(f"[pipeline] Agent completed", flush=True) + print(f"[pipeline] [AGENT] Using summary as response: \"{summary[:100]}\"", flush=True) + total_chars = sum(len(t) for t in collected_text) + print(f"[pipeline] [AGENT] Completed! {len(collected_text)} text events, {total_chars} chars total, {event_count} WS events received", flush=True) break elif evt_type == "error": err_msg = payload.get("message", "Unknown error") - print(f"[pipeline] Agent error: {err_msg}", flush=True) + print(f"[pipeline] [AGENT] ERROR event: {err_msg}", flush=True) return f"Agent 错误: {err_msg}" + else: + print(f"[pipeline] [AGENT] Stream event type={evt_type}", flush=True) + + else: + print(f"[pipeline] [AGENT] WS event: {event_type}", flush=True) + result = "".join(collected_text).strip() if not result: + print(f"[pipeline] [AGENT] WARNING: No text collected after {event_count} events!", flush=True) return "Agent 未返回回复。" return result