iconsulting/docs/architecture/09-tool-execution.md

34 KiB
Raw Permalink Blame History

09 - 工具并发执行系统 (Tool Execution System)

1. 设计哲学

借鉴 Claude Code 的 ToolExecutionQueue(Q80 class)设计

并发安全的工具并行执行,串行约束的工具顺序执行。

当 Coordinator Agent 在一次回复中返回多个 tool_use blocks 时,系统需要智能地决定哪些工具可以并行执行、哪些必须串行等待。这直接影响用户体验(响应延迟)和系统正确性(数据一致性)。

核心规则:

  1. 读操作并行: search_knowledge、invoke_*(Agent 调用)等纯读操作可以并行
  2. 写操作串行: save_user_memory、generate_payment 等有副作用的操作必须串行
  3. 结果有序:无论执行顺序如何,结果按 Claude 返回的 tool_use 顺序排列
  4. 错误隔离:一个工具失败不影响其他工具的执行

2. 并发安全性声明

2.1 工具并发安全性矩阵

工具名 类型 isConcurrencySafe 原因
invoke_policy_expert Agent 调用 true 纯读,独立 API 调用
invoke_assessment_expert Agent 调用 true 纯读,独立 API 调用
invoke_strategist Agent 调用 true 纯读,独立 API 调用
invoke_objection_handler Agent 调用 true 纯读,独立 API 调用
invoke_case_analyst Agent 调用 true 纯读,独立 API 调用
invoke_memory_manager Agent 调用 false 可能写入记忆
search_knowledge 直接工具 true 纯读
get_user_context 直接工具 true 纯读
save_user_memory 直接工具 false 写操作
generate_payment 直接工具 false 产生支付订单,有副作用
get_current_datetime 直接工具 true 纯计算
web_search 直接工具 true 纯读,外部 API
get_exchange_rate 直接工具 true 纯读
check_off_topic 直接工具 true 纯读

2.2 并发安全判定规则

/**
 * 判断工具是否并发安全的规则:
 *
 * 1. Agent 调用类 (invoke_*):
 *    - 默认并发安全(各 Agent 独立运行)
 *    - 例外: invoke_memory_manager(因为它可能调用 save_user_memory)
 *
 * 2. 读操作类:
 *    - search_*, get_*, check_* → 并发安全
 *
 * 3. 写操作类:
 *    - save_*, update_*, delete_* → 非并发安全
 *
 * 4. 副作用类:
 *    - generate_payment, send_notification → 非并发安全
 */

3. TypeScript 类型定义

// tool-execution.types.ts

import Anthropic from '@anthropic-ai/sdk';

/**
 * Registry entry for one tool.
 *
 * Combines the Claude-facing schema with the runtime metadata that
 * ToolExecutionQueue needs to schedule and run the tool.
 */
export interface ToolDefinition {
  /** Schema sent to the Claude API (name, description, input_schema). */
  tool: Anthropic.Tool;

  /** Whether this tool invokes a specialist agent or runs directly. */
  toolType: 'agent_invocation' | 'direct_tool';

  /**
   * true: may run in parallel with neighbouring safe tools.
   * false: runs alone, after the preceding safe batch has finished.
   */
  isConcurrencySafe: boolean;

  /** Hard limit for one execution, in milliseconds. */
  timeoutMs: number;

  /** Expected duration in milliseconds (informational only, e.g. for UI hints). */
  estimatedDurationMs: number;

  /** Runs the tool with Claude's input and produces the tool_result payload. */
  executor: (input: Record<string, unknown>) => Promise<ToolExecutionOutput>;
}

/**
 * Value an executor hands back to the queue for one tool call.
 */
export interface ToolExecutionOutput {
  /** false marks the call as failed (forwarded to Claude as is_error). */
  success: boolean;

  /** Text returned to Claude as the tool_result content. */
  content: string;

  /** Error message, populated when success is false. */
  error?: string;

  /** Specialist run statistics; only agent-invocation executors set this. */
  agentMetadata?: {
    agentName: string;
    turnsUsed: number;
    /** Tokens accumulated over every model call the agent made. */
    tokensUsed: { input: number; output: number };
    /** Names of the tools the agent invoked, in call order. */
    toolCallsMade: string[];
    durationMs: number;
  };
}

/**
 * One tool_use block from a Claude response, normalized for the queue.
 */
export interface ToolExecutionRequest {
  /** Zero-based position of the block in the response; results are sorted by it. */
  originalIndex: number;

  /** tool_use id assigned by Claude; echoed back as tool_result.tool_use_id. */
  toolUseId: string;

  /** Registry key of the tool to run. */
  toolName: string;

  /** Arguments Claude supplied for the tool. */
  input: Record<string, unknown>;
}

/**
 * Outcome of running one ToolExecutionRequest.
 */
export interface ToolExecutionResult {
  /** Identity fields copied from the originating request. */
  toolUseId: string;
  toolName: string;
  originalIndex: number;

  /** What the executor produced, or a synthesized error output. */
  output: ToolExecutionOutput;

  /** Epoch milliseconds (Date.now()) when execution started. */
  startedAt: number;

  /** Epoch milliseconds (Date.now()) when execution finished. */
  completedAt: number;

  /** completedAt - startedAt, in milliseconds. */
  durationMs: number;
}

