From 5d4fd96d43fb57e6efc9d4fa5401e334ec541d5a Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 24 Feb 2026 05:30:11 -0800 Subject: [PATCH] feat: streaming claude-api engine, engineType override, fix voice test page - Claude API engine now uses streaming API (messages.stream) for real-time text delta output instead of waiting for full response - Agent controller accepts optional engineType body parameter to allow callers (e.g. voice pipeline) to select a specific engine - Fix voice_test_page.dart compilation error: replace audioplayers (not installed) with flutter_sound (already in pubspec.yaml) Co-Authored-By: Claude Opus 4.6 --- .../presentation/pages/voice_test_page.dart | 32 +++++++++-- .../engines/claude-api/claude-api-engine.ts | 56 +++++++++++++------ .../rest/controllers/agent.controller.ts | 7 ++- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/it0_app/lib/features/agent_call/presentation/pages/voice_test_page.dart b/it0_app/lib/features/agent_call/presentation/pages/voice_test_page.dart index 739f07e..3458d0a 100644 --- a/it0_app/lib/features/agent_call/presentation/pages/voice_test_page.dart +++ b/it0_app/lib/features/agent_call/presentation/pages/voice_test_page.dart @@ -4,7 +4,7 @@ import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:dio/dio.dart'; import 'package:record/record.dart'; -import 'package:audioplayers/audioplayers.dart'; +import 'package:flutter_sound/flutter_sound.dart'; import '../../../../core/network/dio_client.dart'; /// Temporary voice I/O test page — TTS + STT + Round-trip. @@ -19,8 +19,9 @@ class _VoiceTestPageState extends ConsumerState { final _ttsController = TextEditingController( text: '你好,我是IT0运维助手。很高兴为您服务!', ); - final _audioPlayer = AudioPlayer(); + final _audioPlayer = FlutterSoundPlayer(); final _recorder = AudioRecorder(); + bool _playerInitialized = false; String _ttsStatus = ''; String _sttStatus = ''; @@ -46,10 +47,21 @@ class _VoiceTestPageState extends ConsumerState { /// Dio for JSON responses (STT). Dio get _dioJson => ref.read(dioClientProvider); + @override + void initState() { + super.initState(); + _initPlayer(); + } + + Future _initPlayer() async { + await _audioPlayer.openPlayer(); + _playerInitialized = true; + } + @override void dispose() { _ttsController.dispose(); - _audioPlayer.dispose(); + if (_playerInitialized) _audioPlayer.closePlayer(); _recorder.dispose(); super.dispose(); } @@ -73,7 +85,7 @@ class _VoiceTestPageState extends ConsumerState { setState(() { _ttsStatus = '完成!耗时 ${sw.elapsedMilliseconds}ms,大小 ${(bytes.length / 1024).toStringAsFixed(1)}KB'; }); - await _audioPlayer.play(BytesSource(Uint8List.fromList(bytes))); + await _playWavBytes(Uint8List.fromList(bytes)); } catch (e) { sw.stop(); setState(() => _ttsStatus = '错误: $e'); @@ -191,7 +203,7 @@ class _VoiceTestPageState extends ConsumerState { _rtResult += '\nTTS (${ttsSw.elapsedMilliseconds}ms): ${(audioBytes.length / 1024).toStringAsFixed(1)}KB'; _rtStatus = '完成!STT=${sttSw.elapsedMilliseconds}ms + TTS=${ttsSw.elapsedMilliseconds}ms = ${totalSw.elapsedMilliseconds}ms'; }); - await _audioPlayer.play(BytesSource(Uint8List.fromList(audioBytes))); + await _playWavBytes(Uint8List.fromList(audioBytes)); } catch (e) { totalSw.stop(); setState(() { @@ -200,6 +212,16 @@ class _VoiceTestPageState extends ConsumerState { } } + /// Play WAV bytes through flutter_sound player. + Future _playWavBytes(Uint8List wavBytes) async { + if (!_playerInitialized) return; + await _audioPlayer.startPlayer( + fromDataBuffer: wavBytes, + codec: Codec.pcm16WAV, + whenFinished: () {}, + ); + } + @override Widget build(BuildContext context) { return Scaffold( diff --git a/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts b/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts index 7ad284f..f25d095 100644 --- a/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts +++ b/packages/services/agent-service/src/infrastructure/engines/claude-api/claude-api-engine.ts @@ -95,31 +95,52 @@ export class ClaudeApiEngine implements AgentEnginePort { requestParams.tools = tools; } - const response = await client.messages.create(requestParams, { + // Use streaming API for token-level output + const stream = client.messages.stream(requestParams, { signal: abortController.signal as any, }); + const contentBlocks: AnthropicContentBlock[] = []; + const toolUseBlocks: Array<{ id: string; name: string; input: Record }> = []; + + // Stream text deltas in real-time + for await (const event of stream) { + if (event.type === 'content_block_delta') { + const delta = (event as any).delta; + if (delta?.type === 'text_delta' && delta.text) { + yield { type: 'text' as const, content: delta.text }; + } else if (delta?.type === 'thinking_delta' && delta.thinking) { + yield { type: 'thinking' as const, content: delta.thinking }; + } + } else if (event.type === 'content_block_start') { + const block = (event as any).content_block; + if (block?.type === 'tool_use') { + contentBlocks.push(block); + } + } else if (event.type === 'content_block_stop') { + // Tool use blocks are fully accumulated at stop + } + } + + // Get final message for tool use and usage + const response = await stream.finalMessage(); + // Track token usage if (response.usage) { totalTokensUsed += (response.usage.input_tokens ?? 0) + (response.usage.output_tokens ?? 0); } - // Process content blocks from the response - const contentBlocks = response.content as AnthropicContentBlock[]; - const toolUseBlocks: Array<{ id: string; name: string; input: Record }> = []; - - for (const block of contentBlocks) { - if (block.type === 'text' && block.text) { - yield { type: 'text', content: block.text }; - } else if (block.type === 'tool_use') { + // Collect tool_use blocks from final response + for (const block of response.content) { + if (block.type === 'tool_use') { yield { - type: 'tool_use', - toolName: block.name!, + type: 'tool_use' as const, + toolName: block.name, input: (block.input as Record) ?? {}, }; toolUseBlocks.push({ - id: block.id!, - name: block.name!, + id: block.id, + name: block.name, input: (block.input as Record) ?? {}, }); } @@ -127,14 +148,13 @@ export class ClaudeApiEngine implements AgentEnginePort { // Check stop reason if (response.stop_reason === 'end_turn' || toolUseBlocks.length === 0) { - // Extract final text as summary - const summaryBlock = contentBlocks.find( - (b) => b.type === 'text' && b.text, + const summaryBlock = response.content.find( + (b: any) => b.type === 'text' && b.text, ); - const summary = summaryBlock?.text ?? 'Task completed'; + const summary = (summaryBlock as any)?.text ?? 'Task completed'; yield { - type: 'completed', + type: 'completed' as const, summary, tokensUsed: totalTokensUsed, }; diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts index d91361c..9e78e20 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts @@ -25,9 +25,12 @@ export class AgentController { @Post('tasks') async executeTask( @TenantId() tenantId: string, - @Body() body: { prompt: string; systemPrompt?: string; maxTurns?: number; allowedTools?: string[] }, + @Body() body: { prompt: string; systemPrompt?: string; maxTurns?: number; allowedTools?: string[]; engineType?: string }, ) { - const engine = this.engineRegistry.getActiveEngine(); + // Allow callers to override the engine (e.g. voice uses claude_api for streaming) + const engine = body.engineType + ? this.engineRegistry.switchEngine(body.engineType as AgentEngineType) + : this.engineRegistry.getActiveEngine(); const session = new AgentSession(); session.id = crypto.randomUUID();