# 09 - Tool Execution System (Concurrent Tool Execution)

## 1. Design Philosophy

Borrowed from the design of Claude Code's `ToolExecutionQueue` (the `Q80` class):

> **Concurrency-safe tools run in parallel; tools with serialization constraints run in order.**

When the Coordinator Agent returns several `tool_use` blocks in one response, the system has to decide which tools can run in parallel and which must wait for the ones before them. That decision directly affects user experience (response latency) and system correctness (data consistency).

Core rules:

1. **Reads run in parallel**: pure reads such as `search_knowledge` and the `invoke_*` Agent calls (except `invoke_memory_manager`, which may write memory) can run concurrently.
2. **Writes run serially**: operations with side effects, such as `save_user_memory` and `generate_payment`, must run one at a time.
3. **Results stay ordered**: whatever the execution order, results are returned in the order of the `tool_use` blocks Claude emitted.
4. **Errors are isolated**: one tool failing does not affect the execution of the others.

## 2. Concurrency-Safety Declarations

### 2.1 Tool Concurrency-Safety Matrix

| Tool name | Type | isConcurrencySafe | Reason |
|-----------|------|-------------------|--------|
| `invoke_policy_expert` | Agent call | `true` | Pure read, independent API call |
| `invoke_assessment_expert` | Agent call | `true` | Pure read, independent API call |
| `invoke_strategist` | Agent call | `true` | Pure read, independent API call |
| `invoke_objection_handler` | Agent call | `true` | Pure read, independent API call |
| `invoke_case_analyst` | Agent call | `true` | Pure read, independent API call |
| `invoke_memory_manager` | Agent call | `false` | **May write to memory** |
| `search_knowledge` | Direct tool | `true` | Pure read |
| `get_user_context` | Direct tool | `true` | Pure read |
| `save_user_memory` | Direct tool | `false` | **Write operation** |
| `generate_payment` | Direct tool | `false` | **Creates a payment order (side effect)** |
| `get_current_datetime` | Direct tool | `true` | Pure computation |
| `web_search` | Direct tool | `true` | Pure read, external API |
| `get_exchange_rate` | Direct tool | `true` | Pure read |
| `check_off_topic` | Direct tool | `true` | Pure read |

### 2.2 Rules for Deciding Concurrency Safety

```typescript
/**
 * Rules for deciding whether a tool is concurrency-safe:
 *
 * 1. Agent calls (invoke_*):
 *    - Concurrency-safe by default (each Agent runs independently)
 *    - Exception: invoke_memory_manager (it may call save_user_memory)
 *
 * 2. Reads:
 *    - search_*, get_*, check_* → concurrency-safe
 *
 * 3. Writes:
 *    - save_*, update_*, delete_* → not concurrency-safe
 *
 * 4. Side effects:
 *    - generate_payment, send_notification → not concurrency-safe
 */
```

## 3. TypeScript Type Definitions

```typescript
// tool-execution.types.ts
import Anthropic from '@anthropic-ai/sdk';

/**
 * Tool definition (wraps Claude's Tool type with execution metadata)
 */
export interface ToolDefinition {
  /** Claude API tool definition */
  tool: Anthropic.Tool;
  /** Whether the tool may run concurrently with other tools */
  isConcurrencySafe: boolean;
  /** Tool type: Agent invocation vs. direct tool */
  toolType: 'agent_invocation' | 'direct_tool';
  /** Execution timeout (ms) */
  timeoutMs: number;
  /** Executor function */
  executor: (input: Record<string, unknown>) => Promise<ToolExecutionOutput>;
  /** Estimated duration (ms), used for frontend display and ordering */
  estimatedDurationMs: number;
}

/**
 * Tool execution output
 */
export interface ToolExecutionOutput {
  /** Text result returned to Claude */
  content: string;
  /** Whether execution succeeded */
  success: boolean;
  /** Error message (on failure) */
  error?: string;
  /** Agent execution metadata (agent_invocation tools only) */
  agentMetadata?: {
    agentName: string;
    turnsUsed: number;
    tokensUsed: { input: number; output: number };
    toolCallsMade: string[];
    durationMs: number;
  };
}

/**
 * Tool execution request (one tool_use block returned by Claude)
 */
export interface ToolExecutionRequest {
  /** tool_use ID assigned by Claude */
  toolUseId: string;
  /** Tool name */
  toolName: string;
  /** Tool input parameters */
  input: Record<string, unknown>;
  /** Original position in the tool_use array (used to order results) */
  originalIndex: number;
}

/**
 * Tool execution result
 */
export interface ToolExecutionResult {
  /** tool_use ID assigned by Claude */
  toolUseId: string;
  /** Tool name */
  toolName: string;
  /** Original position */
  originalIndex: number;
  /** Execution output */
  output: ToolExecutionOutput;
  /** Execution time (ms) */
  durationMs: number;
  /** Start timestamp (ms since epoch) */
  startedAt: number;
  /** Completion timestamp (ms since epoch) */
  completedAt: number;
}

/**
 * Queue status events (streamed to the frontend)
 */
export type QueueEvent =
  | { type: 'tool_queued'; toolName: string; toolUseId: string; position: number }
  | { type: 'tool_started'; toolName: string; toolUseId: string }
  | { type: 'tool_completed'; toolName: string; toolUseId: string; durationMs: number; success: boolean }
  | { type: 'tool_error'; toolName: string; toolUseId: string; error: string }
  | { type: 'agent_started'; agentName: string; toolUseId: string }
  | { type: 'agent_progress'; agentName: string; toolUseId: string; turn: number; maxTurns: number }
  | { type: 'agent_completed'; agentName: string; toolUseId: string; durationMs: number }
  | { type: 'queue_drained' };
```

## 4. ToolExecutionQueue: Full Implementation

```typescript
// tool-execution-queue.ts
import { Injectable, Logger } from '@nestjs/common';
import Anthropic from '@anthropic-ai/sdk';
import {
  ToolDefinition,
  ToolExecutionRequest,
  ToolExecutionResult,
  ToolExecutionOutput,
  QueueEvent,
} from './tool-execution.types';

/**
 * Concurrent tool execution queue
 *
 * Core algorithm:
 * 1. Receive a set of tool_use requests
 * 2. Walk them in their original order
 * 3. Collect consecutive concurrency-safe tools into one "concurrent batch"
 * 4. On reaching a tool that is not safe:
 *    a. First run (await) the current concurrent batch
 *    b. Then run the unsafe tool on its own
 * 5. Continue with the remaining tools
 * 6. Finally return all results sorted by originalIndex
 */
@Injectable()
export class ToolExecutionQueue {
  private readonly logger = new Logger(ToolExecutionQueue.name);

  /** Registered tool definitions */
  private toolRegistry = new Map<string, ToolDefinition>();

  /**
   * Event callback (for stream notifications).
   * NestJS providers are singletons by default, so this callback is shared by
   * every caller of this instance; concurrent conversations would overwrite it.
   */
  private eventCallback?: (event: QueueEvent) => void;

  /**
   * Register a tool definition
   */
  registerTool(name: string, definition: ToolDefinition): void {
    this.toolRegistry.set(name, definition);
    this.logger.log(`Registered tool: ${name} (concurrencySafe: ${definition.isConcurrencySafe})`);
  }

  /**
   * Register tools in bulk
   */
  registerTools(tools: Record<string, ToolDefinition>): void {
    for (const [name, def] of Object.entries(tools)) {
      this.registerTool(name, def);
    }
  }

  /**
   * Set the event callback (for real-time frontend notifications)
   */
  onEvent(callback: (event: QueueEvent) => void): void {
    this.eventCallback = callback;
  }

  /**
   * Execute all tools
   *
   * Main entry point: takes every tool_use block Claude returned, runs them in
   * batches according to their concurrency safety, and returns ordered results.
   *
   * @param requests - tool_use requests extracted from Claude's response
   * @returns execution results sorted by originalIndex
   */
  async executeAll(requests: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
    if (requests.length === 0) return [];

    this.logger.log(`Executing ${requests.length} tools: ${requests.map(r => r.toolName).join(', ')}`);

    const allResults: ToolExecutionResult[] = [];

    // ======== Core algorithm: execute in batches ========
    let currentBatch: ToolExecutionRequest[] = [];

    for (const request of requests) {
      const definition = this.toolRegistry.get(request.toolName);

      if (!definition) {
        // Unregistered tool → return an error result
        allResults.push(this.createErrorResult(request, `Unknown tool: ${request.toolName}`));
        continue;
      }

      this.emitEvent({
        type: 'tool_queued',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        position: request.originalIndex,
      });

      if (definition.isConcurrencySafe) {
        // Concurrency-safe → add to the current batch
        currentBatch.push(request);
      } else {
        // Not concurrency-safe → run the current batch first, then this tool alone

        // 1. Run the current concurrent batch
        if (currentBatch.length > 0) {
          const batchResults = await this.executeConcurrentBatch(currentBatch);
          allResults.push(...batchResults);
          currentBatch = [];
        }

        // 2. Run the unsafe tool on its own (serially)
        const result = await this.executeSingleTool(request, definition);
        allResults.push(result);
      }
    }

    // Run the final concurrent batch, if any
    if (currentBatch.length > 0) {
      const batchResults = await this.executeConcurrentBatch(currentBatch);
      allResults.push(...batchResults);
    }

    // Sort by originalIndex
    allResults.sort((a, b) => a.originalIndex - b.originalIndex);

    this.emitEvent({ type: 'queue_drained' });
    this.logger.log(`All ${allResults.length} tools completed`);

    return allResults;
  }

  /**
   * Run a batch of concurrency-safe tools in parallel
   */
  private async executeConcurrentBatch(batch: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
    if (batch.length === 0) return [];
    if (batch.length === 1) {
      // A single tool does not need Promise.allSettled
      const def = this.toolRegistry.get(batch[0].toolName)!;
      return [await this.executeSingleTool(batch[0], def)];
    }

    this.logger.log(`Executing concurrent batch: ${batch.map(r => r.toolName).join(', ')}`);

    // Start every tool in the batch at once
    const promises = batch.map(request => {
      const definition = this.toolRegistry.get(request.toolName)!;
      return this.executeSingleTool(request, definition);
    });

    // Promise.allSettled guarantees that one failure does not affect the others
    // (executeSingleTool already catches its own errors; this is a second line of defense)
    const settled = await Promise.allSettled(promises);

    return settled.map((result, index) => {
      if (result.status === 'fulfilled') {
        return result.value;
      }
      return this.createErrorResult(batch[index], `Execution failed: ${result.reason}`);
    });
  }

  /**
   * Run a single tool
   */
  private async executeSingleTool(
    request: ToolExecutionRequest,
    definition: ToolDefinition,
  ): Promise<ToolExecutionResult> {
    const startedAt = Date.now();

    // Notify: Agent invocation started
    if (definition.toolType === 'agent_invocation') {
      const agentName = request.toolName.replace('invoke_', '');
      this.emitEvent({ type: 'agent_started', agentName, toolUseId: request.toolUseId });
    }

    this.emitEvent({ type: 'tool_started', toolName: request.toolName, toolUseId: request.toolUseId });

    try {
      // Execute with a timeout
      const output = await this.executeWithTimeout(
        definition.executor(request.input),
        definition.timeoutMs,
        request.toolName,
      );

      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;

      // Notify: completed
      this.emitEvent({
        type: 'tool_completed',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        durationMs,
        success: output.success,
      });

      if (definition.toolType === 'agent_invocation') {
        const agentName = request.toolName.replace('invoke_', '');
        this.emitEvent({ type: 'agent_completed', agentName, toolUseId: request.toolUseId, durationMs });
      }

      this.logger.log(`Tool ${request.toolName} completed in ${durationMs}ms (success: ${output.success})`);

      return {
        toolUseId: request.toolUseId,
        toolName: request.toolName,
        originalIndex: request.originalIndex,
        output,
        durationMs,
        startedAt,
        completedAt,
      };
    } catch (error) {
      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.error(`Tool ${request.toolName} failed after ${durationMs}ms: ${errorMessage}`);

      this.emitEvent({
        type: 'tool_error',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        error: errorMessage,
      });

      return {
        toolUseId: request.toolUseId,
        toolName: request.toolName,
        originalIndex: request.originalIndex,
        output: {
          content: `Tool execution failed: ${errorMessage}. Please retry or obtain the information another way.`,
          success: false,
          error: errorMessage,
        },
        durationMs,
        startedAt,
        completedAt,
      };
    }
  }

  /**
   * Execute with a timeout.
   * On timeout the underlying promise is not cancelled: it keeps running in the
   * background and its eventual result is ignored.
   */
  private executeWithTimeout(
    promise: Promise<ToolExecutionOutput>,
    timeoutMs: number,
    toolName: string,
  ): Promise<ToolExecutionOutput> {
    return new Promise<ToolExecutionOutput>((resolve, reject) => {
      const timer = setTimeout(() => {
        reject(new Error(`Tool ${toolName} timed out after ${timeoutMs}ms`));
      }, timeoutMs);

      promise
        .then(result => {
          clearTimeout(timer);
          resolve(result);
        })
        .catch(error => {
          clearTimeout(timer);
          reject(error);
        });
    });
  }

  /**
   * Create an error result
   */
  private createErrorResult(request: ToolExecutionRequest, error: string): ToolExecutionResult {
    const now = Date.now();
    return {
      toolUseId: request.toolUseId,
      toolName: request.toolName,
      originalIndex: request.originalIndex,
      output: {
        content: `Error: ${error}`,
        success: false,
        error,
      },
      durationMs: 0,
      startedAt: now,
      completedAt: now,
    };
  }

  /**
   * Emit an event (callback errors are logged, never propagated)
   */
  private emitEvent(event: QueueEvent): void {
    if (this.eventCallback) {
      try {
        this.eventCallback(event);
      } catch (e) {
        this.logger.error(`Event callback error: ${e}`);
      }
    }
  }

  /**
   * List the registered tools (for the Claude API)
   */
  getClaudeTools(): Anthropic.Tool[] {
    return Array.from(this.toolRegistry.values()).map(def => def.tool);
  }

  /**
   * Get a tool definition (for debugging)
   */
  getToolDefinition(name: string): ToolDefinition | undefined {
    return this.toolRegistry.get(name);
  }
}
```

## 5. Agent Invocation Executor

When the tool type is `agent_invocation`, the executor issues its own Claude API calls to run the Specialist Agent's mini-loop.

```typescript
// agent-executor.ts
import { ToolExecutionOutput } from './tool-execution.types';
import { BaseSpecialistService } from '../specialists/base-specialist.service';

/**
 * Agent invocation executor factory
 *
 * Creates an executor function for each Specialist Agent;
 * that function is what gets registered with ToolExecutionQueue.
 */
export function createAgentExecutor(
  specialist: BaseSpecialistService,
  eventCallback?: (event: { turn: number; maxTurns: number }) => void,
): (input: Record<string, unknown>) => Promise<ToolExecutionOutput> {
  return async (input: Record<string, unknown>): Promise<ToolExecutionOutput> => {
    const startTime = Date.now();
    let totalInputTokens = 0;
    let totalOutputTokens = 0;
    const toolCallsMade: string[] = [];

    try {
      // Specialist Agent mini-loop (at most 3 turns)
      const result = await specialist.execute({
        input,
        maxTurns: 3,
        onProgress: (turn, maxTurns) => {
          eventCallback?.({ turn, maxTurns });
        },
        // Named inputTokens/outputTokens so they do not shadow the outer `input`
        onTokenUsage: (inputTokens, outputTokens) => {
          totalInputTokens += inputTokens;
          totalOutputTokens += outputTokens;
        },
        onToolCall: (toolName) => {
          toolCallsMade.push(toolName);
        },
      });

      return {
        content: result.output,
        success: true,
        agentMetadata: {
          agentName: specialist.name,
          turnsUsed: result.turnsUsed,
          tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
          toolCallsMade,
          durationMs: Date.now() - startTime,
        },
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        content: `Agent ${specialist.name} failed: ${message}`,
        success: false,
        error: message,
        agentMetadata: {
          agentName: specialist.name,
          turnsUsed: 0,
          tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
          toolCallsMade,
          durationMs: Date.now() - startTime,
        },
      };
    }
  };
}
```

## 6. BaseSpecialistService Abstract Class

```typescript
// base-specialist.service.ts
import Anthropic from '@anthropic-ai/sdk';

/**
 * Specialist Agent execution parameters
 */
export interface SpecialistExecuteParams {
  input: Record<string, unknown>;
  maxTurns: number;
  onProgress?: (turn: number, maxTurns: number) => void;
  onTokenUsage?: (inputTokens: number, outputTokens: number) => void;
  onToolCall?: (toolName: string) => void;
}

/**
 * Specialist Agent execution result
 */
export interface SpecialistExecuteResult {
  output: string; // Text returned to the Coordinator
  turnsUsed: number;
  metadata?: Record<string, unknown>;
}

/**
 * Specialist Agent base class
 *
 * Every Specialist Agent extends this class.
 * The base class provides:
 * 1. A standard mini-loop (at most N turns)
 * 2. Tool execution
 * 3. Token tracking
 * 4. Error handling
 */
export abstract class BaseSpecialistService {
  /** Agent name */
  abstract readonly name: string;
  /** Model to use */
  abstract readonly model: string;
  /** The Agent's system prompt */
  abstract readonly systemPrompt: string;
  /** Tools available to the Agent */
  abstract readonly tools: Anthropic.Tool[];
  /** Maximum output tokens */
  abstract readonly maxTokens: number;

  protected client: Anthropic;

  constructor(client: Anthropic) {
    this.client = client;
  }

  /**
   * Run the Agent's mini-loop
   *
   * Standard flow:
   * 1. Build the initial messages from the input parameters
   * 2. Call the Claude API
   * 3. If there are tool_use blocks → run the tools → loop again
   * 4. No tool_use → return the final text
   * 5. maxTurns reached → return the current result
   */
  async execute(params: SpecialistExecuteParams): Promise<SpecialistExecuteResult> {
    const messages: Anthropic.MessageParam[] = this.buildInitialMessages(params.input);
    let turnsUsed = 0;

    while (turnsUsed < params.maxTurns) {
      turnsUsed++;
      params.onProgress?.(turnsUsed, params.maxTurns);

      // Call the Claude API (non-streaming: Specialists do not need streamed output)
      const response = await this.client.messages.create({
        model: this.model,
        max_tokens: this.maxTokens,
        system: this.systemPrompt,
        messages,
        tools: this.tools.length > 0 ?
          this.tools : undefined,
      });

      // Record token usage
      params.onTokenUsage?.(response.usage.input_tokens, response.usage.output_tokens);

      // Split the response into text and tool_use blocks
      const textBlocks = response.content.filter(
        (b): b is Anthropic.TextBlock => b.type === 'text',
      );
      const toolUses = response.content.filter(
        (b): b is Anthropic.ToolUseBlock => b.type === 'tool_use',
      );

      // No tool calls → return the text
      if (toolUses.length === 0) {
        const outputText = textBlocks.map(b => b.text).join('\n');
        return {
          output: outputText,
          turnsUsed,
          metadata: this.extractMetadata(outputText),
        };
      }

      // Tool calls present → run the tools
      messages.push({ role: 'assistant', content: response.content });

      const toolResults: Anthropic.ToolResultBlockParam[] = [];
      for (const tu of toolUses) {
        params.onToolCall?.(tu.name);
        const result = await this.executeSpecialistTool(tu.name, tu.input as Record<string, unknown>);
        toolResults.push({
          type: 'tool_result',
          tool_use_id: tu.id,
          content: typeof result === 'string' ? result : JSON.stringify(result),
        });
      }

      messages.push({ role: 'user', content: toolResults });
    }

    // maxTurns reached: return the last assistant text
    return {
      output: `[Agent ${this.name} reached its maximum number of turns (${params.maxTurns}); current result follows]\n${this.extractLastText(messages)}`,
      turnsUsed,
    };
  }

  /**
   * Build the initial messages.
   * Subclasses may override this to customize the input format.
   */
  protected buildInitialMessages(input: Record<string, unknown>): Anthropic.MessageParam[] {
    return [{
      role: 'user',
      content: JSON.stringify(input, null, 2),
    }];
  }

  /**
   * Execute one of the Specialist's own tools.
   * Subclasses must implement this.
   */
  protected abstract executeSpecialistTool(
    toolName: string,
    input: Record<string, unknown>,
  ): Promise<unknown>;

  /**
   * Extract structured metadata from the output.
   * Optional override for subclasses.
   */
  protected extractMetadata(_output: string): Record<string, unknown> | undefined {
    return undefined;
  }

  /**
   * Return the text of the last assistant message
   */
  private extractLastText(messages: Anthropic.MessageParam[]): string {
    for (let i = messages.length - 1; i >= 0; i--) {
      const msg = messages[i];
      if (msg.role === 'assistant') {
        if (typeof msg.content === 'string') return msg.content;
        return msg.content
          .filter((b): b is Anthropic.TextBlockParam => b.type === 'text')
          .map(b => b.text)
          .join('\n');
      }
    }
    return '(no output)';
  }
}
```

## 7. Execution Flow

```
Claude Coordinator returns tool_use blocks:
  [invoke_policy_expert, invoke_case_analyst, save_user_memory, invoke_assessment_expert]

ToolExecutionQueue.executeAll():

Step 1: walk the tool_use list
  ├─ invoke_policy_expert     → isConcurrencySafe: true  → add to Batch A
  ├─ invoke_case_analyst      → isConcurrencySafe: true  → add to Batch A
  ├─ save_user_memory         → isConcurrencySafe: false → triggers a drain!
  │   ├─ First run Batch A (in parallel):
  │   │   ├─ invoke_policy_expert ──→ PolicyExpert.execute() ──→ Claude API
  │   │   └─ invoke_case_analyst  ──→ CaseAnalyst.execute()  ──→ Claude API
  │   │       ↑ these two run in parallel!
  │   ├─ Batch A done
  │   └─ Then run save_user_memory (serially):
  │       └─ knowledgeClient.saveUserMemory() ──→ knowledge-service HTTP
  │
  └─ invoke_assessment_expert → isConcurrencySafe: true  → add to Batch B

Step 2: run the final Batch B:
  └─ invoke_assessment_expert ──→ AssessmentExpert.execute() ──→ Claude API

Step 3: sort the results by originalIndex:
  [0] invoke_policy_expert     → "TTPS Category B requirements..."
  [1] invoke_case_analyst      → "Similar case: Mr. Zhang..."
  [2] save_user_memory         → "User information saved"
  [3] invoke_assessment_expert → "Assessment result: {score: 75, ...}"

Step 4: return the sorted results → the Coordinator continues
```

## 8. Integration with the Agent Loop

```typescript
// coordinator-agent.service.ts (excerpt)
// Using ToolExecutionQueue inside agentLoop.
// AgentLoopParams, StreamEvent (assumed name of the stream event type), assistantBlocks,
// streamChannel and toolExecutionQueue come from the surrounding service.

async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
  // ... (Steps 1-4: context injection, API call, stream collection)

  // Step 5: extract the tool_use blocks
  const toolUses = assistantBlocks.filter(b => b.type === 'tool_use');

  if (toolUses.length === 0) {
    return; // natural termination
  }

  // Step 6: build the execution requests
  const requests: ToolExecutionRequest[] = toolUses.map((tu, index) => {
    const block = tu as Anthropic.ToolUseBlock;
    return {
      toolUseId: block.id,
      toolName: block.name,
      input: block.input as Record<string, unknown>,
      originalIndex: index,
    };
  });

  // Step 7: set the event callback → forward events to the frontend.
  // `yield` cannot be used inside a callback, so events go through an EventEmitter
  // or channel that the generator drains. onEvent replaces any callback previously
  // registered on this (shared) queue instance.
  toolExecutionQueue.onEvent((event) => {
    switch (event.type) {
      case 'agent_started':
        streamChannel.push({ type: 'agent_start', agentName: event.agentName });
        break;
      case 'agent_completed':
        streamChannel.push({ type: 'agent_complete', agentName: event.agentName, durationMs: event.durationMs });
        break;
      case 'tool_error':
        streamChannel.push({ type: 'tool_error', toolName: event.toolName, error: event.error });
        break;
    }
  });

  // Step 8: run all tools
  const results = await toolExecutionQueue.executeAll(requests);

  // Step 9: build the tool_result message
  const toolResultBlocks: Anthropic.ToolResultBlockParam[] = results.map(r => ({
    type: 'tool_result' as const,
    tool_use_id: r.toolUseId,
    content: r.output.content,
    is_error: !r.output.success,
  }));

  // Step 10: recurse, feeding the results back to the Coordinator
  const newMessages = [
    ...params.messages,
    { role: 'assistant' as const, content: assistantBlocks },
    { role: 'user' as const, content: toolResultBlocks },
  ];

  yield* agentLoop({ ...params, messages: newMessages });
}
```

## 9. Error-Handling Strategy

### 9.1 Tool-Level Errors

| Error type | Handling | Returned to the Coordinator |
|------------|----------|-----------------------------|
| **Timeout** | Exceeds `timeoutMs` | `"Tool execution timed out; please retry or obtain the information another way."` |
| **API error** (knowledge-service unavailable) | Exception caught | `"The knowledge base service is temporarily unavailable; answer from the information already known."` |
| **Agent internal error** (the Specialist's Claude API call fails) | Exception caught | `"Expert Agent temporarily unavailable: {error}. Answer the user directly."` |
| **Input validation failure** | Early return | `"Invalid tool input parameters: {details}"` |
| **Rate limit** | Retry once with exponential backoff | Normal result if the retry succeeds, otherwise an error |
| **Unknown tool** | Skipped | `"Unknown tool: {toolName}"` |

### 9.2 Batch-Level Errors

```typescript
/**
 * Promise.allSettled ensures that one tool failing does not fail the whole batch.
 *
 * Example:
 *   Batch = [invoke_policy_expert, invoke_case_analyst]
 *
 *   If invoke_policy_expert times out:
 *   - invoke_case_analyst returns its result normally
 *   - invoke_policy_expert returns an error message
 *   - the Coordinator continues its reply using the valid case_analyst result
 *
 * The Coordinator's prompt includes this guidance:
 * "If a tool call returns an error, keep answering the user with the other
 *  available information, and do not expose technical error details to the user."
 */
```

### 9.3 Agent Execution Retries

```typescript
/**
 * Retry strategy for Specialist Agents (in base-specialist.service.ts).
 * isRateLimitError, isOverloadedError, extractRetryAfter and sleep are helper
 * methods of the same class (not shown).
 */
private async callClaudeWithRetry(
  params: Anthropic.MessageCreateParamsNonStreaming,
  maxRetries: number = 1,
): Promise<Anthropic.Message> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await this.client.messages.create(params);
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error));

      if (this.isRateLimitError(error) && attempt < maxRetries) {
        // Honor retry-after when present, otherwise back off exponentially (1s, 2s, 4s, ...)
        const retryAfter = this.extractRetryAfter(error);
        await this.sleep(retryAfter || 1000 * 2 ** attempt);
        continue;
      }

      if (this.isOverloadedError(error) && attempt < maxRetries) {
        // Fall back to a Haiku model; verify the ID against the current Anthropic model list
        return await this.client.messages.create({
          ...params,
          model: 'claude-3-5-haiku-20241022',
        });
      }

      throw error;
    }
  }

  throw lastError;
}
```

## 10. Performance Optimization

### 10.1 Parallelism Analysis

```
Typical scenario 1: the user asks about policy + the system needs to save memory
  tool_uses: [invoke_policy_expert, invoke_memory_manager]
  Sequential: 3s + 2s = 5s
  Concurrent: invoke_memory_manager is not safe, but it comes after invoke_policy_expert
    → Batch [invoke_policy_expert] (3s) → invoke_memory_manager (2s) = 5s
  Concurrency brings no benefit here

Typical scenario 2: assess the user + find cases + get strategy advice
  tool_uses: [invoke_assessment_expert, invoke_case_analyst, invoke_strategist]
  Sequential: 4s + 2s + 2s = 8s
  Concurrent: all safe → one parallel batch = max(4s, 2s, 2s) = 4s
  50% of the time saved!
```
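The wall-clock figures above follow from the batching rule alone: a concurrent batch costs as much as its slowest member, and an unsafe tool first drains the open batch and then adds its own duration. A minimal sketch of that arithmetic, assuming per-tool durations are known in advance; the `estimateWallClockMs` helper and the `PlannedTool` shape are hypothetical, and the durations are the illustrative figures from the scenarios, not measurements.

```typescript
// Hypothetical helper: estimates wall-clock time under the batching rule of
// ToolExecutionQueue.executeAll. Durations are illustrative inputs, not measurements.
interface PlannedTool {
  name: string;
  concurrencySafe: boolean;
  durationMs: number;
}

function estimateWallClockMs(tools: PlannedTool[]): { sequentialMs: number; batchedMs: number } {
  // Running every tool one after another
  const sequentialMs = tools.reduce((sum, t) => sum + t.durationMs, 0);

  let batchedMs = 0;
  let openBatchMaxMs = 0; // slowest tool in the open concurrent batch (0 = empty batch)

  for (const tool of tools) {
    if (tool.concurrencySafe) {
      // Safe tools join the open batch; the batch costs as much as its slowest member
      openBatchMaxMs = Math.max(openBatchMaxMs, tool.durationMs);
    } else {
      // An unsafe tool drains the open batch first, then runs alone
      batchedMs += openBatchMaxMs + tool.durationMs;
      openBatchMaxMs = 0;
    }
  }
  // Drain the final batch, if any
  batchedMs += openBatchMaxMs;

  return { sequentialMs, batchedMs };
}

// Scenario 2: all three Agent calls are safe, so they form a single batch
const scenario2 = estimateWallClockMs([
  { name: 'invoke_assessment_expert', concurrencySafe: true, durationMs: 4000 },
  { name: 'invoke_case_analyst', concurrencySafe: true, durationMs: 2000 },
  { name: 'invoke_strategist', concurrencySafe: true, durationMs: 2000 },
]);
console.log(scenario2); // { sequentialMs: 8000, batchedMs: 4000 }
```

Fed scenario 1 (`invoke_policy_expert` followed by the unsafe `invoke_memory_manager`), the same function returns 5000 ms for both figures, because the unsafe tool forces the one-tool batch to finish first. The sketch ignores scheduling overhead and per-tool timeouts.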
```
Typical scenario 3: save memory + assess + generate payment
  tool_uses: [save_user_memory, invoke_assessment_expert, generate_payment]
  Sequential: 1s + 4s + 2s = 7s
  Our algorithm:
    save_user_memory (not safe)     → runs alone (1s)
    invoke_assessment_expert (safe) → Batch B
    generate_payment (not safe)     → first run Batch B (4s), then run alone (2s)
  Total: 1s + 4s + 2s = 7s (concurrency brings no benefit here)
```

### 10.2 Recommended Timeouts

| Tool type | Recommended timeout | Reason |
|-----------|---------------------|--------|
| Agent call (Sonnet) | 30s | One Sonnet call takes roughly 3-8s, and an Agent runs at most 3 turns |
| Agent call (Haiku) | 15s | Haiku is faster |
| search_knowledge | 10s | HTTP + vector retrieval |
| save_user_memory | 5s | HTTP + write |
| generate_payment | 10s | External payment API |
| get_current_datetime | 1s | Local computation |
| web_search | 15s | External search API |

### 10.3 Token Cost Tracking

```typescript
/**
 * Track the token spend of every Agent call at the ToolExecutionQueue level
 */
interface AgentCostSummary {
  agentName: string;
  inputTokens: number;
  outputTokens: number;
  estimatedCostUsd: number;
  turnsUsed: number;
  durationMs: number;
}

/**
 * Cost summary for one executeAll round (a method on ToolExecutionQueue).
 * Prices assume Sonnet rates ($3 / $15 per million input / output tokens),
 * so Agents that run on Haiku are overestimated.
 */
getCostSummary(results: ToolExecutionResult[]): AgentCostSummary[] {
  return results
    .filter(r => r.output.agentMetadata)
    .map(r => {
      const meta = r.output.agentMetadata!;
      const inputCost = (meta.tokensUsed.input * 0.003) / 1000;   // Sonnet input: $0.003 per 1K tokens
      const outputCost = (meta.tokensUsed.output * 0.015) / 1000; // Sonnet output: $0.015 per 1K tokens
      return {
        agentName: meta.agentName,
        inputTokens: meta.tokensUsed.input,
        outputTokens: meta.tokensUsed.output,
        estimatedCostUsd: inputCost + outputCost,
        turnsUsed: meta.turnsUsed,
        durationMs: meta.durationMs,
      };
    });
}
```

## 11. Tool Registration Example

```typescript
// tool-registry.ts - tool registration
import { ToolExecutionQueue } from './tool-execution-queue';
import { createAgentExecutor } from './agent-executor';

// SpecialistServices and DirectToolServices group the Specialist services and the
// direct-tool implementations (defined elsewhere).
export function registerAllTools(
  queue: ToolExecutionQueue,
  specialists: SpecialistServices,
  directTools: DirectToolServices,
): void {
  // ======== Agent invocation tools ========

  queue.registerTool('invoke_policy_expert', {
    tool: {
      name: 'invoke_policy_expert',
      description: 'Invoke the Policy Expert Agent to look up details of Hong Kong immigration policies, including eligibility requirements and application procedures.',
      input_schema: {
        type: 'object',
        properties: {
          query: { type: 'string', description: 'The policy question to look up' },
          category: { type: 'string', description: 'Immigration category (optional): TTPS/QMAS/GEP/IANG/CIES/TECHTAS' },
        },
        required: ['query'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 5000,
    executor: createAgentExecutor(specialists.policyExpert),
  });

  queue.registerTool('invoke_assessment_expert', {
    tool: {
      name: 'invoke_assessment_expert',
      description: "Invoke the Assessment Expert Agent to assess immigration eligibility and recommend options based on the user's information.",
      input_schema: {
        type: 'object',
        properties: {
          userInfo: { type: 'object', description: 'User information (age, education, workYears, etc.)' },
          targetCategories: {
            type: 'array',
            items: { type: 'string' },
            description: 'Target categories to assess (optional; all categories by default)',
          },
        },
        required: ['userInfo'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 6000,
    executor: createAgentExecutor(specialists.assessmentExpert),
  });

  queue.registerTool('invoke_memory_manager', {
    tool: {
      name: 'invoke_memory_manager',
      description: 'Invoke the Memory Manager Agent to extract user information and save it to long-term memory.',
      input_schema: {
        type: 'object',
        properties: {
          action: { type: 'string', enum: ['extract', 'save', 'load'] },
          userMessage: { type: 'string', description: 'User message (required for extract)' },
          assistantMessage: { type: 'string', description: 'Assistant reply (required for extract)' },
          memoryData: { type: 'object', description: 'Memory to save (required for save)' },
        },
        required: ['action'],
      },
    },
    isConcurrencySafe: false, // may write!
    toolType: 'agent_invocation',
    timeoutMs: 15000,
    estimatedDurationMs: 3000,
    executor: createAgentExecutor(specialists.memoryManager),
  });

  // ======== Direct tools ========

  queue.registerTool('generate_payment', {
    tool: {
      name: 'generate_payment',
      description: 'Generate a payment link for the paid assessment service.',
      input_schema: {
        type: 'object',
        properties: {
          service: { type: 'string', enum: ['detailed_assessment'] },
          userId: { type: 'string' },
        },
        required: ['service', 'userId'],
      },
    },
    isConcurrencySafe: false, // creates an order (side effect)
    toolType: 'direct_tool',
    timeoutMs: 10000,
    estimatedDurationMs: 2000,
    executor: directTools.generatePayment,
  });

  queue.registerTool('get_current_datetime', {
    tool: {
      name: 'get_current_datetime',
      description: 'Get the current date and time.',
      input_schema: { type: 'object', properties: {} },
    },
    isConcurrencySafe: true,
    toolType: 'direct_tool',
    timeoutMs: 1000,
    estimatedDurationMs: 5,
    executor: async () => ({
      content: new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Hong_Kong' }),
      success: true,
    }),
  });

  // ... register the remaining tools
}
```

## 12. Comparison with the Previous Architecture

| Aspect | Previous architecture (ClaudeAgentServiceV2) | New architecture (ToolExecutionQueue) |
|--------|----------------------------------------------|---------------------------------------|
| Tool execution | Serial, one by one in a `for` loop | Smart batching: safe tools in parallel, unsafe tools serially |
| Execution timeout | No timeout control | Independent timeout per tool |
| Error isolation | One tool failure aborts the whole loop | One failure does not affect the others |
| Event notification | No real-time progress | Full QueueEvent stream |
| Agent sub-calls | N/A (no Agent concept) | Each Agent runs its own mini-loop |
| Cost tracking | Only the Coordinator's tokens | Tokens tracked per Agent |
| Extensibility | Edit the `executeTool()` switch statement | Declarative `registerTool()` registration |
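The "smart batching" and "error isolation" rows above can be exercised without NestJS or the Anthropic SDK. The following is a stripped-down sketch of the core of `executeAll` under simplifying assumptions: tools take a string input, timeouts and `QueueEvent`s are left out, and `MiniTool`, `runBatched` and the fake executors are hypothetical names used only for illustration. It replays the tool sequence from section 7, with `invoke_case_analyst` made to fail so the failure can be seen staying inside its own result.

```typescript
// Hypothetical, dependency-free reduction of ToolExecutionQueue.executeAll:
// consecutive safe tools form a batch that runs together; an unsafe tool
// drains the open batch and then runs alone. Timeouts and events are omitted.
interface MiniTool {
  concurrencySafe: boolean;
  run: (input: string) => Promise<string>;
}

interface MiniRequest {
  toolName: string;
  input: string;
  originalIndex: number;
}

interface MiniResult {
  toolName: string;
  originalIndex: number;
  ok: boolean;
  content: string;
}

// Runs one tool; a failure becomes a result instead of a rejection
async function runOne(req: MiniRequest, tool: MiniTool): Promise<MiniResult> {
  try {
    const content = await tool.run(req.input);
    return { toolName: req.toolName, originalIndex: req.originalIndex, ok: true, content };
  } catch (err) {
    const content = err instanceof Error ? err.message : String(err);
    return { toolName: req.toolName, originalIndex: req.originalIndex, ok: false, content };
  }
}

async function runBatched(
  requests: MiniRequest[],
  registry: Map<string, MiniTool>,
  schedule: string[], // records the execution plan for inspection
): Promise<MiniResult[]> {
  const results: MiniResult[] = [];
  let batch: MiniRequest[] = [];

  const drain = async (): Promise<void> => {
    if (batch.length === 0) return;
    const current = batch;
    batch = [];
    schedule.push(`batch[${current.map(r => r.toolName).join(',')}]`);
    // runOne never rejects, so Promise.all cannot short-circuit here
    results.push(...(await Promise.all(current.map(r => runOne(r, registry.get(r.toolName)!)))));
  };

  for (const req of requests) {
    const tool = registry.get(req.toolName);
    if (!tool) {
      results.push({ toolName: req.toolName, originalIndex: req.originalIndex, ok: false, content: `Unknown tool: ${req.toolName}` });
      continue;
    }
    if (tool.concurrencySafe) {
      batch.push(req);
    } else {
      await drain(); // finish everything started before the unsafe tool
      schedule.push(`serial[${req.toolName}]`);
      results.push(await runOne(req, tool));
    }
  }
  await drain();

  // Restore the order in which the tool_use blocks were emitted
  return results.sort((a, b) => a.originalIndex - b.originalIndex);
}

// Fake executors for the tool sequence from section 7
const delay = (ms: number, value: string) =>
  new Promise<string>(resolve => setTimeout(() => resolve(value), ms));

const registry = new Map<string, MiniTool>([
  ['invoke_policy_expert', { concurrencySafe: true, run: () => delay(30, 'policy answer') }],
  ['invoke_case_analyst', { concurrencySafe: true, run: () => Promise.reject(new Error('timed out')) }],
  ['save_user_memory', { concurrencySafe: false, run: () => delay(5, 'saved') }],
  ['invoke_assessment_expert', { concurrencySafe: true, run: () => delay(10, 'assessment') }],
]);

const toolSequence = ['invoke_policy_expert', 'invoke_case_analyst', 'save_user_memory', 'invoke_assessment_expert'];
const schedule: string[] = [];

runBatched(
  toolSequence.map((toolName, originalIndex) => ({ toolName, input: '', originalIndex })),
  registry,
  schedule,
).then(results => {
  console.log(schedule.join(' -> '));
  for (const r of results) console.log(r.originalIndex, r.toolName, r.ok ? r.content : `error: ${r.content}`);
});
```

The recorded schedule is `batch[invoke_policy_expert,invoke_case_analyst] -> serial[save_user_memory] -> batch[invoke_assessment_expert]`, and the results come back in original order 0 to 3 with only index 1 marked as an error. Because `runOne` converts failures into results, plain `Promise.all` suffices in the sketch; the real queue keeps `Promise.allSettled` as an extra safeguard.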