/**
 * Progress notifications emitted by ToolExecutionQueue, e.g. to stream
 * status to the frontend. Discriminated by `type`.
 */
export type QueueEvent =
  // Per-tool lifecycle
  | { type: 'tool_queued'; toolUseId: string; toolName: string; position: number }
  | { type: 'tool_started'; toolUseId: string; toolName: string }
  | { type: 'tool_completed'; toolUseId: string; toolName: string; success: boolean; durationMs: number }
  | { type: 'tool_error'; toolUseId: string; toolName: string; error: string }
  // Agent-invocation lifecycle
  | { type: 'agent_started'; toolUseId: string; agentName: string }
  | { type: 'agent_progress'; toolUseId: string; agentName: string; turn: number; maxTurns: number }
  | { type: 'agent_completed'; toolUseId: string; agentName: string; durationMs: number }
  // End of an executeAll run
  | { type: 'queue_drained' };

4. ToolExecutionQueue 完整实现

// tool-execution-queue.ts

import { Injectable, Logger } from '@nestjs/common';
import {
  ToolDefinition,
  ToolExecutionRequest,
  ToolExecutionResult,
  ToolExecutionOutput,
  QueueEvent,
} from './tool-execution.types';

/** A tool_use request paired with the registry definition it resolved to. */
interface ResolvedToolRequest {
  request: ToolExecutionRequest;
  definition: ToolDefinition;
}

/**
 * Concurrent tool-execution queue.
 *
 * Core algorithm:
 * 1. Receive the tool_use requests from one Claude response.
 * 2. Walk them in their original order.
 * 3. Collect consecutive concurrency-safe tools into a "concurrent batch".
 * 4. On a tool that is not concurrency-safe:
 *    a. first run (await) the pending concurrent batch;
 *    b. then run the unsafe tool on its own.
 * 5. Continue with the remaining tools.
 * 6. Return every result sorted by originalIndex.
 *
 * Event delivery: `executeAll` accepts an optional per-call listener. The
 * callback set through `onEvent` is a single slot on this instance, shared by
 * every caller, so concurrent runs relying on it can see each other's events;
 * prefer the per-call listener when runs may overlap.
 *
 * Timeouts reject the wait but do not cancel the executor: a timed-out tool
 * keeps running in the background, so an unsafe tool that timed out may still
 * be in flight when the next tool starts.
 */
@Injectable()
export class ToolExecutionQueue {
  private readonly logger = new Logger(ToolExecutionQueue.name);

  /** Registered tool definitions, keyed by tool name. */
  private readonly toolRegistry = new Map<string, ToolDefinition>();

  /** Instance-wide fallback listener, used when `executeAll` is given none. */
  private eventCallback?: (event: QueueEvent) => void;

  /**
   * Registers (or replaces) a tool definition under `name`.
   */
  registerTool(name: string, definition: ToolDefinition): void {
    this.toolRegistry.set(name, definition);
    this.logger.log(`Registered tool: ${name} (concurrencySafe: ${definition.isConcurrencySafe})`);
  }

  /**
   * Registers every entry of `tools`, in object-key order.
   */
  registerTools(tools: Record<string, ToolDefinition>): void {
    for (const [name, def] of Object.entries(tools)) {
      this.registerTool(name, def);
    }
  }

  /**
   * Sets the instance-wide event listener (e.g. to forward progress to a frontend).
   *
   * This is one slot shared by all callers of this instance; pass a listener to
   * `executeAll` instead when several conversations may run concurrently.
   */
  onEvent(callback: (event: QueueEvent) => void): void {
    this.eventCallback = callback;
  }

