34 KiB
34 KiB
09 - 工具并发执行系统 (Tool Execution System)
1. 设计哲学
借鉴 Claude Code 的 ToolExecutionQueue(Q80 class)设计:
并发安全的工具并行执行,串行约束的工具顺序执行。
当 Coordinator Agent 在一次回复中返回多个 tool_use blocks 时,系统需要智能地决定哪些工具可以并行执行、哪些必须串行等待。这直接影响用户体验(响应延迟)和系统正确性(数据一致性)。
核心规则:
- 读操作并行:
search_knowledge、invoke_*(Agent 调用)等纯读操作可以并行 - 写操作串行:
save_user_memory、generate_payment等有副作用的操作必须串行 - 结果有序:无论执行顺序如何,结果按 Claude 返回的
tool_use顺序排列 - 错误隔离:一个工具失败不影响其他工具的执行
2. 并发安全性声明
2.1 工具并发安全性矩阵
| 工具名 | 类型 | isConcurrencySafe | 原因 |
|---|---|---|---|
invoke_policy_expert |
Agent 调用 | true |
纯读,独立 API 调用 |
invoke_assessment_expert |
Agent 调用 | true |
纯读,独立 API 调用 |
invoke_strategist |
Agent 调用 | true |
纯读,独立 API 调用 |
invoke_objection_handler |
Agent 调用 | true |
纯读,独立 API 调用 |
invoke_case_analyst |
Agent 调用 | true |
纯读,独立 API 调用 |
invoke_memory_manager |
Agent 调用 | false |
可能写入记忆 |
search_knowledge |
直接工具 | true |
纯读 |
get_user_context |
直接工具 | true |
纯读 |
save_user_memory |
直接工具 | false |
写操作 |
generate_payment |
直接工具 | false |
产生支付订单,有副作用 |
get_current_datetime |
直接工具 | true |
纯计算 |
web_search |
直接工具 | true |
纯读,外部 API |
get_exchange_rate |
直接工具 | true |
纯读 |
check_off_topic |
直接工具 | true |
纯读 |
2.2 并发安全判定规则
/**
* 判断工具是否并发安全的规则:
*
* 1. Agent 调用类 (invoke_*):
* - 默认并发安全(各 Agent 独立运行)
* - 例外: invoke_memory_manager(因为它可能调用 save_user_memory)
*
* 2. 读操作类:
* - search_*, get_*, check_* → 并发安全
*
* 3. 写操作类:
* - save_*, update_*, delete_* → 非并发安全
*
* 4. 副作用类:
* - generate_payment, send_notification → 非并发安全
*/
3. TypeScript 类型定义
// tool-execution.types.ts
import Anthropic from '@anthropic-ai/sdk';
/**
* 工具定义(扩展 Claude 的 Tool 类型)
*/
export interface ToolDefinition {
/** Claude API 工具定义 */
tool: Anthropic.Tool;
/** 是否可以与其他工具并发执行 */
isConcurrencySafe: boolean;
/** 工具类型:agent 调用 vs 直接工具 */
toolType: 'agent_invocation' | 'direct_tool';
/** 执行超时时间 (ms) */
timeoutMs: number;
/** 执行器函数 */
executor: (input: Record<string, unknown>) => Promise<ToolExecutionOutput>;
/** 预估耗时 (ms),用于前端显示和排序 */
estimatedDurationMs: number;
}
/**
* 工具执行输出
*/
export interface ToolExecutionOutput {
/** 返回给 Claude 的文本结果 */
content: string;
/** 执行是否成功 */
success: boolean;
/** 错误信息(如果失败) */
error?: string;
/** Agent 执行元数据(仅 agent_invocation 类型) */
agentMetadata?: {
agentName: string;
turnsUsed: number;
tokensUsed: { input: number; output: number };
toolCallsMade: string[];
durationMs: number;
};
}
/**
* 工具执行请求(从 Claude 返回的 tool_use block)
*/
export interface ToolExecutionRequest {
/** Claude 分配的 tool_use ID */
toolUseId: string;
/** 工具名称 */
toolName: string;
/** 工具输入参数 */
input: Record<string, unknown>;
/** 在 tool_use 数组中的原始位置(用于结果排序) */
originalIndex: number;
}
/**
* 工具执行结果
*/
export interface ToolExecutionResult {
/** Claude 分配的 tool_use ID */
toolUseId: string;
/** 工具名称 */
toolName: string;
/** 原始位置 */
originalIndex: number;
/** 执行输出 */
output: ToolExecutionOutput;
/** 执行耗时 (ms) */
durationMs: number;
/** 开始时间 */
startedAt: number;
/** 结束时间 */
completedAt: number;
}
/**
* 队列状态事件(用于 stream 通知前端)
*/
export type QueueEvent =
| { type: 'tool_queued'; toolName: string; toolUseId: string; position: number }
| { type: 'tool_started'; toolName: string; toolUseId: string }
| { type: 'tool_completed'; toolName: string; toolUseId: string; durationMs: number; success: boolean }
| { type: 'tool_error'; toolName: string; toolUseId: string; error: string }
| { type: 'agent_started'; agentName: string; toolUseId: string }
| { type: 'agent_progress'; agentName: string; toolUseId: string; turn: number; maxTurns: number }
| { type: 'agent_completed'; agentName: string; toolUseId: string; durationMs: number }
| { type: 'queue_drained' };
4. ToolExecutionQueue 完整实现
// tool-execution-queue.ts
import { Injectable, Logger } from '@nestjs/common';
import {
ToolDefinition,
ToolExecutionRequest,
ToolExecutionResult,
ToolExecutionOutput,
QueueEvent,
} from './tool-execution.types';
/**
* 工具并发执行队列
*
* 核心算法:
* 1. 接收一组 tool_use requests
* 2. 按原始顺序遍历
* 3. 将连续的 concurrency-safe 工具收集到一个 "并发批次"
* 4. 遇到非 safe 工具时:
* a. 先执行(await)当前并发批次
* b. 再单独执行非 safe 工具
* 5. 继续处理剩余工具
* 6. 最终按 originalIndex 排序返回所有结果
*/
@Injectable()
export class ToolExecutionQueue {
private readonly logger = new Logger(ToolExecutionQueue.name);
/** 已注册的工具定义 */
private toolRegistry = new Map<string, ToolDefinition>();
/** 事件回调(用于 stream 通知) */
private eventCallback?: (event: QueueEvent) => void;
/**
* 注册工具定义
*/
registerTool(name: string, definition: ToolDefinition): void {
this.toolRegistry.set(name, definition);
this.logger.log(`Registered tool: ${name} (concurrencySafe: ${definition.isConcurrencySafe})`);
}
/**
* 批量注册工具
*/
registerTools(tools: Record<string, ToolDefinition>): void {
for (const [name, def] of Object.entries(tools)) {
this.registerTool(name, def);
}
}
/**
* 设置事件回调(用于实时通知前端)
*/
onEvent(callback: (event: QueueEvent) => void): void {
this.eventCallback = callback;
}
/**
* 执行所有工具
*
* 这是主入口方法。接收 Claude 返回的所有 tool_use blocks,
* 按并发安全性分组执行,返回有序的结果。
*
* @param requests - 从 Claude 响应中提取的 tool_use 请求列表
* @returns 按 originalIndex 排序的执行结果
*/
async executeAll(requests: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
if (requests.length === 0) return [];
this.logger.log(`Executing ${requests.length} tools: ${requests.map(r => r.toolName).join(', ')}`);
const allResults: ToolExecutionResult[] = [];
// ======== 核心算法:分批执行 ========
let currentBatch: ToolExecutionRequest[] = [];
for (const request of requests) {
const definition = this.toolRegistry.get(request.toolName);
if (!definition) {
// 未注册的工具 → 返回错误结果
allResults.push(this.createErrorResult(request, `Unknown tool: ${request.toolName}`));
continue;
}
this.emitEvent({ type: 'tool_queued', toolName: request.toolName, toolUseId: request.toolUseId, position: request.originalIndex });
if (definition.isConcurrencySafe) {
// 并发安全 → 加入当前批次
currentBatch.push(request);
} else {
// 非并发安全 → 先执行当前批次,再单独执行此工具
// 1. 执行当前并发批次
if (currentBatch.length > 0) {
const batchResults = await this.executeConcurrentBatch(currentBatch);
allResults.push(...batchResults);
currentBatch = [];
}
// 2. 单独执行非安全工具(串行)
const result = await this.executeSingleTool(request, definition);
allResults.push(result);
}
}
// 执行最后一个并发批次(如果有)
if (currentBatch.length > 0) {
const batchResults = await this.executeConcurrentBatch(currentBatch);
allResults.push(...batchResults);
}
// 按 originalIndex 排序
allResults.sort((a, b) => a.originalIndex - b.originalIndex);
this.emitEvent({ type: 'queue_drained' });
this.logger.log(`All ${allResults.length} tools completed`);
return allResults;
}
/**
* 并行执行一批并发安全的工具
*/
private async executeConcurrentBatch(batch: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
if (batch.length === 0) return [];
if (batch.length === 1) {
// 单个工具不需要 Promise.all
const def = this.toolRegistry.get(batch[0].toolName)!;
return [await this.executeSingleTool(batch[0], def)];
}
this.logger.log(`Executing concurrent batch: ${batch.map(r => r.toolName).join(', ')}`);
// 并行执行所有工具
const promises = batch.map(request => {
const definition = this.toolRegistry.get(request.toolName)!;
return this.executeSingleTool(request, definition);
});
// Promise.allSettled 确保一个失败不影响其他
const settled = await Promise.allSettled(promises);
return settled.map((result, index) => {
if (result.status === 'fulfilled') {
return result.value;
} else {
return this.createErrorResult(batch[index], `Execution failed: ${result.reason}`);
}
});
}
/**
* 执行单个工具
*/
private async executeSingleTool(
request: ToolExecutionRequest,
definition: ToolDefinition,
): Promise<ToolExecutionResult> {
const startedAt = Date.now();
// 通知:Agent 调用开始
if (definition.toolType === 'agent_invocation') {
const agentName = request.toolName.replace('invoke_', '');
this.emitEvent({ type: 'agent_started', agentName, toolUseId: request.toolUseId });
}
this.emitEvent({ type: 'tool_started', toolName: request.toolName, toolUseId: request.toolUseId });
try {
// 带超时的执行
const output = await this.executeWithTimeout(
definition.executor(request.input),
definition.timeoutMs,
request.toolName,
);
const completedAt = Date.now();
const durationMs = completedAt - startedAt;
// 通知:完成
this.emitEvent({
type: 'tool_completed',
toolName: request.toolName,
toolUseId: request.toolUseId,
durationMs,
success: output.success,
});
if (definition.toolType === 'agent_invocation') {
const agentName = request.toolName.replace('invoke_', '');
this.emitEvent({ type: 'agent_completed', agentName, toolUseId: request.toolUseId, durationMs });
}
this.logger.log(`Tool ${request.toolName} completed in ${durationMs}ms (success: ${output.success})`);
return {
toolUseId: request.toolUseId,
toolName: request.toolName,
originalIndex: request.originalIndex,
output,
durationMs,
startedAt,
completedAt,
};
} catch (error) {
const completedAt = Date.now();
const durationMs = completedAt - startedAt;
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Tool ${request.toolName} failed after ${durationMs}ms: ${errorMessage}`);
this.emitEvent({
type: 'tool_error',
toolName: request.toolName,
toolUseId: request.toolUseId,
error: errorMessage,
});
return {
toolUseId: request.toolUseId,
toolName: request.toolName,
originalIndex: request.originalIndex,
output: {
content: `工具执行失败: ${errorMessage}。请重试或使用其他方式获取信息。`,
success: false,
error: errorMessage,
},
durationMs,
startedAt,
completedAt,
};
}
}
/**
* 带超时的执行
*/
private async executeWithTimeout(
promise: Promise<ToolExecutionOutput>,
timeoutMs: number,
toolName: string,
): Promise<ToolExecutionOutput> {
return new Promise<ToolExecutionOutput>((resolve, reject) => {
const timer = setTimeout(() => {
reject(new Error(`Tool ${toolName} timed out after ${timeoutMs}ms`));
}, timeoutMs);
promise
.then(result => {
clearTimeout(timer);
resolve(result);
})
.catch(error => {
clearTimeout(timer);
reject(error);
});
});
}
/**
* 创建错误结果
*/
private createErrorResult(request: ToolExecutionRequest, error: string): ToolExecutionResult {
return {
toolUseId: request.toolUseId,
toolName: request.toolName,
originalIndex: request.originalIndex,
output: {
content: `错误: ${error}`,
success: false,
error,
},
durationMs: 0,
startedAt: Date.now(),
completedAt: Date.now(),
};
}
/**
* 发送事件
*/
private emitEvent(event: QueueEvent): void {
if (this.eventCallback) {
try {
this.eventCallback(event);
} catch (e) {
this.logger.error(`Event callback error: ${e}`);
}
}
}
/**
* 获取已注册工具列表(用于 Claude API)
*/
getClaudeTools(): Anthropic.Tool[] {
return Array.from(this.toolRegistry.values()).map(def => def.tool);
}
/**
* 获取工具定义(用于调试)
*/
getToolDefinition(name: string): ToolDefinition | undefined {
return this.toolRegistry.get(name);
}
}
5. Agent 调用执行器
当工具类型是 agent_invocation 时,executor 需要创建一个新的 Claude API 调用,运行 Specialist Agent 的 mini-loop。
// agent-executor.ts
import Anthropic from '@anthropic-ai/sdk';
import { ToolExecutionOutput } from './tool-execution.types';
import { BaseSpecialistService } from '../specialists/base-specialist.service';
/**
* Agent 调用执行器工厂
*
* 为每个 Specialist Agent 创建一个 executor 函数,
* 该函数被注册到 ToolExecutionQueue 中。
*/
export function createAgentExecutor(
specialist: BaseSpecialistService,
eventCallback?: (event: { turn: number; maxTurns: number }) => void,
): (input: Record<string, unknown>) => Promise<ToolExecutionOutput> {
return async (input: Record<string, unknown>): Promise<ToolExecutionOutput> => {
const startTime = Date.now();
let totalInputTokens = 0;
let totalOutputTokens = 0;
const toolCallsMade: string[] = [];
try {
// Specialist Agent 的 mini-loop(最多 3 轮)
const result = await specialist.execute({
input,
maxTurns: 3,
onProgress: (turn, maxTurns) => {
eventCallback?.({ turn, maxTurns });
},
onTokenUsage: (input, output) => {
totalInputTokens += input;
totalOutputTokens += output;
},
onToolCall: (toolName) => {
toolCallsMade.push(toolName);
},
});
return {
content: result.output,
success: true,
agentMetadata: {
agentName: specialist.name,
turnsUsed: result.turnsUsed,
tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
toolCallsMade,
durationMs: Date.now() - startTime,
},
};
} catch (error) {
return {
content: `Agent ${specialist.name} 执行失败: ${error instanceof Error ? error.message : String(error)}`,
success: false,
error: error instanceof Error ? error.message : String(error),
agentMetadata: {
agentName: specialist.name,
turnsUsed: 0,
tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
toolCallsMade,
durationMs: Date.now() - startTime,
},
};
}
};
}
6. BaseSpecialistService 抽象类
// base-specialist.service.ts
import Anthropic from '@anthropic-ai/sdk';
/**
* Specialist Agent 执行参数
*/
export interface SpecialistExecuteParams {
input: Record<string, unknown>;
maxTurns: number;
onProgress?: (turn: number, maxTurns: number) => void;
onTokenUsage?: (inputTokens: number, outputTokens: number) => void;
onToolCall?: (toolName: string) => void;
}
/**
* Specialist Agent 执行结果
*/
export interface SpecialistExecuteResult {
output: string; // 返回给 Coordinator 的文本
turnsUsed: number;
metadata?: Record<string, unknown>;
}
/**
* Specialist Agent 基类
*
* 每个 Specialist Agent 都继承此基类。
* 基类提供:
* 1. 标准的 mini-loop(最多 N 轮)
* 2. 工具执行
* 3. Token 追踪
* 4. 错误处理
*/
export abstract class BaseSpecialistService {
/** Agent 名称 */
abstract readonly name: string;
/** 使用的模型 */
abstract readonly model: string;
/** Agent 的 system prompt */
abstract readonly systemPrompt: string;
/** Agent 可用的工具 */
abstract readonly tools: Anthropic.Tool[];
/** 最大输出 tokens */
abstract readonly maxTokens: number;
protected client: Anthropic;
constructor(client: Anthropic) {
this.client = client;
}
/**
* 执行 Agent 的 mini-loop
*
* 标准流程:
* 1. 构建初始 messages(基于 input 参数)
* 2. 调用 Claude API
* 3. 如果有 tool_use → 执行工具 → 递归
* 4. 无 tool_use → 返回最终文本
* 5. 达到 maxTurns → 强制返回当前结果
*/
async execute(params: SpecialistExecuteParams): Promise<SpecialistExecuteResult> {
const messages: Anthropic.MessageParam[] = this.buildInitialMessages(params.input);
let turnsUsed = 0;
while (turnsUsed < params.maxTurns) {
turnsUsed++;
params.onProgress?.(turnsUsed, params.maxTurns);
// 调用 Claude API(非流式,Specialist 不需要流式输出)
const response = await this.client.messages.create({
model: this.model,
max_tokens: this.maxTokens,
system: this.systemPrompt,
messages,
tools: this.tools.length > 0 ? this.tools : undefined,
});
// 记录 token 使用
params.onTokenUsage?.(
response.usage.input_tokens,
response.usage.output_tokens,
);
// 提取文本和 tool_use
const textBlocks = response.content.filter(b => b.type === 'text');
const toolUses = response.content.filter(b => b.type === 'tool_use');
// 无工具调用 → 返回文本
if (toolUses.length === 0) {
const outputText = textBlocks.map(b => (b as Anthropic.TextBlock).text).join('\n');
return {
output: outputText,
turnsUsed,
metadata: this.extractMetadata(outputText),
};
}
// 有工具调用 → 执行工具
messages.push({ role: 'assistant', content: response.content });
const toolResults: Anthropic.ToolResultBlockParam[] = [];
for (const toolUse of toolUses) {
const tu = toolUse as Anthropic.ToolUseBlock;
params.onToolCall?.(tu.name);
const result = await this.executeSpecialistTool(
tu.name,
tu.input as Record<string, unknown>,
);
toolResults.push({
type: 'tool_result',
tool_use_id: tu.id,
content: typeof result === 'string' ? result : JSON.stringify(result),
});
}
messages.push({ role: 'user', content: toolResults });
}
// 达到 maxTurns,强制返回最后的文本
return {
output: `[Agent ${this.name} 达到最大轮次 (${params.maxTurns}),以下是当前结果]\n${this.extractLastText(messages)}`,
turnsUsed,
};
}
/**
* 构建初始 messages
* 子类可以覆盖以定制输入格式
*/
protected buildInitialMessages(input: Record<string, unknown>): Anthropic.MessageParam[] {
return [{
role: 'user',
content: JSON.stringify(input, null, 2),
}];
}
/**
* 执行 Specialist 自己的工具
* 子类必须实现
*/
protected abstract executeSpecialistTool(
toolName: string,
input: Record<string, unknown>,
): Promise<unknown>;
/**
* 从输出中提取结构化元数据
* 子类可选覆盖
*/
protected extractMetadata(output: string): Record<string, unknown> | undefined {
return undefined;
}
/**
* 提取 messages 中最后一条文本
*/
private extractLastText(messages: Anthropic.MessageParam[]): string {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (msg.role === 'assistant') {
if (typeof msg.content === 'string') return msg.content;
if (Array.isArray(msg.content)) {
const textBlocks = msg.content.filter((b: any) => b.type === 'text');
return textBlocks.map((b: any) => b.text).join('\n');
}
}
}
return '(无输出)';
}
}
7. 执行流程图
Claude Coordinator 返回 tool_use blocks:
[invoke_policy_expert, invoke_case_analyst, save_user_memory, invoke_assessment_expert]
ToolExecutionQueue.executeAll():
Step 1: 遍历 tool_use 列表
├─ invoke_policy_expert → isConcurrencySafe: true → 加入 Batch A
├─ invoke_case_analyst → isConcurrencySafe: true → 加入 Batch A
├─ save_user_memory → isConcurrencySafe: false → 触发排空!
│ ├─ 先执行 Batch A(并行):
│ │ ├─ invoke_policy_expert ──→ PolicyExpert.execute() ──→ Claude API
│ │ └─ invoke_case_analyst ──→ CaseAnalyst.execute() ──→ Claude API
│ │ ↑ 这两个并行执行!
│ ├─ Batch A 完成
│ └─ 再执行 save_user_memory(串行):
│ └─ knowledgeClient.saveUserMemory() ──→ knowledge-service HTTP
│
└─ invoke_assessment_expert → isConcurrencySafe: true → 加入 Batch B
Step 2: 执行最后的 Batch B:
└─ invoke_assessment_expert ──→ AssessmentExpert.execute() ──→ Claude API
Step 3: 按 originalIndex 排序结果:
[0] invoke_policy_expert → "高才通B类要求..."
[1] invoke_case_analyst → "类似案例:张先生..."
[2] save_user_memory → "已保存用户信息"
[3] invoke_assessment_expert → "评估结果: {score: 75, ...}"
Step 4: 返回排序后的结果 → Coordinator 继续处理
8. 与 Agent Loop 的集成
// coordinator-agent.service.ts (片段)
// 在 agentLoop 中使用 ToolExecutionQueue
async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
// ... (Step 1-4: context injection, API call, stream collection)
// Step 5: 提取 tool_use blocks
const toolUses = assistantBlocks.filter(b => b.type === 'tool_use');
if (toolUses.length === 0) {
return; // 自然终止
}
// Step 6: 构建执行请求
const requests: ToolExecutionRequest[] = toolUses.map((tu, index) => ({
toolUseId: (tu as Anthropic.ToolUseBlock).id,
toolName: (tu as Anthropic.ToolUseBlock).name,
input: (tu as Anthropic.ToolUseBlock).input as Record<string, unknown>,
originalIndex: index,
}));
// Step 7: 设置事件回调 → 转发到前端
toolExecutionQueue.onEvent((event) => {
switch (event.type) {
case 'agent_started':
// yield 不能在回调中用,需要通过 EventEmitter 或 channel
streamChannel.push({ type: 'agent_start', agentName: event.agentName });
break;
case 'agent_completed':
streamChannel.push({ type: 'agent_complete', agentName: event.agentName, durationMs: event.durationMs });
break;
case 'tool_error':
streamChannel.push({ type: 'tool_error', toolName: event.toolName, error: event.error });
break;
}
});
// Step 8: 执行所有工具
const results = await toolExecutionQueue.executeAll(requests);
// Step 9: 构建 tool_results 消息
const toolResultBlocks: Anthropic.ToolResultBlockParam[] = results.map(r => ({
type: 'tool_result' as const,
tool_use_id: r.toolUseId,
content: r.output.content,
is_error: !r.output.success,
}));
// Step 10: 递归 — 将结果喂回 Coordinator
const newMessages = [
...params.messages,
{ role: 'assistant' as const, content: assistantBlocks },
{ role: 'user' as const, content: toolResultBlocks },
];
yield* agentLoop({ ...params, messages: newMessages });
}
9. 错误处理策略
9.1 工具级错误处理
| 错误类型 | 处理方式 | 返回给 Coordinator |
|---|---|---|
| 超时 | 超过 timeoutMs |
"工具执行超时,请重试或使用其他方式获取信息。" |
| API 错误 (knowledge-service 不可用) | 捕获异常 | "知识库服务暂时不可用,请根据已知信息回答。" |
| Agent 内部错误 (Specialist Claude API 失败) | 捕获异常 | "专家 Agent 暂时不可用: {error}. 请直接回答用户。" |
| 输入验证失败 | 早期返回 | "工具输入参数无效: {details}" |
| Rate Limit | 指数退避重试 1 次 | 重试成功则正常返回,否则返回错误 |
| 未知工具 | 跳过 | "未知工具: {toolName}" |
9.2 批次级错误处理
/**
* Promise.allSettled 确保批次中一个工具的失败不会导致整个批次失败。
*
* 示例:
* Batch = [invoke_policy_expert, invoke_case_analyst]
*
* 如果 invoke_policy_expert 超时:
* - invoke_case_analyst 正常返回其结果
* - invoke_policy_expert 返回错误消息
* - Coordinator 根据有效的 case_analyst 结果继续生成回复
*
* Coordinator 的 Prompt 中包含指引:
* "如果某个工具调用返回错误,请根据其他可用信息继续回复用户,
* 不要向用户暴露技术错误细节。"
*/
9.3 Agent 执行重试
/**
* Specialist Agent 的重试策略(在 base-specialist.service.ts 中)
*/
private async callClaudeWithRetry(
params: Anthropic.MessageCreateParams,
maxRetries: number = 1,
): Promise<Anthropic.Message> {
let lastError: Error | null = null;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await this.client.messages.create(params);
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (this.isRateLimitError(error) && attempt < maxRetries) {
const retryAfter = this.extractRetryAfter(error);
await this.sleep(retryAfter || 1000 * (attempt + 1));
continue;
}
if (this.isOverloadedError(error) && attempt < maxRetries) {
// 降级到 Haiku 模型
return await this.client.messages.create({
...params,
model: 'claude-haiku-4-20250514',
});
}
throw error;
}
}
throw lastError;
}
10. 性能优化
10.1 并行度分析
典型场景 1: 用户问政策 + 系统需要保存记忆
tool_uses: [invoke_policy_expert, invoke_memory_manager]
顺序执行: 3s + 2s = 5s
并发执行: invoke_memory_manager 不是 safe,但在 invoke_policy_expert 之后
→ Batch [invoke_policy_expert] (3s) → invoke_memory_manager (2s) = 5s
这种情况并发无优势
典型场景 2: 评估用户 + 查找案例 + 获取策略建议
tool_uses: [invoke_assessment_expert, invoke_case_analyst, invoke_strategist]
顺序执行: 4s + 2s + 2s = 8s
并发执行: 全部 safe → Batch 并行 = max(4s, 2s, 2s) = 4s
节省 50% 时间!
典型场景 3: 保存记忆 + 评估 + 生成支付
tool_uses: [save_user_memory, invoke_assessment_expert, generate_payment]
顺序执行: 1s + 4s + 2s = 7s
我们的算法:
save_user_memory (not safe) → 单独执行 (1s)
invoke_assessment_expert (safe) → Batch B
generate_payment (not safe) → 先执行 Batch B (4s), 再单独 (2s)
总计: 1s + 4s + 2s = 7s (这种情况并发无优势)
10.2 超时配置建议
| 工具类型 | 建议超时 | 原因 |
|---|---|---|
| Agent 调用 (Sonnet) | 30s | Sonnet 单次调用约 3-8s,Agent 最多 3 轮 |
| Agent 调用 (Haiku) | 15s | Haiku 更快 |
| search_knowledge | 10s | HTTP + 向量检索 |
| save_user_memory | 5s | HTTP + 写入 |
| generate_payment | 10s | 外部支付 API |
| get_current_datetime | 1s | 本地计算 |
| web_search | 15s | 外部搜索 API |
10.3 Token 成本追踪
/**
* 在 ToolExecutionQueue 层面追踪所有 Agent 调用的 token 开销
*/
interface AgentCostSummary {
agentName: string;
inputTokens: number;
outputTokens: number;
estimatedCostUsd: number;
turnsUsed: number;
durationMs: number;
}
/**
* 每次 executeAll 后可以获取本轮的成本汇总
*/
getCostSummary(results: ToolExecutionResult[]): AgentCostSummary[] {
return results
.filter(r => r.output.agentMetadata)
.map(r => {
const meta = r.output.agentMetadata!;
const inputCost = meta.tokensUsed.input * 0.003 / 1000; // Sonnet input price
const outputCost = meta.tokensUsed.output * 0.015 / 1000; // Sonnet output price
return {
agentName: meta.agentName,
inputTokens: meta.tokensUsed.input,
outputTokens: meta.tokensUsed.output,
estimatedCostUsd: inputCost + outputCost,
turnsUsed: meta.turnsUsed,
durationMs: meta.durationMs,
};
});
}
11. 工具注册示例
// tool-registry.ts - 工具注册
export function registerAllTools(
queue: ToolExecutionQueue,
specialists: SpecialistServices,
directTools: DirectToolServices,
): void {
// ======== Agent 调用工具 ========
queue.registerTool('invoke_policy_expert', {
tool: {
name: 'invoke_policy_expert',
description: '调用政策专家 Agent,查询香港移民政策的详细信息、条件要求、申请流程等。',
input_schema: {
type: 'object',
properties: {
query: { type: 'string', description: '要查询的政策问题' },
category: { type: 'string', description: '移民类别(可选):TTPS/QMAS/GEP/IANG/CIES/TECHTAS' },
},
required: ['query'],
},
},
isConcurrencySafe: true,
toolType: 'agent_invocation',
timeoutMs: 30000,
estimatedDurationMs: 5000,
executor: createAgentExecutor(specialists.policyExpert),
});
queue.registerTool('invoke_assessment_expert', {
tool: {
name: 'invoke_assessment_expert',
description: '调用评估专家 Agent,根据用户信息评估移民资格和推荐方案。',
input_schema: {
type: 'object',
properties: {
userInfo: { type: 'object', description: '用户信息(age, education, workYears 等)' },
targetCategories: {
type: 'array',
items: { type: 'string' },
description: '要评估的目标类别(可选,默认评估所有)',
},
},
required: ['userInfo'],
},
},
isConcurrencySafe: true,
toolType: 'agent_invocation',
timeoutMs: 30000,
estimatedDurationMs: 6000,
executor: createAgentExecutor(specialists.assessmentExpert),
});
queue.registerTool('invoke_memory_manager', {
tool: {
name: 'invoke_memory_manager',
description: '调用记忆管理 Agent,提取和保存用户信息到长期记忆。',
input_schema: {
type: 'object',
properties: {
action: { type: 'string', enum: ['extract', 'save', 'load'] },
userMessage: { type: 'string', description: '用户消息(extract 时必须)' },
assistantMessage: { type: 'string', description: '助手回复(extract 时必须)' },
memoryData: { type: 'object', description: '要保存的记忆(save 时必须)' },
},
required: ['action'],
},
},
isConcurrencySafe: false, // 可能写入!
toolType: 'agent_invocation',
timeoutMs: 15000,
estimatedDurationMs: 3000,
executor: createAgentExecutor(specialists.memoryManager),
});
// ======== 直接工具 ========
queue.registerTool('generate_payment', {
tool: {
name: 'generate_payment',
description: '生成付费评估服务的支付链接。',
input_schema: {
type: 'object',
properties: {
service: { type: 'string', enum: ['detailed_assessment'] },
userId: { type: 'string' },
},
required: ['service', 'userId'],
},
},
isConcurrencySafe: false, // 产生订单,有副作用
toolType: 'direct_tool',
timeoutMs: 10000,
estimatedDurationMs: 2000,
executor: directTools.generatePayment,
});
queue.registerTool('get_current_datetime', {
tool: {
name: 'get_current_datetime',
description: '获取当前日期和时间。',
input_schema: { type: 'object', properties: {} },
},
isConcurrencySafe: true,
toolType: 'direct_tool',
timeoutMs: 1000,
estimatedDurationMs: 5,
executor: async () => ({
content: new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Hong_Kong' }),
success: true,
}),
});
// ... 注册其他工具
}
12. 与旧架构的对比
| 方面 | 旧架构 (ClaudeAgentServiceV2) | 新架构 (ToolExecutionQueue) |
|---|---|---|
| 工具执行 | for 循环串行逐个执行 |
智能分批:safe 并行,unsafe 串行 |
| 执行超时 | 无超时控制 | 每个工具独立超时 |
| 错误隔离 | 一个工具失败 → 整个循环中断 | 一个失败不影响其他 |
| 事件通知 | 无实时进度 | 完整的 QueueEvent 流 |
| Agent 子调用 | N/A(无 Agent 概念) | 每个 Agent 独立 mini-loop |
| 成本追踪 | 仅追踪 Coordinator 的 tokens | 追踪每个 Agent 的 tokens |
| 扩展性 | 改 executeTool() switch 语句 |
registerTool() 声明式注册 |