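# 09 - Concurrent Tool Execution System

## 1. Design Philosophy

The design borrows from Claude Code's `ToolExecutionQueue` (the Q80 class):

> **Concurrency-safe tools run in parallel; tools with ordering constraints run serially.**

When the Coordinator Agent returns several `tool_use` blocks in a single reply, the system must decide which tools can run in parallel and which must wait their turn. This decision directly affects user experience (response latency) and system correctness (data consistency).

Core rules:

1. **Reads run in parallel**: pure read operations such as `search_knowledge` and most `invoke_*` Agent calls can run concurrently.
2. **Writes run serially**: operations with side effects, such as `save_user_memory` and `generate_payment`, must run one at a time.
3. **Results stay ordered**: whatever the execution order, results are returned in the order of the `tool_use` blocks in Claude's response.
4. **Errors are isolated**: one tool's failure does not affect the execution of the others.
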
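The batching rule can be sketched in a few lines. This is a minimal illustration, not the queue implementation itself: `PlannedCall` and `planBatches` are hypothetical names, and the sketch only plans batches without executing anything.

```typescript
// Hypothetical shape: just enough information to plan the batches.
interface PlannedCall {
  name: string;
  isConcurrencySafe: boolean;
}

// Groups consecutive concurrency-safe calls into one batch; every unsafe
// call closes the open batch and becomes a batch of its own.
function planBatches(calls: PlannedCall[]): string[][] {
  const batches: string[][] = [];
  let current: string[] = [];

  for (const call of calls) {
    if (call.isConcurrencySafe) {
      current.push(call.name);
      continue;
    }
    if (current.length > 0) {
      batches.push(current);
      current = [];
    }
    batches.push([call.name]);
  }

  if (current.length > 0) batches.push(current);
  return batches;
}

const plan = planBatches([
  { name: 'invoke_policy_expert', isConcurrencySafe: true },
  { name: 'invoke_case_analyst', isConcurrencySafe: true },
  { name: 'save_user_memory', isConcurrencySafe: false },
  { name: 'invoke_assessment_expert', isConcurrencySafe: true },
]);
console.log(plan);
```

Each inner array is a group that may run in parallel; the groups themselves run one after another, so a write never overlaps with the reads around it.
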
## 2. Concurrency Safety Declarations

### 2.1 Tool Concurrency Safety Matrix

| Tool | Type | isConcurrencySafe | Reason |
|------|------|-------------------|--------|
| `invoke_policy_expert` | Agent invocation | `true` | Pure read, independent API call |
| `invoke_assessment_expert` | Agent invocation | `true` | Pure read, independent API call |
| `invoke_strategist` | Agent invocation | `true` | Pure read, independent API call |
| `invoke_objection_handler` | Agent invocation | `true` | Pure read, independent API call |
| `invoke_case_analyst` | Agent invocation | `true` | Pure read, independent API call |
| `invoke_memory_manager` | Agent invocation | `false` | **May write to memory** |
| `search_knowledge` | Direct tool | `true` | Pure read |
| `get_user_context` | Direct tool | `true` | Pure read |
| `save_user_memory` | Direct tool | `false` | **Write operation** |
| `generate_payment` | Direct tool | `false` | **Creates a payment order; has side effects** |
| `get_current_datetime` | Direct tool | `true` | Pure computation |
| `web_search` | Direct tool | `true` | Pure read, external API |
| `get_exchange_rate` | Direct tool | `true` | Pure read |
| `check_off_topic` | Direct tool | `true` | Pure read |

### 2.2 Rules for Determining Concurrency Safety

```typescript
/**
 * Rules for deciding whether a tool is concurrency-safe:
 *
 * 1. Agent invocations (invoke_*):
 *    - Concurrency-safe by default (each Agent runs independently)
 *    - Exception: invoke_memory_manager (it may call save_user_memory)
 *
 * 2. Read operations:
 *    - search_*, get_*, check_* → concurrency-safe
 *
 * 3. Write operations:
 *    - save_*, update_*, delete_* → not concurrency-safe
 *
 * 4. Side-effecting operations:
 *    - generate_payment, send_notification → not concurrency-safe
 */
```

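The naming rules could also be encoded as a default, so that a newly registered tool gets a conservative value when nobody declares one. The sketch below is only an assumption about how that might look: `inferConcurrencySafety` and the three constant lists are hypothetical, and the explicit `isConcurrencySafe` flag in the matrix remains the source of truth.

```typescript
// Hypothetical helper: derives a default from the tool name alone.
const UNSAFE_NAMES = new Set(['invoke_memory_manager', 'generate_payment', 'send_notification']);
const UNSAFE_PREFIXES = ['save_', 'update_', 'delete_'];
const SAFE_PREFIXES = ['invoke_', 'search_', 'get_', 'check_'];

function inferConcurrencySafety(toolName: string): boolean {
  // Named exceptions and write prefixes win over everything else.
  if (UNSAFE_NAMES.has(toolName)) return false;
  if (UNSAFE_PREFIXES.some(prefix => toolName.startsWith(prefix))) return false;
  if (SAFE_PREFIXES.some(prefix => toolName.startsWith(prefix))) return true;
  // Unknown naming pattern: stay conservative and run serially.
  return false;
}

console.log(inferConcurrencySafety('search_knowledge'));      // read prefix
console.log(inferConcurrencySafety('invoke_memory_manager')); // named exception
console.log(inferConcurrencySafety('web_search'));            // matches no prefix
```

Names that fall outside the conventions, such as `web_search`, fall back to serial execution here even though the matrix marks them safe, which is why an explicit declaration per tool is still needed.
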
## 3. TypeScript Type Definitions

```typescript
// tool-execution.types.ts

import Anthropic from '@anthropic-ai/sdk';

/**
 * Tool definition (extends Claude's Tool type)
 */
export interface ToolDefinition {
  /** Claude API tool definition */
  tool: Anthropic.Tool;

  /** Whether the tool may run concurrently with other tools */
  isConcurrencySafe: boolean;

  /** Tool type: agent invocation vs. direct tool */
  toolType: 'agent_invocation' | 'direct_tool';

  /** Execution timeout (ms) */
  timeoutMs: number;

  /** Executor function */
  executor: (input: Record<string, unknown>) => Promise<ToolExecutionOutput>;

  /** Estimated duration (ms), used for frontend display and ordering */
  estimatedDurationMs: number;
}

/**
 * Tool execution output
 */
export interface ToolExecutionOutput {
  /** Text result returned to Claude */
  content: string;

  /** Whether execution succeeded */
  success: boolean;

  /** Error message (on failure) */
  error?: string;

  /** Agent execution metadata (agent_invocation tools only) */
  agentMetadata?: {
    agentName: string;
    turnsUsed: number;
    tokensUsed: { input: number; output: number };
    toolCallsMade: string[];
    durationMs: number;
  };
}

/**
 * Tool execution request (built from a tool_use block returned by Claude)
 */
export interface ToolExecutionRequest {
  /** tool_use ID assigned by Claude */
  toolUseId: string;

  /** Tool name */
  toolName: string;

  /** Tool input parameters */
  input: Record<string, unknown>;

  /** Original position in the tool_use array (used to order results) */
  originalIndex: number;
}

/**
 * Tool execution result
 */
export interface ToolExecutionResult {
  /** tool_use ID assigned by Claude */
  toolUseId: string;

  /** Tool name */
  toolName: string;

  /** Original position */
  originalIndex: number;

  /** Execution output */
  output: ToolExecutionOutput;

  /** Execution duration (ms) */
  durationMs: number;

  /** Start time (epoch ms) */
  startedAt: number;

  /** End time (epoch ms) */
  completedAt: number;
}

/**
 * Queue status events (streamed to the frontend)
 */
export type QueueEvent =
  | { type: 'tool_queued'; toolName: string; toolUseId: string; position: number }
  | { type: 'tool_started'; toolName: string; toolUseId: string }
  | { type: 'tool_completed'; toolName: string; toolUseId: string; durationMs: number; success: boolean }
  | { type: 'tool_error'; toolName: string; toolUseId: string; error: string }
  | { type: 'agent_started'; agentName: string; toolUseId: string }
  | { type: 'agent_progress'; agentName: string; toolUseId: string; turn: number; maxTurns: number }
  | { type: 'agent_completed'; agentName: string; toolUseId: string; durationMs: number }
  | { type: 'queue_drained' };
```

## 4. Full ToolExecutionQueue Implementation

```typescript
// tool-execution-queue.ts

import Anthropic from '@anthropic-ai/sdk'; // needed for the Anthropic.Tool return type of getClaudeTools()
import { Injectable, Logger } from '@nestjs/common';
import {
  ToolDefinition,
  ToolExecutionRequest,
  ToolExecutionResult,
  ToolExecutionOutput,
  QueueEvent,
} from './tool-execution.types';

/**
 * Concurrent tool execution queue
 *
 * Core algorithm:
 * 1. Receive a list of tool_use requests
 * 2. Walk them in their original order
 * 3. Collect consecutive concurrency-safe tools into a "concurrent batch"
 * 4. On reaching a non-safe tool:
 *    a. First execute (await) the current concurrent batch
 *    b. Then execute the non-safe tool on its own
 * 5. Continue with the remaining tools
 * 6. Finally return all results sorted by originalIndex
 */
@Injectable()
export class ToolExecutionQueue {
  private readonly logger = new Logger(ToolExecutionQueue.name);

  /** Registered tool definitions */
  private toolRegistry = new Map<string, ToolDefinition>();

  /** Event callback (used for stream notifications) */
  private eventCallback?: (event: QueueEvent) => void;

  /**
   * Register a tool definition
   */
  registerTool(name: string, definition: ToolDefinition): void {
    this.toolRegistry.set(name, definition);
    this.logger.log(`Registered tool: ${name} (concurrencySafe: ${definition.isConcurrencySafe})`);
  }

  /**
   * Register tools in bulk
   */
  registerTools(tools: Record<string, ToolDefinition>): void {
    for (const [name, def] of Object.entries(tools)) {
      this.registerTool(name, def);
    }
  }

  /**
   * Set the event callback (used to notify the frontend in real time).
   * Only one callback is kept; a later call replaces the earlier one.
   */
  onEvent(callback: (event: QueueEvent) => void): void {
    this.eventCallback = callback;
  }

  /**
   * Execute all tools
   *
   * This is the main entry point. It takes every tool_use block returned by Claude,
   * executes them in groups according to concurrency safety, and returns ordered results.
   *
   * @param requests - tool_use requests extracted from Claude's response
   * @returns execution results sorted by originalIndex
   */
  async executeAll(requests: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
    if (requests.length === 0) return [];

    this.logger.log(`Executing ${requests.length} tools: ${requests.map(r => r.toolName).join(', ')}`);

    const allResults: ToolExecutionResult[] = [];

    // ======== Core algorithm: batched execution ========
    let currentBatch: ToolExecutionRequest[] = [];

    for (const request of requests) {
      const definition = this.toolRegistry.get(request.toolName);

      if (!definition) {
        // Unregistered tool → record an error result and move on
        allResults.push(this.createErrorResult(request, `Unknown tool: ${request.toolName}`));
        continue;
      }

      this.emitEvent({ type: 'tool_queued', toolName: request.toolName, toolUseId: request.toolUseId, position: request.originalIndex });

      if (definition.isConcurrencySafe) {
        // Concurrency-safe → add to the current batch
        currentBatch.push(request);
      } else {
        // Not concurrency-safe → drain the current batch, then run this tool alone

        // 1. Execute the current concurrent batch
        if (currentBatch.length > 0) {
          const batchResults = await this.executeConcurrentBatch(currentBatch);
          allResults.push(...batchResults);
          currentBatch = [];
        }

        // 2. Execute the non-safe tool on its own (serially)
        const result = await this.executeSingleTool(request, definition);
        allResults.push(result);
      }
    }

    // Execute the final concurrent batch, if any
    if (currentBatch.length > 0) {
      const batchResults = await this.executeConcurrentBatch(currentBatch);
      allResults.push(...batchResults);
    }

    // Sort by originalIndex
    allResults.sort((a, b) => a.originalIndex - b.originalIndex);

    this.emitEvent({ type: 'queue_drained' });
    this.logger.log(`All ${allResults.length} tools completed`);

    return allResults;
  }

  /**
   * Execute a batch of concurrency-safe tools in parallel
   */
  private async executeConcurrentBatch(batch: ToolExecutionRequest[]): Promise<ToolExecutionResult[]> {
    if (batch.length === 0) return [];

    if (batch.length === 1) {
      // A single tool does not need Promise.allSettled
      const def = this.toolRegistry.get(batch[0].toolName)!;
      return [await this.executeSingleTool(batch[0], def)];
    }

    this.logger.log(`Executing concurrent batch: ${batch.map(r => r.toolName).join(', ')}`);

    // Start every tool in the batch at once
    const promises = batch.map(request => {
      const definition = this.toolRegistry.get(request.toolName)!;
      return this.executeSingleTool(request, definition);
    });

    // Promise.allSettled ensures one failure does not affect the others
    const settled = await Promise.allSettled(promises);

    return settled.map((result, index) => {
      if (result.status === 'fulfilled') {
        return result.value;
      } else {
        return this.createErrorResult(batch[index], `Execution failed: ${result.reason}`);
      }
    });
  }

  /**
   * Execute a single tool
   */
  private async executeSingleTool(
    request: ToolExecutionRequest,
    definition: ToolDefinition,
  ): Promise<ToolExecutionResult> {
    const startedAt = Date.now();

    // Notify: Agent invocation started
    if (definition.toolType === 'agent_invocation') {
      const agentName = request.toolName.replace('invoke_', '');
      this.emitEvent({ type: 'agent_started', agentName, toolUseId: request.toolUseId });
    }

    this.emitEvent({ type: 'tool_started', toolName: request.toolName, toolUseId: request.toolUseId });

    try {
      // Execute with a timeout
      const output = await this.executeWithTimeout(
        definition.executor(request.input),
        definition.timeoutMs,
        request.toolName,
      );

      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;

      // Notify: completed
      this.emitEvent({
        type: 'tool_completed',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        durationMs,
        success: output.success,
      });

      if (definition.toolType === 'agent_invocation') {
        const agentName = request.toolName.replace('invoke_', '');
        this.emitEvent({ type: 'agent_completed', agentName, toolUseId: request.toolUseId, durationMs });
      }

      this.logger.log(`Tool ${request.toolName} completed in ${durationMs}ms (success: ${output.success})`);

      return {
        toolUseId: request.toolUseId,
        toolName: request.toolName,
        originalIndex: request.originalIndex,
        output,
        durationMs,
        startedAt,
        completedAt,
      };
    } catch (error) {
      const completedAt = Date.now();
      const durationMs = completedAt - startedAt;
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.error(`Tool ${request.toolName} failed after ${durationMs}ms: ${errorMessage}`);

      this.emitEvent({
        type: 'tool_error',
        toolName: request.toolName,
        toolUseId: request.toolUseId,
        error: errorMessage,
      });

      // Failures (including timeouts) become a normal result so the batch keeps going
      return {
        toolUseId: request.toolUseId,
        toolName: request.toolName,
        originalIndex: request.originalIndex,
        output: {
          content: `Tool execution failed: ${errorMessage}. Please retry or obtain the information another way.`,
          success: false,
          error: errorMessage,
        },
        durationMs,
        startedAt,
        completedAt,
      };
    }
  }

  /**
   * Execute with a timeout.
   * Note: on timeout only the wrapper rejects; the underlying executor promise
   * is not cancelled and keeps running in the background.
   */
  private async executeWithTimeout(
    promise: Promise<ToolExecutionOutput>,
    timeoutMs: number,
    toolName: string,
  ): Promise<ToolExecutionOutput> {
    return new Promise<ToolExecutionOutput>((resolve, reject) => {
      const timer = setTimeout(() => {
        reject(new Error(`Tool ${toolName} timed out after ${timeoutMs}ms`));
      }, timeoutMs);

      promise
        .then(result => {
          clearTimeout(timer);
          resolve(result);
        })
        .catch(error => {
          clearTimeout(timer);
          reject(error);
        });
    });
  }

  /**
   * Create an error result
   */
  private createErrorResult(request: ToolExecutionRequest, error: string): ToolExecutionResult {
    const now = Date.now(); // one timestamp so startedAt and completedAt agree with durationMs: 0
    return {
      toolUseId: request.toolUseId,
      toolName: request.toolName,
      originalIndex: request.originalIndex,
      output: {
        content: `Error: ${error}`,
        success: false,
        error,
      },
      durationMs: 0,
      startedAt: now,
      completedAt: now,
    };
  }

  /**
   * Emit an event
   */
  private emitEvent(event: QueueEvent): void {
    if (this.eventCallback) {
      try {
        this.eventCallback(event);
      } catch (e) {
        // A failing listener must never break tool execution
        this.logger.error(`Event callback error: ${e}`);
      }
    }
  }

  /**
   * Get the list of registered tools (for the Claude API)
   */
  getClaudeTools(): Anthropic.Tool[] {
    return Array.from(this.toolRegistry.values()).map(def => def.tool);
  }

  /**
   * Get a tool definition (for debugging)
   */
  getToolDefinition(name: string): ToolDefinition | undefined {
    return this.toolRegistry.get(name);
  }
}
```

## 5. Agent Invocation Executor

When the tool type is `agent_invocation`, the executor has to start a new Claude API conversation and run the Specialist Agent's mini-loop.

```typescript
// agent-executor.ts

import { ToolExecutionOutput } from './tool-execution.types';
import { BaseSpecialistService } from '../specialists/base-specialist.service';

/**
 * Agent invocation executor factory
 *
 * Creates an executor function for each Specialist Agent;
 * that function is what gets registered with ToolExecutionQueue.
 */
export function createAgentExecutor(
  specialist: BaseSpecialistService,
  eventCallback?: (event: { turn: number; maxTurns: number }) => void,
): (input: Record<string, unknown>) => Promise<ToolExecutionOutput> {
  return async (input: Record<string, unknown>): Promise<ToolExecutionOutput> => {
    const startTime = Date.now();
    let totalInputTokens = 0;
    let totalOutputTokens = 0;
    const toolCallsMade: string[] = [];

    try {
      // The Specialist Agent's mini-loop (at most 3 turns)
      const result = await specialist.execute({
        input,
        maxTurns: 3,
        onProgress: (turn, maxTurns) => {
          eventCallback?.({ turn, maxTurns });
        },
        // Parameters renamed so they no longer shadow the outer `input`
        onTokenUsage: (inputTokens, outputTokens) => {
          totalInputTokens += inputTokens;
          totalOutputTokens += outputTokens;
        },
        onToolCall: (toolName) => {
          toolCallsMade.push(toolName);
        },
      });

      return {
        content: result.output,
        success: true,
        agentMetadata: {
          agentName: specialist.name,
          turnsUsed: result.turnsUsed,
          tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
          toolCallsMade,
          durationMs: Date.now() - startTime,
        },
      };
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      return {
        content: `Agent ${specialist.name} failed: ${errorMessage}`,
        success: false,
        error: errorMessage,
        agentMetadata: {
          agentName: specialist.name,
          turnsUsed: 0, // the turn count is not available once execute() has thrown
          tokensUsed: { input: totalInputTokens, output: totalOutputTokens },
          toolCallsMade,
          durationMs: Date.now() - startTime,
        },
      };
    }
  };
}
```

## 6. The BaseSpecialistService Abstract Class

```typescript
// base-specialist.service.ts

import Anthropic from '@anthropic-ai/sdk';

/**
 * Specialist Agent execution parameters
 */
export interface SpecialistExecuteParams {
  input: Record<string, unknown>;
  maxTurns: number;
  onProgress?: (turn: number, maxTurns: number) => void;
  onTokenUsage?: (inputTokens: number, outputTokens: number) => void;
  onToolCall?: (toolName: string) => void;
}

/**
 * Specialist Agent execution result
 */
export interface SpecialistExecuteResult {
  output: string; // text returned to the Coordinator
  turnsUsed: number;
  metadata?: Record<string, unknown>;
}

/**
 * Specialist Agent base class
 *
 * Every Specialist Agent extends this base class.
 * The base class provides:
 * 1. A standard mini-loop (at most N turns)
 * 2. Tool execution
 * 3. Token tracking
 * 4. Error handling
 */
export abstract class BaseSpecialistService {
  /** Agent name */
  abstract readonly name: string;

  /** Model to use */
  abstract readonly model: string;

  /** The Agent's system prompt */
  abstract readonly systemPrompt: string;

  /** Tools available to the Agent */
  abstract readonly tools: Anthropic.Tool[];

  /** Maximum output tokens */
  abstract readonly maxTokens: number;

  protected client: Anthropic;

  constructor(client: Anthropic) {
    this.client = client;
  }

  /**
   * Run the Agent's mini-loop
   *
   * Standard flow:
   * 1. Build the initial messages (from the input parameters)
   * 2. Call the Claude API
   * 3. If there are tool_use blocks → execute the tools → loop again
   * 4. No tool_use → return the final text
   * 5. maxTurns reached → force-return the current result
   */
  async execute(params: SpecialistExecuteParams): Promise<SpecialistExecuteResult> {
    const messages: Anthropic.MessageParam[] = this.buildInitialMessages(params.input);
    let turnsUsed = 0;

    while (turnsUsed < params.maxTurns) {
      turnsUsed++;
      params.onProgress?.(turnsUsed, params.maxTurns);

      // Call the Claude API (non-streaming; a Specialist does not need streamed output)
      const response = await this.client.messages.create({
        model: this.model,
        max_tokens: this.maxTokens,
        system: this.systemPrompt,
        messages,
        tools: this.tools.length > 0 ? this.tools : undefined,
      });

      // Record token usage
      params.onTokenUsage?.(
        response.usage.input_tokens,
        response.usage.output_tokens,
      );

      // Extract text and tool_use blocks
      const textBlocks = response.content.filter(b => b.type === 'text');
      const toolUses = response.content.filter(b => b.type === 'tool_use');

      // No tool calls → return the text
      if (toolUses.length === 0) {
        const outputText = textBlocks.map(b => (b as Anthropic.TextBlock).text).join('\n');
        return {
          output: outputText,
          turnsUsed,
          metadata: this.extractMetadata(outputText),
        };
      }

      // Tool calls present → execute them (serially, in the order returned)
      messages.push({ role: 'assistant', content: response.content });

      const toolResults: Anthropic.ToolResultBlockParam[] = [];
      for (const toolUse of toolUses) {
        const tu = toolUse as Anthropic.ToolUseBlock;
        params.onToolCall?.(tu.name);

        const result = await this.executeSpecialistTool(
          tu.name,
          tu.input as Record<string, unknown>,
        );

        toolResults.push({
          type: 'tool_result',
          tool_use_id: tu.id,
          content: typeof result === 'string' ? result : JSON.stringify(result),
        });
      }

      messages.push({ role: 'user', content: toolResults });
    }

    // maxTurns reached: force-return the last available text
    return {
      output: `[Agent ${this.name} reached the maximum number of turns (${params.maxTurns}); the current result follows]\n${this.extractLastText(messages)}`,
      turnsUsed,
    };
  }

  /**
   * Build the initial messages
   * Subclasses may override this to customize the input format
   */
  protected buildInitialMessages(input: Record<string, unknown>): Anthropic.MessageParam[] {
    return [{
      role: 'user',
      content: JSON.stringify(input, null, 2),
    }];
  }

  /**
   * Execute one of the Specialist's own tools
   * Subclasses must implement this
   */
  protected abstract executeSpecialistTool(
    toolName: string,
    input: Record<string, unknown>,
  ): Promise<unknown>;

  /**
   * Extract structured metadata from the output
   * Subclasses may optionally override this
   */
  protected extractMetadata(output: string): Record<string, unknown> | undefined {
    return undefined;
  }

  /**
   * Extract the most recent assistant text from the messages
   */
  private extractLastText(messages: Anthropic.MessageParam[]): string {
    for (let i = messages.length - 1; i >= 0; i--) {
      const msg = messages[i];
      if (msg.role === 'assistant') {
        if (typeof msg.content === 'string') return msg.content;
        if (Array.isArray(msg.content)) {
          const textBlocks = msg.content.filter((b: any) => b.type === 'text');
          const text = textBlocks.map((b: any) => b.text).join('\n');
          // An assistant turn made only of tool_use blocks has no text; keep looking further back
          if (text) return text;
        }
      }
    }
    return '(no output)';
  }
}
```

## 7. Execution Flow Diagram

```
Claude Coordinator returns tool_use blocks:
  [invoke_policy_expert, invoke_case_analyst, save_user_memory, invoke_assessment_expert]

ToolExecutionQueue.executeAll():

Step 1: Walk the tool_use list
  ├─ invoke_policy_expert     → isConcurrencySafe: true  → added to Batch A
  ├─ invoke_case_analyst      → isConcurrencySafe: true  → added to Batch A
  ├─ save_user_memory         → isConcurrencySafe: false → triggers a drain!
  │   ├─ First run Batch A (in parallel):
  │   │   ├─ invoke_policy_expert ──→ PolicyExpert.execute() ──→ Claude API
  │   │   └─ invoke_case_analyst  ──→ CaseAnalyst.execute()  ──→ Claude API
  │   │       ↑ these two run in parallel!
  │   ├─ Batch A completes
  │   └─ Then run save_user_memory (serially):
  │       └─ knowledgeClient.saveUserMemory() ──→ knowledge-service HTTP
  │
  └─ invoke_assessment_expert → isConcurrencySafe: true  → added to Batch B

Step 2: Run the final Batch B:
  └─ invoke_assessment_expert ──→ AssessmentExpert.execute() ──→ Claude API

Step 3: Sort the results by originalIndex:
  [0] invoke_policy_expert     → "TTPS Category B requires..."
  [1] invoke_case_analyst      → "Similar case: Mr. Zhang..."
  [2] save_user_memory         → "User information saved"
  [3] invoke_assessment_expert → "Assessment result: {score: 75, ...}"

Step 4: Return the sorted results → the Coordinator continues processing
```

## 8. Integration with the Agent Loop

```typescript
// coordinator-agent.service.ts (excerpt)

// Using ToolExecutionQueue inside agentLoop

async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
  // ... (Steps 1-4: context injection, API call, stream collection)

  // Step 5: extract the tool_use blocks
  const toolUses = assistantBlocks.filter(b => b.type === 'tool_use');

  if (toolUses.length === 0) {
    return; // natural termination
  }

  // Step 6: build the execution requests
  const requests: ToolExecutionRequest[] = toolUses.map((tu, index) => ({
    toolUseId: (tu as Anthropic.ToolUseBlock).id,
    toolName: (tu as Anthropic.ToolUseBlock).name,
    input: (tu as Anthropic.ToolUseBlock).input as Record<string, unknown>,
    originalIndex: index,
  }));

  // Step 7: set the event callback → forward events to the frontend
  toolExecutionQueue.onEvent((event) => {
    switch (event.type) {
      case 'agent_started':
        // `yield` cannot be used inside a callback; go through an EventEmitter or a channel instead
        streamChannel.push({ type: 'agent_start', agentName: event.agentName });
        break;
      case 'agent_completed':
        streamChannel.push({ type: 'agent_complete', agentName: event.agentName, durationMs: event.durationMs });
        break;
      case 'tool_error':
        streamChannel.push({ type: 'tool_error', toolName: event.toolName, error: event.error });
        break;
    }
  });

  // Step 8: execute all tools
  const results = await toolExecutionQueue.executeAll(requests);

  // Step 9: build the tool_result message
  const toolResultBlocks: Anthropic.ToolResultBlockParam[] = results.map(r => ({
    type: 'tool_result' as const,
    tool_use_id: r.toolUseId,
    content: r.output.content,
    is_error: !r.output.success,
  }));

  // Step 10: recurse, feeding the results back to the Coordinator
  const newMessages = [
    ...params.messages,
    { role: 'assistant' as const, content: assistantBlocks },
    { role: 'user' as const, content: toolResultBlocks },
  ];

  yield* agentLoop({ ...params, messages: newMessages });
}
```

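The excerpt relies on a `streamChannel` because a callback cannot `yield` into the surrounding generator. The source does not show how that channel is built; the following is one possible sketch under that assumption. `AsyncChannel` and `runWithEvents` are hypothetical names, and events are plain strings here to keep the example short.

```typescript
// Hypothetical channel: callbacks push values, an async generator pulls them.
class AsyncChannel<T> {
  private buffer: T[] = [];
  private waiters: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  push(value: T): void {
    if (this.closed) return;
    const waiter = this.waiters.shift();
    if (waiter) waiter({ value, done: false });
    else this.buffer.push(value);
  }

  close(): void {
    this.closed = true;
    // Wake every pending reader so its loop can finish.
    for (const waiter of this.waiters.splice(0)) waiter({ value: undefined, done: true });
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) {
      if (this.buffer.length > 0) {
        yield this.buffer.shift() as T;
        continue;
      }
      if (this.closed) return;
      const next = await new Promise<IteratorResult<T>>(resolve => this.waiters.push(resolve));
      if (next.done) return;
      yield next.value;
    }
  }
}

// Runs `work`, re-yielding everything it emits until it settles.
async function* runWithEvents(
  work: (emit: (event: string) => void) => Promise<void>,
): AsyncGenerator<string> {
  const channel = new AsyncChannel<string>();
  const finished = work(event => channel.push(event)).finally(() => channel.close());
  yield* channel;
  await finished; // surfaces any error thrown by `work`
}
```

In the agent loop, `work` would wrap the `executeAll` call and `emit` would be handed to `onEvent`, so progress events reach the frontend while tools are still running instead of after they finish.
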
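## 9. Error Handling Strategy

### 9.1 Tool-Level Error Handling

| Error type | Handling | Returned to Coordinator |
|------------|----------|-------------------------|
| **Timeout** | Rejected once `timeoutMs` elapses | `"Tool execution timed out. Please retry or obtain the information another way."` |
| **API error** (knowledge-service unavailable) | Exception caught | `"The knowledge base service is temporarily unavailable. Please answer from what is already known."` |
| **Agent internal error** (Specialist Claude API failure) | Exception caught | `"Specialist Agent temporarily unavailable: {error}. Please answer the user directly."` |
| **Input validation failure** | Early return | `"Invalid tool input parameters: {details}"` |
| **Rate limit** | Retry once with exponential backoff | Normal result if the retry succeeds, otherwise an error |
| **Unknown tool** | Skipped | `"Unknown tool: {toolName}"` |
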
### 9.2 Batch-Level Error Handling

```typescript
/**
 * Promise.allSettled ensures that one tool failing inside a batch does not fail the whole batch.
 *
 * Example:
 *   Batch = [invoke_policy_expert, invoke_case_analyst]
 *
 *   If invoke_policy_expert times out:
 *   - invoke_case_analyst returns its result normally
 *   - invoke_policy_expert returns an error message
 *   - The Coordinator continues generating its reply from the valid case_analyst result
 *
 * The Coordinator's prompt includes this guidance:
 *   "If a tool call returns an error, continue replying to the user based on the other
 *    available information, and do not expose technical error details to the user."
 */
```

### 9.3 Agent Execution Retries

```typescript
/**
 * Retry strategy for Specialist Agents (in base-specialist.service.ts).
 * The helpers isRateLimitError, isOverloadedError, extractRetryAfter and sleep
 * are not shown in this document.
 */
private async callClaudeWithRetry(
  // Non-streaming params, so that create() resolves to a Message rather than a stream
  params: Anthropic.MessageCreateParamsNonStreaming,
  maxRetries: number = 1,
): Promise<Anthropic.Message> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await this.client.messages.create(params);
    } catch (error) {
      lastError = error instanceof Error ? error : new Error(String(error));

      if (this.isRateLimitError(error) && attempt < maxRetries) {
        // Honor the server's retry-after hint if present; otherwise back off exponentially
        const retryAfter = this.extractRetryAfter(error);
        await this.sleep(retryAfter || 1000 * 2 ** attempt);
        continue;
      }

      if (this.isOverloadedError(error) && attempt < maxRetries) {
        // Fall back to a Haiku model (model ID as written in the source; check it against the current model list)
        return await this.client.messages.create({
          ...params,
          model: 'claude-haiku-4-20250514',
        });
      }

      throw error;
    }
  }

  throw lastError ?? new Error('callClaudeWithRetry: no attempt was made');
}
```

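The delay calculation behind an exponential-backoff retry can be isolated into a pure function, which makes it easy to test. The sketch below is an assumption about one reasonable policy, not the project's actual one: `backoffDelayMs`, the 1 s base, and the 30 s cap are all hypothetical.

```typescript
// Hypothetical helper: how long to wait before retry number `attempt` (0-based).
function backoffDelayMs(
  attempt: number,
  retryAfterMs?: number, // server-provided hint, if any
  baseMs = 1000,
  capMs = 30000,
): number {
  // A server hint takes priority over the computed delay.
  if (retryAfterMs !== undefined && retryAfterMs > 0) {
    return Math.min(retryAfterMs, capMs);
  }
  // Double the delay on every attempt, up to the cap.
  return Math.min(baseMs * 2 ** attempt, capMs);
}

console.log([0, 1, 2, 3].map(attempt => backoffDelayMs(attempt)));
```

With a single retry the exponential and linear schedules coincide; the difference only matters if `maxRetries` is raised later.
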
## 10. Performance Optimization

### 10.1 Parallelism Analysis

```
Typical scenario 1: the user asks about policy + the system needs to save memory
  tool_uses: [invoke_policy_expert, invoke_memory_manager]

  Sequential execution: 3s + 2s = 5s
  Batched execution: invoke_memory_manager is not safe, and it comes after invoke_policy_expert
    → Batch [invoke_policy_expert] (3s) → invoke_memory_manager (2s) = 5s
  Concurrency brings no gain in this case

Typical scenario 2: assess the user + find cases + get strategy advice
  tool_uses: [invoke_assessment_expert, invoke_case_analyst, invoke_strategist]

  Sequential execution: 4s + 2s + 2s = 8s
  Batched execution: all safe → one parallel batch = max(4s, 2s, 2s) = 4s
  Saves 50% of the time!

Typical scenario 3: save memory + assess + generate payment
  tool_uses: [save_user_memory, invoke_assessment_expert, generate_payment]

  Sequential execution: 1s + 4s + 2s = 7s
  Our algorithm:
    save_user_memory (not safe) → runs alone (1s)
    invoke_assessment_expert (safe) → Batch B
    generate_payment (not safe) → run Batch B first (4s), then alone (2s)
  Total: 1s + 4s + 2s = 7s (concurrency brings no gain in this case)
```

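The arithmetic in these scenarios follows one rule: each concurrent batch costs as much as its slowest member, and every unsafe tool adds its full duration. A small estimator makes that explicit. It is a sketch for reasoning about latency only; `TimedCall` and `estimateBatchedLatencyMs` are hypothetical names, and real durations vary from call to call.

```typescript
// Hypothetical input shape: a tool call with an assumed duration.
interface TimedCall {
  name: string;
  isConcurrencySafe: boolean;
  durationMs: number;
}

// Total wall-clock time under the batching algorithm.
function estimateBatchedLatencyMs(calls: TimedCall[]): number {
  let total = 0;
  let batchMax = 0; // slowest call in the currently open concurrent batch

  for (const call of calls) {
    if (call.isConcurrencySafe) {
      batchMax = Math.max(batchMax, call.durationMs);
    } else {
      // Drain the open batch, then run the unsafe call alone.
      total += batchMax + call.durationMs;
      batchMax = 0;
    }
  }
  return total + batchMax; // account for a trailing batch
}

const scenario2 = estimateBatchedLatencyMs([
  { name: 'invoke_assessment_expert', isConcurrencySafe: true, durationMs: 4000 },
  { name: 'invoke_case_analyst', isConcurrencySafe: true, durationMs: 2000 },
  { name: 'invoke_strategist', isConcurrencySafe: true, durationMs: 2000 },
]);
console.log(scenario2); // 4000
```

The estimate shows where the gain comes from: only runs of two or more consecutive safe tools shorten the total, so interleaving writes between reads removes the benefit.
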
### 10.2 Recommended Timeout Configuration

| Tool type | Recommended timeout | Reason |
|-----------|---------------------|--------|
| Agent invocation (Sonnet) | 30s | A single Sonnet call takes about 3-8s; an Agent runs at most 3 turns |
| Agent invocation (Haiku) | 15s | Haiku is faster |
| `search_knowledge` | 10s | HTTP + vector retrieval |
| `save_user_memory` | 5s | HTTP + write |
| `generate_payment` | 10s | External payment API |
| `get_current_datetime` | 1s | Local computation |
| `web_search` | 15s | External search API |

### 10.3 Token Cost Tracking

```typescript
/**
 * Track the token cost of every Agent invocation at the ToolExecutionQueue level
 */
interface AgentCostSummary {
  agentName: string;
  inputTokens: number;
  outputTokens: number;
  estimatedCostUsd: number;
  turnsUsed: number;
  durationMs: number;
}

/**
 * After each executeAll, a cost summary for that round can be obtained.
 * Note: Sonnet prices are applied to every Agent, so the estimate is too high
 * for Agents that run on a cheaper model.
 */
getCostSummary(results: ToolExecutionResult[]): AgentCostSummary[] {
  return results
    .filter(r => r.output.agentMetadata)
    .map(r => {
      const meta = r.output.agentMetadata!;
      const inputCost = meta.tokensUsed.input * 0.003 / 1000;   // Sonnet input price (USD per 1K tokens)
      const outputCost = meta.tokensUsed.output * 0.015 / 1000; // Sonnet output price (USD per 1K tokens)
      return {
        agentName: meta.agentName,
        inputTokens: meta.tokensUsed.input,
        outputTokens: meta.tokensUsed.output,
        estimatedCostUsd: inputCost + outputCost,
        turnsUsed: meta.turnsUsed,
        durationMs: meta.durationMs,
      };
    });
}
```

## 11. Tool Registration Example

```typescript
// tool-registry.ts - tool registration

export function registerAllTools(
  queue: ToolExecutionQueue,
  specialists: SpecialistServices,
  directTools: DirectToolServices,
): void {

  // ======== Agent invocation tools ========

  queue.registerTool('invoke_policy_expert', {
    tool: {
      name: 'invoke_policy_expert',
      description: 'Invoke the policy expert Agent to look up details of Hong Kong immigration policy, eligibility requirements, application procedures, and so on.',
      input_schema: {
        type: 'object',
        properties: {
          query: { type: 'string', description: 'The policy question to look up' },
          category: { type: 'string', description: 'Immigration category (optional): TTPS/QMAS/GEP/IANG/CIES/TECHTAS' },
        },
        required: ['query'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 5000,
    executor: createAgentExecutor(specialists.policyExpert),
  });

  queue.registerTool('invoke_assessment_expert', {
    tool: {
      name: 'invoke_assessment_expert',
      description: 'Invoke the assessment expert Agent to evaluate immigration eligibility from the user information and recommend options.',
      input_schema: {
        type: 'object',
        properties: {
          userInfo: { type: 'object', description: 'User information (age, education, workYears, etc.)' },
          targetCategories: {
            type: 'array',
            items: { type: 'string' },
            description: 'Target categories to assess (optional; all categories by default)',
          },
        },
        required: ['userInfo'],
      },
    },
    isConcurrencySafe: true,
    toolType: 'agent_invocation',
    timeoutMs: 30000,
    estimatedDurationMs: 6000,
    executor: createAgentExecutor(specialists.assessmentExpert),
  });

  queue.registerTool('invoke_memory_manager', {
    tool: {
      name: 'invoke_memory_manager',
      description: 'Invoke the memory manager Agent to extract user information and save it to long-term memory.',
      input_schema: {
        type: 'object',
        properties: {
          action: { type: 'string', enum: ['extract', 'save', 'load'] },
          userMessage: { type: 'string', description: 'User message (required for extract)' },
          assistantMessage: { type: 'string', description: 'Assistant reply (required for extract)' },
          memoryData: { type: 'object', description: 'Memory to save (required for save)' },
        },
        required: ['action'],
      },
    },
    isConcurrencySafe: false, // may write!
    toolType: 'agent_invocation',
    timeoutMs: 15000,
    estimatedDurationMs: 3000,
    executor: createAgentExecutor(specialists.memoryManager),
  });

  // ======== Direct tools ========

  queue.registerTool('generate_payment', {
    tool: {
      name: 'generate_payment',
      description: 'Generate a payment link for the paid assessment service.',
      input_schema: {
        type: 'object',
        properties: {
          service: { type: 'string', enum: ['detailed_assessment'] },
          userId: { type: 'string' },
        },
        required: ['service', 'userId'],
      },
    },
    isConcurrencySafe: false, // creates an order; has side effects
    toolType: 'direct_tool',
    timeoutMs: 10000,
    estimatedDurationMs: 2000,
    executor: directTools.generatePayment,
  });

  queue.registerTool('get_current_datetime', {
    tool: {
      name: 'get_current_datetime',
      description: 'Get the current date and time.',
      input_schema: { type: 'object', properties: {} },
    },
    isConcurrencySafe: true,
    toolType: 'direct_tool',
    timeoutMs: 1000,
    estimatedDurationMs: 5,
    executor: async () => ({
      content: new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Hong_Kong' }),
      success: true,
    }),
  });

  // ... register the remaining tools
}
```

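## 12. Comparison with the Old Architecture

| Aspect | Old architecture (ClaudeAgentServiceV2) | New architecture (ToolExecutionQueue) |
|--------|------------------------------------------|----------------------------------------|
| Tool execution | Serial, one at a time in a `for` loop | Smart batching: safe tools in parallel, unsafe tools serially |
| Execution timeout | No timeout control | Independent timeout per tool |
| Error isolation | One tool failure aborts the whole loop | One failure does not affect the others |
| Event notification | No real-time progress | Full `QueueEvent` stream |
| Agent sub-calls | N/A (no Agent concept) | Independent mini-loop per Agent |
| Cost tracking | Tracks only the Coordinator's tokens | Tracks tokens for every Agent |
| Extensibility | Edit the `executeTool()` switch statement | Declarative registration via `registerTool()` |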