iconsulting/docs/architecture/09-tool-execution.md

1120 lines
34 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 09 - 工具并发执行系统 (Tool Execution System)
## 1. 设计哲学
借鉴 Claude Code 的 `ToolExecutionQueue`(Q80 class)设计:
> **并发安全的工具并行执行,串行约束的工具顺序执行。**
当 Coordinator Agent 在一次回复中返回多个 `tool_use` blocks 时,系统需要智能地决定哪些工具可以并行执行、哪些必须串行等待。这直接影响用户体验(响应延迟)和系统正确性(数据一致性)。
核心规则:
1. **读操作并行**:`search_knowledge`、`invoke_*`(Agent 调用)等纯读操作可以并行
2. **写操作串行**:`save_user_memory`、`generate_payment` 等有副作用的操作必须串行
3. **结果有序**:无论执行顺序如何,结果按 Claude 返回的 `tool_use` 顺序排列
4. **错误隔离**:一个工具失败不影响其他工具的执行
## 2. 并发安全性声明
### 2.1 工具并发安全性矩阵
| 工具名 | 类型 | isConcurrencySafe | 原因 |
|--------|------|-------------------|------|
| `invoke_policy_expert` | Agent 调用 | `true` | 纯读,独立 API 调用 |
| `invoke_assessment_expert` | Agent 调用 | `true` | 纯读,独立 API 调用 |
| `invoke_strategist` | Agent 调用 | `true` | 纯读,独立 API 调用 |
| `invoke_objection_handler` | Agent 调用 | `true` | 纯读,独立 API 调用 |
| `invoke_case_analyst` | Agent 调用 | `true` | 纯读,独立 API 调用 |
| `invoke_memory_manager` | Agent 调用 | `false` | **可能写入记忆** |
| `search_knowledge` | 直接工具 | `true` | 纯读 |
| `get_user_context` | 直接工具 | `true` | 纯读 |
| `save_user_memory` | 直接工具 | `false` | **写操作** |
| `generate_payment` | 直接工具 | `false` | **产生支付订单,有副作用** |
| `get_current_datetime` | 直接工具 | `true` | 纯计算 |
| `web_search` | 直接工具 | `true` | 纯读,外部 API |
| `get_exchange_rate` | 直接工具 | `true` | 纯读 |
| `check_off_topic` | 直接工具 | `true` | 纯读 |
### 2.2 并发安全判定规则
```typescript
/**
* 判断工具是否并发安全的规则:
*
* 1. Agent 调用类 (invoke_*):
* - 默认并发安全(各 Agent 独立运行)
* - 例外: invoke_memory_manager(因为它可能调用 save_user_memory)
*
* 2. 读操作类:
* - search_*, get_*, check_* → 并发安全
*
* 3. 写操作类:
* - save_*, update_*, delete_* → 非并发安全
*
* 4. 副作用类:
* - generate_payment, send_notification → 非并发安全
*/
```
## 3. TypeScript 类型定义
```typescript
// tool-execution.types.ts
import Anthropic from '@anthropic-ai/sdk';
/**
 * Tool definition: Claude's Tool type extended with the metadata needed to
 * schedule and execute the tool.
 */
export interface ToolDefinition {
/** Tool definition in Claude API format. */
tool: Anthropic.Tool;
/** Whether this tool may run concurrently with other tools. */
isConcurrencySafe: boolean;
/** Tool kind: a Specialist Agent invocation or a direct tool. */
toolType: 'agent_invocation' | 'direct_tool';
/** Execution timeout in milliseconds. */
timeoutMs: number;
/** Executor function: receives the tool input and resolves to the tool output. */
executor: (input: Record<string, unknown>) => Promise<ToolExecutionOutput>;
/** Estimated duration in milliseconds, intended for front-end display and ordering. */
estimatedDurationMs: number;
}
/**
 * Output produced by one tool execution.
 */
export interface ToolExecutionOutput {
  /** Text result handed back to Claude. */
  content: string;
  /** True when the tool finished successfully. */
  success: boolean;
  /** Error message; only present when the execution failed. */
  error?: string;
  /** Agent run statistics; only populated for `agent_invocation` tools. */
  agentMetadata?: {
    /** Name of the agent that ran. */
    agentName: string;
    /** Wall-clock duration of the agent run in milliseconds. */
    durationMs: number;
    /** Number of turns the agent used. */
    turnsUsed: number;
    /** Names of the tools the agent called. */
    toolCallsMade: Array<string>;
    /** Token counts, split into input and output. */
    tokensUsed: Record<'input' | 'output', number>;
  };
}
/**
 * Tool execution request (built from a tool_use block returned by Claude).
 */
export interface ToolExecutionRequest {
/** tool_use ID assigned by Claude. */
toolUseId: string;
/** Tool name. */
toolName: string;
/** Tool input parameters. */
input: Record<string, unknown>;
/** Original position in the tool_use array (used to order the results). */
originalIndex: number;
}
/**
 * Result of one tool execution.
 */
export interface ToolExecutionResult {
/** tool_use ID assigned by Claude. */
toolUseId: string;
/** Tool name. */
toolName: string;
/** Original position of the request in the tool_use array. */
originalIndex: number;
/** Execution output. */
output: ToolExecutionOutput;
/** Execution duration in milliseconds. */
durationMs: number;
/** Start time. */
startedAt: number;
/** End time. */
completedAt: number;
}
/** Fields shared by every event that refers to one tool call. */
type ToolEventTarget = { toolName: string; toolUseId: string };

/** Fields shared by every event that refers to one agent invocation. */
type AgentEventTarget = { agentName: string; toolUseId: string };

/**
 * Queue status events (streamed to the front end as notifications).
 * The `type` field is the discriminant.
 */
export type QueueEvent =
  | (ToolEventTarget & { type: 'tool_queued'; position: number })
  | (ToolEventTarget & { type: 'tool_started' })
  | (ToolEventTarget & { type: 'tool_completed'; durationMs: number; success: boolean })
  | (ToolEventTarget & { type: 'tool_error'; error: string })
  | (AgentEventTarget & { type: 'agent_started' })
  | (AgentEventTarget & { type: 'agent_progress'; turn: number; maxTurns: number })
  | (AgentEventTarget & { type: 'agent_completed'; durationMs: number })
  | { type: 'queue_drained' };
```
## 4. ToolExecutionQueue 完整实现
```typescript
// tool-execution-queue.ts
import { Injectable, Logger } from '@nestjs/common';
import {
ToolDefinition,
ToolExecutionRequest,
ToolExecutionResult,
ToolExecutionOutput,
QueueEvent,
} from './tool-execution.types';
/**
 * Concurrent tool execution queue.
 *
 * Core algorithm:
 *  1. Receive the tool_use requests of one assistant turn.
 *  2. Walk them in their original order.
 *  3. Collect consecutive concurrency-safe tools into a "concurrent batch".
 *  4. On a non-safe tool:
 *     a. first run (await) the pending concurrent batch,
 *     b. then run the non-safe tool on its own.
 *  5. Continue with the remaining tools.
 *  6. Return all results sorted by originalIndex.
 */
@Injectable()
export class ToolExecutionQueue {
  private readonly logger = new Logger(ToolExecutionQueue.name);

  /** Registered tool definitions, keyed by tool name. */
  private readonly toolRegistry = new Map<string, ToolDefinition>();

  /** Default event listener, used by executeAll() when no per-call listener is passed. */
  private eventCallback?: (event: QueueEvent) => void;

  /**
   * Registers one tool definition under the given name (replacing any previous one).
   */
  registerTool(name: string, definition: ToolDefinition): void {
    this.toolRegistry.set(name, definition);
    this.logger.log(`Registered tool: ${name} (concurrencySafe: ${definition.isConcurrencySafe})`);
  }

  /**
   * Registers several tool definitions at once; keys are the tool names.
   */
  registerTools(tools: Record<string, ToolDefinition>): void {
    for (const [name, def] of Object.entries(tools)) {
      this.registerTool(name, def);
    }
  }

  /**
   * Sets the default event listener (used for real-time front-end notifications).
   *
   * The listener is stored on this instance. Each executeAll() run captures
   * the listener that is current when it starts, so a later onEvent() call
   * does not redirect the events of a run that is already in flight.
   */
  onEvent(callback: (event: QueueEvent) => void): void {
    this.eventCallback = callback;
  }

  /**
   * Executes all tools. This is the main entry point.
   *
   * Takes every tool_use block returned by Claude, groups the requests by
   * concurrency safety, and returns the results in the original order.
   * Unknown tools produce an error result instead of throwing.
   *
   * @param requests - tool_use requests extracted from the Claude response
   * @param onEvent - optional listener for this run only; when omitted, the
   *   listener registered through onEvent() at the time of the call is used
   * @returns execution results sorted by originalIndex
   */
  async executeAll(
    requests: ToolExecutionRequest[],
    onEvent?: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult[]> {
    if (requests.length === 0) return [];

    // Snapshot the listener before the first await. This instance is shared,
    // so reading this.eventCallback lazily would let a concurrent caller's
    // onEvent() hijack the events of this run.
    const listener = onEvent ?? this.eventCallback;
    const emit = (event: QueueEvent): void => this.emitEvent(event, listener);

    this.logger.log(`Executing ${requests.length} tools: ${requests.map(r => r.toolName).join(', ')}`);

    const allResults: ToolExecutionResult[] = [];
    // Pending run of consecutive concurrency-safe tools. The definition is
    // carried along so the batch never needs a second registry lookup.
    let currentBatch: Array<{ request: ToolExecutionRequest; definition: ToolDefinition }> = [];

    for (const request of requests) {
      const definition = this.toolRegistry.get(request.toolName);
      if (!definition) {
        // Unregistered tool: report an error result and keep going.
        allResults.push(this.createErrorResult(request, `Unknown tool: ${request.toolName}`));
        continue;
      }

      emit({
        type: 'tool_queued',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        position: request.originalIndex,
      });

      if (definition.isConcurrencySafe) {
        currentBatch.push({ request, definition });
        continue;
      }

      // Non-safe tool: drain the pending batch first, then run this tool alone.
      if (currentBatch.length > 0) {
        allResults.push(...(await this.executeConcurrentBatch(currentBatch, emit)));
        currentBatch = [];
      }
      allResults.push(await this.executeSingleTool(request, definition, emit));
    }

    // Run the trailing batch, if any.
    if (currentBatch.length > 0) {
      allResults.push(...(await this.executeConcurrentBatch(currentBatch, emit)));
    }

    allResults.sort((a, b) => a.originalIndex - b.originalIndex);
    emit({ type: 'queue_drained' });
    this.logger.log(`All ${allResults.length} tools completed`);
    return allResults;
  }

  /**
   * Runs one batch of concurrency-safe tools in parallel.
   * Results are returned in batch order; a rejected entry becomes an error result.
   */
  private async executeConcurrentBatch(
    batch: Array<{ request: ToolExecutionRequest; definition: ToolDefinition }>,
    emit: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult[]> {
    if (batch.length === 0) return [];
    if (batch.length > 1) {
      this.logger.log(`Executing concurrent batch: ${batch.map(item => item.request.toolName).join(', ')}`);
    }

    // Promise.allSettled guarantees one failure cannot discard the other results.
    const settled = await Promise.allSettled(
      batch.map(({ request, definition }) => this.executeSingleTool(request, definition, emit)),
    );

    return settled.map((outcome, index) =>
      outcome.status === 'fulfilled'
        ? outcome.value
        : this.createErrorResult(batch[index].request, `Execution failed: ${outcome.reason}`),
    );
  }

  /**
   * Runs a single tool with its timeout and converts any failure into an
   * error result, so this method never rejects.
   *
   * Events: `agent_started` (agent tools only) and `tool_started` before the
   * call; `tool_completed` on success or `tool_error` on failure; agent tools
   * always get a closing `agent_completed`, including on failure, so
   * listeners are not left with an agent that never finishes.
   */
  private async executeSingleTool(
    request: ToolExecutionRequest,
    definition: ToolDefinition,
    emit: (event: QueueEvent) => void,
  ): Promise<ToolExecutionResult> {
    const { toolUseId, toolName, originalIndex } = request;
    // Agent tools are named invoke_<agent>; the agent name is the remainder.
    const agentName =
      definition.toolType === 'agent_invocation' ? toolName.replace('invoke_', '') : undefined;
    const startedAt = Date.now();

    if (agentName !== undefined) {
      emit({ type: 'agent_started', agentName, toolUseId });
    }
    emit({ type: 'tool_started', toolName, toolUseId });

    try {
      const output = await this.executeWithTimeout(
        definition.executor(request.input),
        definition.timeoutMs,
        toolName,
      );
      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;

      emit({ type: 'tool_completed', toolName, toolUseId, durationMs, success: output.success });
      if (agentName !== undefined) {
        emit({ type: 'agent_completed', agentName, toolUseId, durationMs });
      }
      this.logger.log(`Tool ${toolName} completed in ${durationMs}ms (success: ${output.success})`);

      return { toolUseId, toolName, originalIndex, output, durationMs, startedAt, completedAt };
    } catch (error) {
      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.error(`Tool ${toolName} failed after ${durationMs}ms: ${errorMessage}`);
      emit({ type: 'tool_error', toolName, toolUseId, error: errorMessage });
      if (agentName !== undefined) {
        // Close the agent_started / agent_completed pair on timeouts and throws too.
        emit({ type: 'agent_completed', agentName, toolUseId, durationMs });
      }

      return {
        toolUseId,
        toolName,
        originalIndex,
        output: {
          content: `工具执行失败: ${errorMessage}。请重试或使用其他方式获取信息。`,
          success: false,
          error: errorMessage,
        },
        durationMs,
        startedAt,
        completedAt,
      };
    }
  }

  /**
   * Resolves with the executor's output, or rejects once `timeoutMs` elapses.
   *
   * The timeout only stops waiting: the executor's promise is not cancelled
   * and keeps running in the background.
   */
  private async executeWithTimeout(
    promise: Promise<ToolExecutionOutput>,
    timeoutMs: number,
    toolName: string,
  ): Promise<ToolExecutionOutput> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_resolve, reject) => {
      timer = setTimeout(() => {
        reject(new Error(`Tool ${toolName} timed out after ${timeoutMs}ms`));
      }, timeoutMs);
    });

    try {
      return await Promise.race([promise, timeout]);
    } finally {
      // Always release the timer so it cannot keep the event loop alive.
      clearTimeout(timer);
    }
  }

  /**
   * Builds a failed result for a request that could not be executed.
   */
  private createErrorResult(request: ToolExecutionRequest, error: string): ToolExecutionResult {
    // Single clock read so startedAt === completedAt, matching durationMs: 0.
    const now = Date.now();
    return {
      toolUseId: request.toolUseId,
      toolName: request.toolName,
      originalIndex: request.originalIndex,
      output: {
        content: `错误: ${error}`,
        success: false,
        error,
      },
      durationMs: 0,
      startedAt: now,
      completedAt: now,
    };
  }

  /**
   * Delivers one event to the given listener. Listener exceptions are logged
   * and swallowed so a broken listener cannot fail a tool execution.
   */
  private emitEvent(event: QueueEvent, listener?: (event: QueueEvent) => void): void {
    if (!listener) return;
    try {
      listener(event);
    } catch (e) {
      this.logger.error(`Event callback error: ${e}`);
    }
  }

  /**
   * Returns the Claude API tool schemas of all registered tools.
   *
   * The return type is derived from ToolDefinition because this file does
   * not import the Anthropic SDK namespace.
   */
  getClaudeTools(): ToolDefinition['tool'][] {
    return Array.from(this.toolRegistry.values(), def => def.tool);
  }

  /**
   * Returns the definition registered under `name`, if any (for debugging).
   */
  getToolDefinition(name: string): ToolDefinition | undefined {
    return this.toolRegistry.get(name);
  }
}
```
## 5. Agent 调用执行器
当工具类型是 `agent_invocation` 时,executor 需要创建一个新的 Claude API 调用,运行 Specialist Agent 的 mini-loop。
```typescript
// agent-executor.ts
import Anthropic from '@anthropic-ai/sdk';
import { ToolExecutionOutput } from './tool-execution.types';
import { BaseSpecialistService } from '../specialists/base-specialist.service';
/**
 * Agent invocation executor factory.
 *
 * Creates the executor function for one Specialist Agent; that function is
 * what gets registered in the ToolExecutionQueue. The executor never
 * rejects: a failing agent is reported as `success: false` with the error
 * message and whatever usage statistics were collected before the failure.
 *
 * @param specialist - the Specialist Agent to run
 * @param eventCallback - optional progress listener, called once per turn
 * @param maxTurns - turn limit for the agent's mini-loop (default 3)
 * @returns an executor suitable for `ToolDefinition.executor`
 */
export function createAgentExecutor(
  specialist: BaseSpecialistService,
  eventCallback?: (event: { turn: number; maxTurns: number }) => void,
  maxTurns: number = 3,
): (input: Record<string, unknown>) => Promise<ToolExecutionOutput> {
  return async (input: Record<string, unknown>): Promise<ToolExecutionOutput> => {
    const startTime = Date.now();
    let totalInputTokens = 0;
    let totalOutputTokens = 0;
    // Last turn reported through onProgress; used as turnsUsed if the run fails.
    let turnsStarted = 0;
    const toolCallsMade: string[] = [];

    // Metadata is identical on both paths except for the turn count.
    const buildMetadata = (turnsUsed: number) => ({
      agentName: specialist.name,
      turnsUsed,
      tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
      toolCallsMade,
      durationMs: Date.now() - startTime,
    });

    try {
      const result = await specialist.execute({
        input,
        maxTurns,
        onProgress: (turn: number, turnLimit: number) => {
          turnsStarted = turn;
          eventCallback?.({ turn, maxTurns: turnLimit });
        },
        onTokenUsage: (inputTokens: number, outputTokens: number) => {
          totalInputTokens += inputTokens;
          totalOutputTokens += outputTokens;
        },
        onToolCall: (toolName: string) => {
          toolCallsMade.push(toolName);
        },
      });

      return {
        content: result.output,
        success: true,
        agentMetadata: buildMetadata(result.turnsUsed),
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        content: `Agent ${specialist.name} 执行失败: ${message}`,
        success: false,
        error: message,
        agentMetadata: buildMetadata(turnsStarted),
      };
    }
  };
}
```
## 6. BaseSpecialistService 抽象类
```typescript
// base-specialist.service.ts
import Anthropic from '@anthropic-ai/sdk';
/**
 * Parameters for one Specialist Agent execution.
 */
export interface SpecialistExecuteParams {
/** Input for the agent; used to build the initial messages. */
input: Record<string, unknown>;
/** Maximum number of turns the mini-loop may run. */
maxTurns: number;
/** Called at the start of each turn with the current turn number and the turn limit. */
onProgress?: (turn: number, maxTurns: number) => void;
/** Called after each API response with that response's input and output token counts. */
onTokenUsage?: (inputTokens: number, outputTokens: number) => void;
/** Called with the tool name before each tool the agent requests is executed. */
onToolCall?: (toolName: string) => void;
}
/**
 * Result of one Specialist Agent execution.
 */
export interface SpecialistExecuteResult {
output: string; // Text returned to the Coordinator
/** Number of turns the agent used. */
turnsUsed: number;
/** Optional structured metadata extracted from the output. */
metadata?: Record<string, unknown>;
}
/**
 * Base class for Specialist Agents.
 *
 * Every Specialist Agent extends this class. The base class provides:
 *  1. the standard mini-loop (at most N turns),
 *  2. tool execution,
 *  3. token tracking (through the `onTokenUsage` callback),
 *  4. error handling for the agent's own tools.
 */
export abstract class BaseSpecialistService {
  /** Agent name. */
  abstract readonly name: string;
  /** Model used by this agent. */
  abstract readonly model: string;
  /** System prompt of this agent. */
  abstract readonly systemPrompt: string;
  /** Tools available to this agent. */
  abstract readonly tools: Anthropic.Tool[];
  /** Maximum output tokens per API call. */
  abstract readonly maxTokens: number;

  protected client: Anthropic;

  constructor(client: Anthropic) {
    this.client = client;
  }

  /**
   * Runs the agent's mini-loop.
   *
   * Flow:
   *  1. Build the initial messages from `params.input`.
   *  2. Call the Claude API.
   *  3. If the response contains tool_use blocks: run the tools, append the
   *     results, and loop.
   *  4. If it contains none: return the response text.
   *  5. When `maxTurns` is reached: return the last assistant text, prefixed
   *     with a notice.
   *
   * A tool that throws does not abort the run: its failure is sent back to
   * the model as an `is_error` tool_result. Errors from the Claude API call
   * itself still propagate to the caller.
   *
   * @param params - input, turn limit, and optional progress callbacks
   * @returns the final text, the number of turns used, and optional metadata
   */
  async execute(params: SpecialistExecuteParams): Promise<SpecialistExecuteResult> {
    const messages: Anthropic.MessageParam[] = this.buildInitialMessages(params.input);
    let turnsUsed = 0;

    while (turnsUsed < params.maxTurns) {
      turnsUsed++;
      params.onProgress?.(turnsUsed, params.maxTurns);

      // Non-streaming call: Specialists do not need streamed output.
      const response = await this.client.messages.create({
        model: this.model,
        max_tokens: this.maxTokens,
        system: this.systemPrompt,
        messages,
        tools: this.tools.length > 0 ? this.tools : undefined,
      });

      params.onTokenUsage?.(response.usage.input_tokens, response.usage.output_tokens);

      const toolUses = response.content.filter(
        (block: Anthropic.ContentBlock): block is Anthropic.ToolUseBlock => block.type === 'tool_use',
      );

      // No tool calls: the text of this response is the final answer.
      if (toolUses.length === 0) {
        const outputText = response.content
          .filter((block: Anthropic.ContentBlock): block is Anthropic.TextBlock => block.type === 'text')
          .map((block: Anthropic.TextBlock) => block.text)
          .join('\n');
        return {
          output: outputText,
          turnsUsed,
          metadata: this.extractMetadata(outputText),
        };
      }

      // Tool calls: record the assistant turn, run each tool in order, and
      // feed the results back as the next user turn.
      messages.push({ role: 'assistant', content: response.content });
      const toolResults: Anthropic.ToolResultBlockParam[] = [];
      for (const toolUse of toolUses) {
        params.onToolCall?.(toolUse.name);
        toolResults.push(await this.runToolSafely(toolUse));
      }
      messages.push({ role: 'user', content: toolResults });
    }

    // maxTurns reached: return the text of the last assistant message.
    // NOTE(review): the tools requested on the final turn have already run,
    // but their results are never shown to the model.
    return {
      output: `[Agent ${this.name} 达到最大轮次 (${params.maxTurns}),以下是当前结果]\n${this.extractLastText(messages)}`,
      turnsUsed,
    };
  }

  /**
   * Builds the initial messages. The default sends the input as
   * pretty-printed JSON in a single user message; subclasses may override
   * this to customize the input format.
   */
  protected buildInitialMessages(input: Record<string, unknown>): Anthropic.MessageParam[] {
    return [{
      role: 'user',
      content: JSON.stringify(input, null, 2),
    }];
  }

  /**
   * Executes one of the Specialist's own tools. Subclasses must implement this.
   */
  protected abstract executeSpecialistTool(
    toolName: string,
    input: Record<string, unknown>,
  ): Promise<unknown>;

  /**
   * Extracts structured metadata from the output text. The default returns
   * undefined; subclasses may override.
   */
  protected extractMetadata(output: string): Record<string, unknown> | undefined {
    return undefined;
  }

  /**
   * Runs one requested tool and converts the outcome into a tool_result
   * block. A thrown error (including a result that cannot be serialized)
   * becomes an `is_error` result instead of propagating.
   */
  private async runToolSafely(toolUse: Anthropic.ToolUseBlock): Promise<Anthropic.ToolResultBlockParam> {
    try {
      const result = await this.executeSpecialistTool(
        toolUse.name,
        toolUse.input as Record<string, unknown>,
      );
      return {
        type: 'tool_result',
        tool_use_id: toolUse.id,
        content: this.serializeToolResult(result),
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        type: 'tool_result',
        tool_use_id: toolUse.id,
        content: `工具执行失败: ${message}`,
        is_error: true,
      };
    }
  }

  /**
   * Converts a tool's return value into tool_result text. Strings pass
   * through; everything else is JSON-encoded.
   */
  private serializeToolResult(result: unknown): string {
    if (typeof result === 'string') return result;
    // JSON.stringify returns undefined for undefined, functions and symbols;
    // fall back to 'null' so the content is always a string.
    const json: string | undefined = JSON.stringify(result);
    return json ?? 'null';
  }

  /**
   * Returns the text of the most recent assistant message, or a placeholder
   * when there is no assistant message at all.
   */
  private extractLastText(messages: Anthropic.MessageParam[]): string {
    for (let i = messages.length - 1; i >= 0; i--) {
      const msg = messages[i];
      if (msg.role !== 'assistant') continue;
      if (typeof msg.content === 'string') return msg.content;

      const texts: string[] = [];
      for (const block of msg.content) {
        if (block.type === 'text') texts.push(block.text);
      }
      return texts.join('\n');
    }
    return '(无输出)';
  }
}
```
## 7. 执行流程图
```
Claude Coordinator 返回 tool_use blocks:
[invoke_policy_expert, invoke_case_analyst, save_user_memory, invoke_assessment_expert]
ToolExecutionQueue.executeAll():
Step 1: 遍历 tool_use 列表
├─ invoke_policy_expert → isConcurrencySafe: true → 加入 Batch A
├─ invoke_case_analyst → isConcurrencySafe: true → 加入 Batch A
├─ save_user_memory → isConcurrencySafe: false → 触发排空!
│ ├─ 先执行 Batch A(并行):
│ │ ├─ invoke_policy_expert ──→ PolicyExpert.execute() ──→ Claude API
│ │ └─ invoke_case_analyst ──→ CaseAnalyst.execute() ──→ Claude API
│ │ ↑ 这两个并行执行!
│ ├─ Batch A 完成
│ └─ 再执行 save_user_memory(串行):
│ └─ knowledgeClient.saveUserMemory() ──→ knowledge-service HTTP
└─ invoke_assessment_expert → isConcurrencySafe: true → 加入 Batch B
Step 2: 执行最后的 Batch B:
└─ invoke_assessment_expert ──→ AssessmentExpert.execute() ──→ Claude API
Step 3: 按 originalIndex 排序结果:
[0] invoke_policy_expert → "高才通B类要求..."
[1] invoke_case_analyst → "类似案例:张先生..."
[2] save_user_memory → "已保存用户信息"
[3] invoke_assessment_expert → "评估结果: {score: 75, ...}"
Step 4: 返回排序后的结果 → Coordinator 继续处理
```
## 8. 与 Agent Loop 的集成
```typescript
// coordinator-agent.service.ts (片段)
// 在 agentLoop 中使用 ToolExecutionQueue
async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
  // ... (Steps 1-4: context injection, API call, stream collection)

  // Step 5: pick the tool_use blocks out of the assistant response.
  const pendingToolUses = assistantBlocks.filter(block => block.type === 'tool_use');
  if (pendingToolUses.length === 0) {
    // No tool calls: the loop terminates naturally.
    return;
  }

  // Step 6: turn each tool_use block into an execution request, remembering
  // its position so results can be returned in the same order.
  const requests: ToolExecutionRequest[] = [];
  pendingToolUses.forEach((block, position) => {
    const toolUse = block as Anthropic.ToolUseBlock;
    requests.push({
      toolUseId: toolUse.id,
      toolName: toolUse.name,
      input: toolUse.input as Record<string, unknown>,
      originalIndex: position,
    });
  });

  // Step 7: forward selected queue events to the front end. `yield` cannot be
  // used inside a callback, so events go through the stream channel instead.
  toolExecutionQueue.onEvent((event) => {
    if (event.type === 'agent_started') {
      streamChannel.push({ type: 'agent_start', agentName: event.agentName });
    } else if (event.type === 'agent_completed') {
      streamChannel.push({ type: 'agent_complete', agentName: event.agentName, durationMs: event.durationMs });
    } else if (event.type === 'tool_error') {
      streamChannel.push({ type: 'tool_error', toolName: event.toolName, error: event.error });
    }
  });

  // Step 8: run every requested tool.
  const results = await toolExecutionQueue.executeAll(requests);

  // Step 9: build the tool_result blocks, flagging failed executions.
  const toolResultBlocks: Anthropic.ToolResultBlockParam[] = results.map(result => {
    return {
      type: 'tool_result' as const,
      tool_use_id: result.toolUseId,
      content: result.output.content,
      is_error: !result.output.success,
    };
  });

  // Step 10: recurse, feeding the results back to the Coordinator.
  const assistantTurn = { role: 'assistant' as const, content: assistantBlocks };
  const toolResultTurn = { role: 'user' as const, content: toolResultBlocks };
  yield* agentLoop({
    ...params,
    messages: [...params.messages, assistantTurn, toolResultTurn],
  });
}
```
## 9. 错误处理策略
### 9.1 工具级错误处理
| 错误类型 | 处理方式 | 返回给 Coordinator |
|----------|---------|-------------------|
| **超时** | 超过 `timeoutMs` | `"工具执行超时,请重试或使用其他方式获取信息。"` |
| **API 错误** (knowledge-service 不可用) | 捕获异常 | `"知识库服务暂时不可用,请根据已知信息回答。"` |
| **Agent 内部错误** (Specialist Claude API 失败) | 捕获异常 | `"专家 Agent 暂时不可用: {error}. 请直接回答用户。"` |
| **输入验证失败** | 早期返回 | `"工具输入参数无效: {details}"` |
| **Rate Limit** | 指数退避重试 1 次 | 重试成功则正常返回,否则返回错误 |
| **未知工具** | 跳过 | `"未知工具: {toolName}"` |
### 9.2 批次级错误处理
```typescript
/**
* Promise.allSettled 确保批次中一个工具的失败不会导致整个批次失败。
*
* 示例:
* Batch = [invoke_policy_expert, invoke_case_analyst]
*
* 如果 invoke_policy_expert 超时:
* - invoke_case_analyst 正常返回其结果
* - invoke_policy_expert 返回错误消息
* - Coordinator 根据有效的 case_analyst 结果继续生成回复
*
* Coordinator 的 Prompt 中包含指引:
* "如果某个工具调用返回错误,请根据其他可用信息继续回复用户,
* 不要向用户暴露技术错误细节。"
*/
```
### 9.3 Agent 执行重试
```typescript
/**
 * Retry strategy for Specialist Agent Claude calls (in base-specialist.service.ts).
 *
 * - Rate-limit error: wait, then retry. The wait is the value returned by
 *   extractRetryAfter() when it is non-zero, otherwise an exponential backoff
 *   of 1s, 2s, 4s, ... per attempt.
 * - Overloaded error: retry once immediately on the fallback Haiku model
 *   (skipped when the request already targets that model).
 * - Any other error, or no retries left: rethrow the original error.
 *
 * @param params - Messages API request; it is always sent with `stream: false`
 *   because this method promises a complete Message
 * @param maxRetries - retries allowed after the first attempt (default 1);
 *   negative or non-finite values are treated as 0, so at least one attempt
 *   is always made
 * @returns the completed message
 * @throws the API error of the last failed attempt
 */
private async callClaudeWithRetry(
  params: Anthropic.MessageCreateParams,
  maxRetries: number = 1,
): Promise<Anthropic.Message> {
  const retryBudget = Number.isFinite(maxRetries) ? Math.max(0, Math.floor(maxRetries)) : 0;
  // NOTE(review): confirm this model id is valid for the target API version.
  const fallbackModel = 'claude-haiku-4-20250514';
  const request = { ...params, stream: false as const };

  // Every path in the body returns, throws, or continues, so the loop needs
  // no exit condition and the function can never fall through.
  for (let attempt = 0; ; attempt++) {
    try {
      return await this.client.messages.create(request);
    } catch (error) {
      const canRetry = attempt < retryBudget;

      if (canRetry && this.isRateLimitError(error)) {
        // NOTE(review): assumes extractRetryAfter() returns milliseconds — confirm.
        const retryAfter = this.extractRetryAfter(error);
        await this.sleep(retryAfter || 1000 * 2 ** attempt);
        continue;
      }

      if (canRetry && this.isOverloadedError(error) && request.model !== fallbackModel) {
        // Degrade to the Haiku model; a failure here propagates to the caller.
        return await this.client.messages.create({ ...request, model: fallbackModel });
      }

      throw error;
    }
  }
}
```
## 10. 性能优化
### 10.1 并行度分析
```
典型场景 1: 用户问政策 + 系统需要保存记忆
tool_uses: [invoke_policy_expert, invoke_memory_manager]
顺序执行: 3s + 2s = 5s
并发执行: invoke_memory_manager 不是 safe,但在 invoke_policy_expert 之后
→ Batch [invoke_policy_expert] (3s) → invoke_memory_manager (2s) = 5s
这种情况并发无优势
典型场景 2: 评估用户 + 查找案例 + 获取策略建议
tool_uses: [invoke_assessment_expert, invoke_case_analyst, invoke_strategist]
顺序执行: 4s + 2s + 2s = 8s
并发执行: 全部 safe → Batch 并行 = max(4s, 2s, 2s) = 4s
节省 50% 时间!
典型场景 3: 保存记忆 + 评估 + 生成支付
tool_uses: [save_user_memory, invoke_assessment_expert, generate_payment]
顺序执行: 1s + 4s + 2s = 7s
我们的算法:
save_user_memory (not safe) → 单独执行 (1s)
invoke_assessment_expert (safe) → Batch B
generate_payment (not safe) → 先执行 Batch B (4s), 再单独 (2s)
总计: 1s + 4s + 2s = 7s (这种情况并发无优势)
```
### 10.2 超时配置建议
| 工具类型 | 建议超时 | 原因 |
|----------|---------|------|
| Agent 调用 (Sonnet) | 30s | Sonnet 单次调用约 3-8s,Agent 最多 3 轮 |
| Agent 调用 (Haiku) | 15s | Haiku 更快 |
| search_knowledge | 10s | HTTP + 向量检索 |
| save_user_memory | 5s | HTTP + 写入 |
| generate_payment | 10s | 外部支付 API |
| get_current_datetime | 1s | 本地计算 |
| web_search | 15s | 外部搜索 API |
### 10.3 Token 成本追踪
```typescript
/**
 * Per-agent cost summary, used to track the token cost of every Agent
 * invocation at the ToolExecutionQueue level.
 */
interface AgentCostSummary {
/** Name of the agent. */
agentName: string;
/** Input tokens consumed by the agent. */
inputTokens: number;
/** Output tokens produced by the agent. */
outputTokens: number;
/** Estimated cost in USD (input cost plus output cost). */
estimatedCostUsd: number;
/** Number of turns the agent used. */
turnsUsed: number;
/** Duration of the agent run in milliseconds. */
durationMs: number;
}
/**
 * Builds the cost summary for one executeAll() round.
 *
 * Only results that carry `agentMetadata` (agent invocations) are included;
 * direct-tool results are skipped.
 *
 * @param results - results returned by executeAll()
 * @param pricing - USD price per 1,000 tokens; defaults to the Sonnet prices
 *   (0.003 input, 0.015 output). Pass other rates for agents on another model.
 * @returns one summary entry per agent invocation, in result order
 */
getCostSummary(
  results: ToolExecutionResult[],
  pricing: { inputUsdPer1K: number; outputUsdPer1K: number } = { inputUsdPer1K: 0.003, outputUsdPer1K: 0.015 },
): AgentCostSummary[] {
  return results.flatMap(({ output }) => {
    const meta = output.agentMetadata;
    if (!meta) return [];

    const inputCost = (meta.tokensUsed.input * pricing.inputUsdPer1K) / 1000;
    const outputCost = (meta.tokensUsed.output * pricing.outputUsdPer1K) / 1000;
    return [
      {
        agentName: meta.agentName,
        inputTokens: meta.tokensUsed.input,
        outputTokens: meta.tokensUsed.output,
        estimatedCostUsd: inputCost + outputCost,
        turnsUsed: meta.turnsUsed,
        durationMs: meta.durationMs,
      },
    ];
  });
}
```
## 11. 工具注册示例
```typescript
// tool-registry.ts - 工具注册
/**
 * Registers the Coordinator's tools on the execution queue.
 *
 * Each registration pairs the Claude API tool schema with its scheduling
 * metadata (concurrency safety, tool kind, timeout, estimated duration) and
 * the executor that performs the call.
 *
 * @param queue - queue that receives the registrations
 * @param specialists - Specialist Agent services backing the `invoke_*` tools
 * @param directTools - services backing the direct tools
 */
export function registerAllTools(
  queue: ToolExecutionQueue,
  specialists: SpecialistServices,
  directTools: DirectToolServices,
): void {
  // ======== Agent invocation tools ========
  queue.registerTool('invoke_policy_expert', {
    tool: {
      name: 'invoke_policy_expert',
      description: '调用政策专家 Agent查询香港移民政策的详细信息、条件要求、申请流程等。',
      input_schema: {
        type: 'object',
        properties: {
          query: { type: 'string', description: '要查询的政策问题' },
          category: { type: 'string', description: '移民类别可选TTPS/QMAS/GEP/IANG/CIES/TECHTAS' },
        },
        required: ['query'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 5000,
    executor: createAgentExecutor(specialists.policyExpert),
  });

  queue.registerTool('invoke_assessment_expert', {
    tool: {
      name: 'invoke_assessment_expert',
      description: '调用评估专家 Agent根据用户信息评估移民资格和推荐方案。',
      input_schema: {
        type: 'object',
        properties: {
          userInfo: { type: 'object', description: '用户信息age, education, workYears 等)' },
          targetCategories: {
            type: 'array',
            items: { type: 'string' },
            description: '要评估的目标类别(可选,默认评估所有)',
          },
        },
        required: ['userInfo'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 6000,
    executor: createAgentExecutor(specialists.assessmentExpert),
  });

  queue.registerTool('invoke_memory_manager', {
    tool: {
      name: 'invoke_memory_manager',
      description: '调用记忆管理 Agent提取和保存用户信息到长期记忆。',
      input_schema: {
        type: 'object',
        properties: {
          action: { type: 'string', enum: ['extract', 'save', 'load'] },
          userMessage: { type: 'string', description: '用户消息extract 时必须)' },
          assistantMessage: { type: 'string', description: '助手回复extract 时必须)' },
          memoryData: { type: 'object', description: '要保存的记忆save 时必须)' },
        },
        required: ['action'],
      },
    },
    isConcurrencySafe: false, // May write to memory, so it must not run concurrently.
    toolType: 'agent_invocation',
    timeoutMs: 15000,
    estimatedDurationMs: 3000,
    executor: createAgentExecutor(specialists.memoryManager),
  });

  // ======== Direct tools ========
  queue.registerTool('generate_payment', {
    tool: {
      name: 'generate_payment',
      description: '生成付费评估服务的支付链接。',
      input_schema: {
        type: 'object',
        properties: {
          service: { type: 'string', enum: ['detailed_assessment'] },
          // NOTE(review): userId arrives as model-supplied tool input; confirm
          // the executor checks it against the authenticated session.
          userId: { type: 'string' },
        },
        required: ['service', 'userId'],
      },
    },
    isConcurrencySafe: false, // Creates a payment order (side effect).
    toolType: 'direct_tool',
    timeoutMs: 10000,
    estimatedDurationMs: 2000,
    // Call through the service object so `this` stays bound when
    // generatePayment is a class method; a bare method reference would lose it.
    executor: (input: Record<string, unknown>) => directTools.generatePayment(input),
  });

  queue.registerTool('get_current_datetime', {
    tool: {
      name: 'get_current_datetime',
      description: '获取当前日期和时间。',
      input_schema: { type: 'object', properties: {} },
    },
    isConcurrencySafe: true,
    toolType: 'direct_tool',
    timeoutMs: 1000,
    estimatedDurationMs: 5,
    // Local computation: current time formatted for the Hong Kong time zone.
    executor: async () => ({
      content: new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Hong_Kong' }),
      success: true,
    }),
  });

  // ... register the remaining tools
}
```
## 12. 与旧架构的对比
| 方面 | 旧架构 (ClaudeAgentServiceV2) | 新架构 (ToolExecutionQueue) |
|------|-------------------------------|--------------------------|
| 工具执行 | `for` 循环串行逐个执行 | 智能分批:safe 并行,unsafe 串行 |
| 执行超时 | 无超时控制 | 每个工具独立超时 |
| 错误隔离 | 一个工具失败 → 整个循环中断 | 一个失败不影响其他 |
| 事件通知 | 无实时进度 | 完整的 QueueEvent 流 |
| Agent 子调用 | N/A(无 Agent 概念) | 每个 Agent 独立 mini-loop |
| 成本追踪 | 仅追踪 Coordinator 的 tokens | 追踪每个 Agent 的 tokens |
| 扩展性 | 改 `executeTool()` switch 语句 | `registerTool()` 声明式注册 |