  /**
   * Executes every tool_use request from one Claude response.
   *
   * Requests are grouped by concurrency safety (see the class comment) and the
   * results are returned in Claude's original order.
   *
   * @param requests - tool_use requests extracted from the Claude response
   * @param onEvent - optional listener for this run only; when omitted, the
   *   callback set via `onEvent()` at the moment this run starts is used
   * @returns one result per request, sorted by originalIndex; tool failures and
   *   unknown tools become `success: false` results rather than rejections
   */
  async executeAll(
    requests: ToolExecutionRequest[],
    onEvent?: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult[]> {
    if (requests.length === 0) return [];

    // Capture the sink once so another caller's onEvent() cannot redirect this
    // run's events halfway through.
    const sink = onEvent ?? this.eventCallback;
    const emit = (event: QueueEvent): void => this.emitEvent(event, sink);

    this.logger.log(`Executing ${requests.length} tools: ${requests.map(r => r.toolName).join(', ')}`);

    const allResults: ToolExecutionResult[] = [];
    let currentBatch: ResolvedToolRequest[] = [];

    for (const request of requests) {
      const definition = this.toolRegistry.get(request.toolName);

      if (!definition) {
        // Unregistered tool: answer with an error result instead of failing the run.
        allResults.push(this.createErrorResult(request, `Unknown tool: ${request.toolName}`));
        continue;
      }

      emit({ type: 'tool_queued', toolName: request.toolName, toolUseId: request.toolUseId, position: request.originalIndex });

      if (definition.isConcurrencySafe) {
        // Safe: defer into the current parallel batch.
        currentBatch.push({ request, definition });
        continue;
      }

      // Unsafe: drain the pending batch first, then run this tool alone.
      if (currentBatch.length > 0) {
        allResults.push(...(await this.executeConcurrentBatch(currentBatch, emit)));
        currentBatch = [];
      }
      allResults.push(await this.executeSingleTool(request, definition, emit));
    }

    // Trailing batch of safe tools, if any.
    if (currentBatch.length > 0) {
      allResults.push(...(await this.executeConcurrentBatch(currentBatch, emit)));
    }

    // Results were collected batch by batch; restore Claude's order.
    allResults.sort((a, b) => a.originalIndex - b.originalIndex);

    emit({ type: 'queue_drained' });
    this.logger.log(`All ${allResults.length} tools completed`);

    return allResults;
  }

  /**
   * Runs a batch of concurrency-safe tools in parallel.
   *
   * Any rejection is converted into an error result for that tool only, so one
   * failure never discards the other tools' results.
   */
  private async executeConcurrentBatch(
    batch: ResolvedToolRequest[],
    emit: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult[]> {
    if (batch.length === 0) return [];

    if (batch.length > 1) {
      this.logger.log(`Executing concurrent batch: ${batch.map(({ request }) => request.toolName).join(', ')}`);
    }

    return Promise.all(
      batch.map(({ request, definition }) =>
        this.executeSingleTool(request, definition, emit).catch((reason: unknown) =>
          this.createErrorResult(request, `Execution failed: ${String(reason)}`),
        ),
      ),
    );
  }

  /**
   * Runs one tool under its timeout and reports lifecycle events.
   *
   * Executor errors and timeouts are caught: they produce a `success: false`
   * result plus a `tool_error` event. For agent invocations every
   * `agent_started` is paired with an `agent_completed`, on success or failure.
   */
  private async executeSingleTool(
    request: ToolExecutionRequest,
    definition: ToolDefinition,
    emit: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult> {
    const startedAt = Date.now();
    const { toolName, toolUseId, originalIndex } = request;

    // Agent invocations also emit agent_* events, named after the tool minus
    // its leading "invoke_" prefix.
    const agentName =
      definition.toolType === 'agent_invocation' ? toolName.replace(/^invoke_/, '') : undefined;

    if (agentName !== undefined) {
      emit({ type: 'agent_started', agentName, toolUseId });
    }
    emit({ type: 'tool_started', toolName, toolUseId });

    try {
      const output = await this.executeWithTimeout(
        definition.executor(request.input),
        definition.timeoutMs,
        toolName,
      );

      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;

      emit({ type: 'tool_completed', toolName, toolUseId, durationMs, success: output.success });
      if (agentName !== undefined) {
        emit({ type: 'agent_completed', agentName, toolUseId, durationMs });
      }

      this.logger.log(`Tool ${toolName} completed in ${durationMs}ms (success: ${output.success})`);

      return { toolUseId, toolName, originalIndex, output, durationMs, startedAt, completedAt };
    } catch (error) {
      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.error(`Tool ${toolName} failed after ${durationMs}ms: ${errorMessage}`);

      emit({ type: 'tool_error', toolName, toolUseId, error: errorMessage });
      // Close the agent_started emitted above so listeners are not left waiting.
      if (agentName !== undefined) {
        emit({ type: 'agent_completed', agentName, toolUseId, durationMs });
      }

      return {
        toolUseId,
        toolName,
        originalIndex,
        output: {
          content: `工具执行失败: ${errorMessage}。请重试或使用其他方式获取信息。`,
          success: false,
          error: errorMessage,
        },
        durationMs,
        startedAt,
        completedAt,
      };
    }
  }

  /**
   * Awaits `promise`, rejecting with a timeout error after `timeoutMs`.
   *
   * The timer is always cleared. The underlying work is not cancelled; its
   * late settlement is ignored.
   */
  private async executeWithTimeout(
    promise: Promise<ToolExecutionOutput>,
    timeoutMs: number,
    toolName: string,
  ): Promise<ToolExecutionOutput> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => {
        reject(new Error(`Tool ${toolName} timed out after ${timeoutMs}ms`));
      }, timeoutMs);
    });

    try {
      return await Promise.race([promise, timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Builds a failed result for a request that never ran (unknown tool or a
   * rejected batch member).
   */
  private createErrorResult(request: ToolExecutionRequest, error: string): ToolExecutionResult {
    // One timestamp so startedAt === completedAt, consistent with durationMs: 0.
    const now = Date.now();
    return {
      toolUseId: request.toolUseId,
      toolName: request.toolName,
      originalIndex: request.originalIndex,
      output: {
        content: `错误: ${error}`,
        success: false,
        error,
      },
      durationMs: 0,
      startedAt: now,
      completedAt: now,
    };
  }

  /**
   * Delivers `event` to `callback` (default: the instance-wide listener).
   *
   * Listener exceptions are logged and swallowed so they cannot break tool
   * execution.
   */
  private emitEvent(event: QueueEvent, callback = this.eventCallback): void {
    if (!callback) return;
    try {
      callback(event);
    } catch (e) {
      this.logger.error(`Event callback error: ${String(e)}`);
    }
  }

  /**
   * Schemas of all registered tools, for the Claude API `tools` parameter.
   *
   * Typed via ToolDefinition['tool'] (i.e. Anthropic.Tool) because this file
   * does not import the Anthropic SDK.
   */
  getClaudeTools(): ToolDefinition['tool'][] {
    return Array.from(this.toolRegistry.values(), def => def.tool);
  }

  /**
   * Looks up a registered definition by name (debugging aid).
   */
  getToolDefinition(name: string): ToolDefinition | undefined {
    return this.toolRegistry.get(name);
  }
}

5. Agent 调用执行器

当工具类型是 agent_invocation 时,executor 需要创建一个新的 Claude API 调用,运行 Specialist Agent 的 mini-loop。

// agent-executor.ts

import Anthropic from '@anthropic-ai/sdk';
import { ToolExecutionOutput } from './tool-execution.types';
import { BaseSpecialistService } from '../specialists/base-specialist.service';

/**
 * Factory for agent-invocation executors.
 *
 * Wraps a specialist so it can be registered in ToolExecutionQueue as a
 * `ToolDefinition.executor`. Errors thrown by `specialist.execute` do not
 * reject the returned promise; they are reported as `success: false`.
 *
 * @param specialist - specialist whose mini-loop runs on every call
 * @param eventCallback - optional listener, invoked once per turn the specialist starts
 * @param maxTurns - turn budget passed to the specialist (default 3)
 * @returns an executor that also reports token usage, inner tool calls,
 *   turns and duration in `agentMetadata`
 */
export function createAgentExecutor(
  specialist: BaseSpecialistService,
  eventCallback?: (event: { turn: number; maxTurns: number }) => void,
  maxTurns = 3,
): (input: Record<string, unknown>) => Promise<ToolExecutionOutput> {
  return async (input: Record<string, unknown>): Promise<ToolExecutionOutput> => {
    const startTime = Date.now();
    let totalInputTokens = 0;
    let totalOutputTokens = 0;
    // Last turn the specialist reported starting, so failures still show how far it got.
    let turnsStarted = 0;
    const toolCallsMade: string[] = [];

    // Snapshot of the run so far, shared by the success and failure paths.
    const buildMetadata = (
      turnsUsed: number,
    ): NonNullable<ToolExecutionOutput['agentMetadata']> => ({
      agentName: specialist.name,
      turnsUsed,
      tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
      toolCallsMade,
      durationMs: Date.now() - startTime,
    });

    try {
      const result = await specialist.execute({
        input,
        maxTurns,
        onProgress: (turn, turnLimit) => {
          turnsStarted = turn;
          eventCallback?.({ turn, maxTurns: turnLimit });
        },
        onTokenUsage: (inputTokens, outputTokens) => {
          totalInputTokens += inputTokens;
          totalOutputTokens += outputTokens;
        },
        onToolCall: (toolName) => {
          toolCallsMade.push(toolName);
        },
      });

      return {
        content: result.output,
        success: true,
        agentMetadata: buildMetadata(result.turnsUsed),
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        content: `Agent ${specialist.name} 执行失败: ${message}`,
        success: false,
        error: message,
        agentMetadata: buildMetadata(turnsStarted),
      };
    }
  };
}

6. BaseSpecialistService 抽象类

// base-specialist.service.ts

import Anthropic from '@anthropic-ai/sdk';

/**
 * Arguments for BaseSpecialistService.execute.
 */
export interface SpecialistExecuteParams {
  /** Tool input forwarded by the coordinator; serialized into the first user message by default. */
  input: Record<string, unknown>;

  /** Upper bound on model calls in the mini-loop. */
  maxTurns: number;

  /** Called at the start of every turn with the 1-based turn number and the limit. */
  onProgress?: (turn: number, maxTurns: number) => void;

  /** Called after each model call with that call's token usage. */
  onTokenUsage?: (inputTokens: number, outputTokens: number) => void;

  /** Called with the name of each tool the specialist is about to run. */
  onToolCall?: (toolName: string) => void;
}

/**
 * Outcome of one BaseSpecialistService.execute run.
 */
export interface SpecialistExecuteResult {
  /** Number of model calls made during the run. */
  turnsUsed: number;
  /** Text handed back to the coordinator. */
  output: string;
  /** Optional structured data a subclass extracted from `output`. */
  metadata?: Record<string, unknown>;
}

/**
 * Base class for specialist agents.
 *
 * Every specialist extends this class, which provides:
 * 1. the standard mini-loop (at most `maxTurns` model calls);
 * 2. sequential execution of the specialist's own tools;
 * 3. token-usage reporting through the execute() callbacks;
 * 4. tool error isolation: a specialist tool that throws is reported back to
 *    the model as an `is_error` tool_result instead of aborting the run.
 */
export abstract class BaseSpecialistService {
  /** Agent name (used in the max-turns marker and by callers' metadata). */
  abstract readonly name: string;

  /** Claude model ID used for every call. */
  abstract readonly model: string;

  /** System prompt for this agent. */
  abstract readonly systemPrompt: string;

  /** Tools this agent may call; when empty, no `tools` field is sent. */
  abstract readonly tools: Anthropic.Tool[];

  /** `max_tokens` for each model call. */
  abstract readonly maxTokens: number;

  protected client: Anthropic;

  constructor(client: Anthropic) {
    this.client = client;
  }

  /**
   * Runs the mini-loop.
   *
   * 1. Build the initial messages from `params.input`.
   * 2. Call the Claude API (non-streaming).
   * 3. If the reply contains tool_use blocks: run them, append the results, loop.
   * 4. Otherwise: return the reply's text.
   * 5. After `maxTurns` calls: return the last assistant text behind a marker.
   *
   * Errors from the API call itself propagate to the caller.
   */
  async execute(params: SpecialistExecuteParams): Promise<SpecialistExecuteResult> {
    const messages: Anthropic.MessageParam[] = this.buildInitialMessages(params.input);
    let turnsUsed = 0;

    while (turnsUsed < params.maxTurns) {
      turnsUsed++;
      params.onProgress?.(turnsUsed, params.maxTurns);

      // Non-streaming: specialist output is consumed whole by the coordinator.
      const response = await this.client.messages.create({
        model: this.model,
        max_tokens: this.maxTokens,
        system: this.systemPrompt,
        messages,
        tools: this.tools.length > 0 ? this.tools : undefined,
      });

      params.onTokenUsage?.(response.usage.input_tokens, response.usage.output_tokens);

      const toolUses = response.content.filter(
        (block): block is Anthropic.ToolUseBlock => block.type === 'tool_use',
      );

      // No tool calls: this reply is the final answer.
      if (toolUses.length === 0) {
        const outputText = response.content
          .filter((block): block is Anthropic.TextBlock => block.type === 'text')
          .map((block) => block.text)
          .join('\n');
        return {
          output: outputText,
          turnsUsed,
          metadata: this.extractMetadata(outputText),
        };
      }

      messages.push({ role: 'assistant', content: response.content });

      // Tools run one after another, in the order the model requested them.
      const toolResults: Anthropic.ToolResultBlockParam[] = [];
      for (const toolUse of toolUses) {
        params.onToolCall?.(toolUse.name);
        toolResults.push(await this.runSpecialistTool(toolUse));
      }

      messages.push({ role: 'user', content: toolResults });
    }

    // Turn budget exhausted: return whatever text the last assistant turn had.
    return {
      output: `[Agent ${this.name} 达到最大轮次 (${params.maxTurns}),以下是当前结果]\n${this.extractLastText(messages)}`,
      turnsUsed,
    };
  }

  /**
   * Builds the initial messages: by default one user message holding the
   * pretty-printed JSON input. Subclasses may override to customise the format.
   */
  protected buildInitialMessages(input: Record<string, unknown>): Anthropic.MessageParam[] {
    return [{
      role: 'user',
      content: JSON.stringify(input, null, 2),
    }];
  }

  /**
   * Executes one of the specialist's own tools. Subclasses must implement.
   * Strings are sent to the model as-is; other values are JSON-encoded.
   */
  protected abstract executeSpecialistTool(
    toolName: string,
    input: Record<string, unknown>,
  ): Promise<unknown>;

  /**
   * Extracts structured metadata from the final output. Optional override;
   * the default returns undefined.
   */
  protected extractMetadata(output: string): Record<string, unknown> | undefined {
    return undefined;
  }

  /**
   * Runs one specialist tool and converts the outcome into a tool_result block.
   * A thrown error becomes an `is_error` result so the model can recover.
   */
  private async runSpecialistTool(
    toolUse: Anthropic.ToolUseBlock,
  ): Promise<Anthropic.ToolResultBlockParam> {
    try {
      const result = await this.executeSpecialistTool(
        toolUse.name,
        toolUse.input as Record<string, unknown>,
      );
      return {
        type: 'tool_result',
        tool_use_id: toolUse.id,
        content: BaseSpecialistService.toToolResultContent(result),
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        type: 'tool_result',
        tool_use_id: toolUse.id,
        content: `Tool ${toolUse.name} failed: ${message}`,
        is_error: true,
      };
    }
  }

  /**
   * Serialises a tool's return value for a tool_result block.
   *
   * JSON.stringify returns undefined for `undefined` (and for functions and
   * symbols), which would produce a block without content, so those map to ''.
   */
  private static toToolResultContent(result: unknown): string {
    if (typeof result === 'string') return result;
    return JSON.stringify(result) ?? '';
  }

  /**
   * Text of the most recent assistant message, or '(无输出)' if there is none.
   */
  private extractLastText(messages: Anthropic.MessageParam[]): string {
    const lastAssistant = [...messages].reverse().find((msg) => msg.role === 'assistant');
    if (!lastAssistant) return '(无输出)';
    if (typeof lastAssistant.content === 'string') return lastAssistant.content;
    return lastAssistant.content
      .filter((block): block is Anthropic.TextBlockParam => block.type === 'text')
      .map((block) => block.text)
      .join('\n');
  }
}

7. 执行流程图

Claude Coordinator 返回 tool_use blocks:
  [invoke_policy_expert, invoke_case_analyst, save_user_memory, invoke_assessment_expert]

ToolExecutionQueue.executeAll():

Step 1: 遍历 tool_use 列表
  ├─ invoke_policy_expert    → isConcurrencySafe: true  → 加入 Batch A
  ├─ invoke_case_analyst     → isConcurrencySafe: true  → 加入 Batch A
  ├─ save_user_memory        → isConcurrencySafe: false → 触发排空!
  │   ├─ 先执行 Batch A(并行):
  │   │   ├─ invoke_policy_expert ──→ PolicyExpert.execute() ──→ Claude API
  │   │   └─ invoke_case_analyst  ──→ CaseAnalyst.execute()  ──→ Claude API
  │   │                                    ↑ 这两个并行执行!
  │   ├─ Batch A 完成
  │   └─ 再执行 save_user_memory(串行):
  │       └─ knowledgeClient.saveUserMemory() ──→ knowledge-service HTTP
  │
  └─ invoke_assessment_expert → isConcurrencySafe: true  → 加入 Batch B

Step 2: 执行最后的 Batch B:
  └─ invoke_assessment_expert ──→ AssessmentExpert.execute() ──→ Claude API

Step 3: 按 originalIndex 排序结果:
  [0] invoke_policy_expert    → "高才通B类要求..."
  [1] invoke_case_analyst     → "类似案例:张先生..."
  [2] save_user_memory        → "已保存用户信息"
  [3] invoke_assessment_expert → "评估结果: {score: 75, ...}"

Step 4: 返回排序后的结果 → Coordinator 继续处理

8. 与 Agent Loop 的集成

// coordinator-agent.service.ts (片段)

// 在 agentLoop 中使用 ToolExecutionQueue

/**
 * Coordinator agent loop (excerpt): executes one round of tool calls, then recurses.
 *
 * Returns when the assistant reply has no tool_use blocks. Otherwise it runs
 * every requested tool through toolExecutionQueue and recurses via `yield*`,
 * with the assistant turn and its tool_result blocks appended to the history.
 *
 * NOTE(review): no turn/depth limit is visible on this recursion — confirm the
 * elided steps 1-4 enforce one; otherwise a model that keeps requesting tools
 * never terminates.
 */
async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
  // ... (Step 1-4: context injection, API call, stream collection)

  // Step 5: extract the tool_use blocks from the assistant reply
  const toolUses = assistantBlocks.filter(b => b.type === 'tool_use');

  if (toolUses.length === 0) {
    return; // natural termination: the model requested no tools
  }

  // Step 6: build execution requests; originalIndex records Claude's order so
  // the queue can return results in that order
  const requests: ToolExecutionRequest[] = toolUses.map((tu, index) => ({
    toolUseId: (tu as Anthropic.ToolUseBlock).id,
    toolName: (tu as Anthropic.ToolUseBlock).name,
    input: (tu as Anthropic.ToolUseBlock).input as Record<string, unknown>,
    originalIndex: index,
  }));

  // Step 7: register an event callback that forwards queue events to the frontend.
  // NOTE(review): onEvent replaces a single callback slot on the queue instance;
  // if toolExecutionQueue is shared across conversations, concurrent loops can
  // receive each other's events — confirm how the queue is scoped.
  toolExecutionQueue.onEvent((event) => {
    switch (event.type) {
      case 'agent_started':
        // `yield` cannot be used inside a callback, so events go through an
        // EventEmitter or channel instead.
        // NOTE(review): nothing in this excerpt drains streamChannel while
        // executeAll is awaited below — confirm events reach the client before
        // all tools have finished.
        streamChannel.push({ type: 'agent_start', agentName: event.agentName });
        break;
      case 'agent_completed':
        streamChannel.push({ type: 'agent_complete', agentName: event.agentName, durationMs: event.durationMs });
        break;
      case 'tool_error':
        streamChannel.push({ type: 'tool_error', toolName: event.toolName, error: event.error });
        break;
      // Other QueueEvent types are not forwarded.
    }
  });

  // Step 8: execute all tools (one result per request, sorted by originalIndex)
  const results = await toolExecutionQueue.executeAll(requests);

  // Step 9: build the tool_result blocks; failed tools are flagged with is_error
  const toolResultBlocks: Anthropic.ToolResultBlockParam[] = results.map(r => ({
    type: 'tool_result' as const,
    tool_use_id: r.toolUseId,
    content: r.output.content,
    is_error: !r.output.success,
  }));

  // Step 10: recurse — feed the results back to the coordinator
  const newMessages = [
    ...params.messages,
    { role: 'assistant' as const, content: assistantBlocks },
    { role: 'user' as const, content: toolResultBlocks },
  ];

  yield* agentLoop({ ...params, messages: newMessages });
}

9. 错误处理策略

9.1 工具级错误处理

错误类型 处理方式 返回给 Coordinator
超时 超过 timeoutMs "工具执行超时,请重试或使用其他方式获取信息。"
API 错误 (knowledge-service 不可用) 捕获异常 "知识库服务暂时不可用,请根据已知信息回答。"
Agent 内部错误 (Specialist Claude API 失败) 捕获异常 "专家 Agent 暂时不可用: {error}. 请直接回答用户。"
输入验证失败 早期返回 "工具输入参数无效: {details}"
Rate Limit 指数退避重试 1 次 重试成功则正常返回,否则返回错误
未知工具 跳过 "未知工具: {toolName}"

9.2 批次级错误处理

/**
 * Promise.allSettled 确保批次中一个工具的失败不会导致整个批次失败。
 *
 * 示例:
 * Batch = [invoke_policy_expert, invoke_case_analyst]
 *
 * 如果 invoke_policy_expert 超时:
 * - invoke_case_analyst 正常返回其结果
 * - invoke_policy_expert 返回错误消息
 * - Coordinator 根据有效的 case_analyst 结果继续生成回复
 *
 * Coordinator 的 Prompt 中包含指引:
 * "如果某个工具调用返回错误,请根据其他可用信息继续回复用户,
 *  不要向用户暴露技术错误细节。"
 */

9.3 Agent 执行重试

/**
 * Retry policy for specialist Claude calls (lives in base-specialist.service.ts).
 *
 * - Rate-limit error: wait, then retry, up to `maxRetries` extra attempts.
 *   The wait is the server-provided retry-after if one is extracted; otherwise
 *   exponential backoff of 1s, 2s, 4s, ...
 * - Overloaded error while retries remain: make one call on `fallbackModel`.
 * - Any other error, or retries exhausted: rethrow the error.
 *
 * @param params - request forwarded unchanged to `messages.create`
 * @param maxRetries - extra attempts after the first; negative values count as 0
 * @param fallbackModel - model used for the single overload fallback call.
 *   NOTE(review): the previous hard-coded 'claude-haiku-4-20250514' does not
 *   match a published model ID; confirm this default against the Anthropic
 *   models overview before relying on it.
 */
private async callClaudeWithRetry(
  params: Anthropic.MessageCreateParams,
  maxRetries: number = 1,
  fallbackModel: string = 'claude-haiku-4-5-20251001',
): Promise<Anthropic.Message> {
  // Clamp so a negative value means "no retries" instead of skipping the loop.
  const retries = Math.max(0, maxRetries);

  // Every iteration either returns, continues, or throws.
  for (let attempt = 0; ; attempt++) {
    try {
      return await this.client.messages.create(params);
    } catch (error) {
      const canRetry = attempt < retries;

      if (canRetry && this.isRateLimitError(error)) {
        // A falsy retry-after (missing, 0, NaN) falls back to exponential backoff.
        const retryAfter = this.extractRetryAfter(error);
        await this.sleep(retryAfter || 1000 * 2 ** attempt);
        continue;
      }

      if (canRetry && this.isOverloadedError(error)) {
        // Degrade to a cheaper model instead of waiting for capacity.
        return await this.client.messages.create({
          ...params,
          model: fallbackModel,
        });
      }

      throw error;
    }
  }
}

10. 性能优化

10.1 并行度分析

典型场景 1: 用户问政策 + 系统需要保存记忆
  tool_uses: [invoke_policy_expert, invoke_memory_manager]

  顺序执行: 3s + 2s = 5s
  并发执行: invoke_memory_manager 不是 safe,但在 invoke_policy_expert 之后
            → Batch [invoke_policy_expert] (3s) → invoke_memory_manager (2s) = 5s
            这种情况并发无优势

典型场景 2: 评估用户 + 查找案例 + 获取策略建议
  tool_uses: [invoke_assessment_expert, invoke_case_analyst, invoke_strategist]

  顺序执行: 4s + 2s + 2s = 8s
  并发执行: 全部 safe → Batch 并行 = max(4s, 2s, 2s) = 4s
            节省 50% 时间!

典型场景 3: 保存记忆 + 评估 + 生成支付
  tool_uses: [save_user_memory, invoke_assessment_expert, generate_payment]

  顺序执行: 1s + 4s + 2s = 7s
  我们的算法:
    save_user_memory (not safe) → 单独执行 (1s)
    invoke_assessment_expert (safe) → Batch B
    generate_payment (not safe) → 先执行 Batch B (4s), 再单独 (2s)
    总计: 1s + 4s + 2s = 7s (这种情况并发无优势)

10.2 超时配置建议

工具类型 建议超时 原因
Agent 调用 (Sonnet) 30s Sonnet 单次调用约 3-8s,Agent 最多 3 轮
Agent 调用 (Haiku) 15s Haiku 更快
search_knowledge 10s HTTP + 向量检索
save_user_memory 5s HTTP + 写入
generate_payment 10s 外部支付 API
get_current_datetime 1s 本地计算
web_search 15s 外部搜索 API

10.3 Token 成本追踪

/**
 * Token usage and estimated cost of one agent invocation, aggregated at the
 * ToolExecutionQueue level.
 */
interface AgentCostSummary {
  agentName: string;
  turnsUsed: number;
  durationMs: number;
  /** Input tokens summed over all of the agent's model calls. */
  inputTokens: number;
  /** Output tokens summed over all of the agent's model calls. */
  outputTokens: number;
  /** Estimated cost in US dollars. */
  estimatedCostUsd: number;
}

/**
 * Per-agent cost summary for the results of one executeAll call.
 *
 * Only results that carry agentMetadata (agent invocations) are included.
 * Rates are USD per 1K tokens. The defaults are the Sonnet rates this method
 * previously applied to every agent. Pass other rates for agents running on
 * a different model (e.g. Haiku); otherwise their cost is misestimated.
 *
 * @param results - results returned by executeAll
 * @param ratesPer1K - input/output price in USD per 1K tokens
 * @returns one summary per agent-invocation result, in result order
 */
getCostSummary(
  results: ToolExecutionResult[],
  ratesPer1K: { input: number; output: number } = { input: 0.003, output: 0.015 },
): AgentCostSummary[] {
  // flatMap filters and narrows in one pass, with no non-null assertion.
  return results.flatMap(({ output }): AgentCostSummary[] => {
    const meta = output.agentMetadata;
    if (!meta) return [];

    const inputCost = meta.tokensUsed.input * ratesPer1K.input / 1000;
    const outputCost = meta.tokensUsed.output * ratesPer1K.output / 1000;
    return [{
      agentName: meta.agentName,
      inputTokens: meta.tokensUsed.input,
      outputTokens: meta.tokensUsed.output,
      estimatedCostUsd: inputCost + outputCost,
      turnsUsed: meta.turnsUsed,
      durationMs: meta.durationMs,
    }];
  });
}

11. 工具注册示例

// tool-registry.ts - tool registration

/**
 * Registers the coordinator's tools on `queue`.
 *
 * Each definition is registered under its own `tool.name`, so the registry key
 * and the schema name Claude sees are always the same string.
 *
 * NOTE(review): no progress callback is passed to createAgentExecutor here, so
 * these agents do not report per-turn progress.
 *
 * @param queue - the queue to register into
 * @param specialists - specialist agents behind the invoke_* tools
 * @param directTools - implementations of the direct tools
 */
export function registerAllTools(
  queue: ToolExecutionQueue,
  specialists: SpecialistServices,
  directTools: DirectToolServices,
): void {
  const register = (definition: Parameters<ToolExecutionQueue['registerTool']>[1]): void => {
    queue.registerTool(definition.tool.name, definition);
  };

  // ---- Agent-invocation tools ----

  register({
    tool: {
      name: 'invoke_policy_expert',
      description: '调用政策专家 Agent查询香港移民政策的详细信息、条件要求、申请流程等。',
      input_schema: {
        type: 'object',
        properties: {
          query: { type: 'string', description: '要查询的政策问题' },
          category: { type: 'string', description: '移民类别可选TTPS/QMAS/GEP/IANG/CIES/TECHTAS' },
        },
        required: ['query'],
      },
    },
    toolType: 'agent_invocation',
    isConcurrencySafe: true,
    timeoutMs: 30000,
    estimatedDurationMs: 5000,
    executor: createAgentExecutor(specialists.policyExpert),
  });

  register({
    tool: {
      name: 'invoke_assessment_expert',
      description: '调用评估专家 Agent根据用户信息评估移民资格和推荐方案。',
      input_schema: {
        type: 'object',
        properties: {
          userInfo: { type: 'object', description: '用户信息age, education, workYears 等)' },
          targetCategories: {
            type: 'array',
            items: { type: 'string' },
            description: '要评估的目标类别(可选,默认评估所有)',
          },
        },
        required: ['userInfo'],
      },
    },
    toolType: 'agent_invocation',
    isConcurrencySafe: true,
    timeoutMs: 30000,
    estimatedDurationMs: 6000,
    executor: createAgentExecutor(specialists.assessmentExpert),
  });

  register({
    tool: {
      name: 'invoke_memory_manager',
      description: '调用记忆管理 Agent提取和保存用户信息到长期记忆。',
      input_schema: {
        type: 'object',
        properties: {
          action: { type: 'string', enum: ['extract', 'save', 'load'] },
          userMessage: { type: 'string', description: '用户消息extract 时必须)' },
          assistantMessage: { type: 'string', description: '助手回复extract 时必须)' },
          memoryData: { type: 'object', description: '要保存的记忆save 时必须)' },
        },
        required: ['action'],
      },
    },
    toolType: 'agent_invocation',
    isConcurrencySafe: false, // may write user memory, so it must run alone
    timeoutMs: 15000,
    estimatedDurationMs: 3000,
    executor: createAgentExecutor(specialists.memoryManager),
  });

  // ---- Direct tools ----

  register({
    tool: {
      name: 'generate_payment',
      description: '生成付费评估服务的支付链接。',
      input_schema: {
        type: 'object',
        properties: {
          service: { type: 'string', enum: ['detailed_assessment'] },
          // NOTE(review): userId is supplied by the model here; presumably it
          // should be bound to the authenticated user server-side — confirm.
          userId: { type: 'string' },
        },
        required: ['service', 'userId'],
      },
    },
    toolType: 'direct_tool',
    isConcurrencySafe: false, // creates an order (side effect)
    timeoutMs: 10000,
    estimatedDurationMs: 2000,
    executor: directTools.generatePayment,
  });

  register({
    tool: {
      name: 'get_current_datetime',
      description: '获取当前日期和时间。',
      input_schema: { type: 'object', properties: {} },
    },
    toolType: 'direct_tool',
    isConcurrencySafe: true,
    timeoutMs: 1000,
    estimatedDurationMs: 5,
    executor: async () => {
      const now = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Hong_Kong' });
      return { content: now, success: true };
    },
  });

  // ... register the remaining tools
}

12. 与旧架构的对比

方面 旧架构 (ClaudeAgentServiceV2) 新架构 (ToolExecutionQueue)
工具执行 for 循环串行逐个执行 智能分批: safe 并行,unsafe 串行
执行超时 无超时控制 每个工具独立超时
错误隔离 一个工具失败 → 整个循环中断 一个失败不影响其他
事件通知 无实时进度 完整的 QueueEvent 流
Agent 子调用 N/A(无 Agent 概念) 每个 Agent 独立 mini-loop
成本追踪 仅追踪 Coordinator 的 tokens 追踪每个 Agent 的 tokens
扩展性 executeTool() switch 语句 registerTool() 声明式注册