iconsulting/docs/architecture/01-coordinator-agent.md

8.5 KiB
Raw Blame History

01 - Coordinator Agent (主协调器) 设计详解

1. 核心职责

Coordinator 是唯一直接面对用户的 Agent。它的职责是

  1. 理解用户意图 — 无需额外的 IntentClassifierLLM 自行判断
  2. 编排专家 Agent — 决定调用哪些专家、传什么参数
  3. 综合信息 — 将多个专家的结果整合为自然的对话回复
  4. 直接回答简单问题 — 闲聊、简单确认等不需要启动专家 Agent
  5. 管理对话节奏 — 控制提问频率、推进对话进程
  6. 维护咨询状态 — 在回复中自报当前阶段和收集到的信息

2. 模型与参数

{
  model: 'claude-sonnet-4-20250514',
  max_tokens: 4096,
  // Prompt Caching: system prompt 使用 cache_control
  system: [
    { type: 'text', text: coordinatorPrompt, cache_control: { type: 'ephemeral' } }
  ],
}

3. 可用工具Tools

Coordinator 拥有两类工具:

3.1 Agent 调用工具(核心)

工具名 描述 输入参数 返回值
invoke_policy_expert 查询移民政策详情 {query: string, category?: string} 政策解读文本
invoke_assessment_expert 评估用户移民资格 {userInfo: object, targetCategories?: string[]} 评估报告 JSON
invoke_strategist 获取对话策略建议 {conversationSummary: string, currentStage: string} 策略建议文本
invoke_objection_handler 处理用户异议 {objection: string, userContext: string} 回应方案文本
invoke_case_analyst 查找类似案例 {userProfile: object, targetCategory: string} 案例分析文本
invoke_memory_manager 管理用户记忆 {action: 'load'|'save'|'extract', ...} 记忆数据

3.2 直接工具(不经过 Agent

工具名 描述 来源
generate_payment 生成支付链接 payment-service
get_current_datetime 获取当前时间 本地
web_search 搜索网页 Google API
get_exchange_rate 获取汇率 API / 缓存
fetch_immigration_news 获取移民新闻 API / 缓存

4. Agent Loop 控制流

// agent-loop.ts - 核心控制流伪代码

interface AgentLoopParams {
  messages: Message[];           // 对话历史
  systemPrompt: string[];        // 系统提示(含缓存控制)
  tools: ToolDefinition[];       // 所有可用工具
  maxTurns: number;              // 最大递归轮次 (default: 15)
  maxBudgetUsd: number;          // 单次对话最大成本 (default: 0.50)
  conversationId: string;
  userId: string;
  abortSignal?: AbortSignal;     // 用户中断信号
}

async function* agentLoop(params: AgentLoopParams): AsyncGenerator<StreamEvent> {
  let turnCount = 0;
  let totalCost = 0;

  // === Guard: 超限检查 ===
  if (turnCount >= params.maxTurns) {
    yield { type: 'error', code: 'MAX_TURNS_REACHED' };
    return;
  }
  if (totalCost >= params.maxBudgetUsd) {
    yield { type: 'error', code: 'BUDGET_EXCEEDED' };
    return;
  }
  if (params.abortSignal?.aborted) {
    yield { type: 'error', code: 'USER_ABORTED' };
    return;
  }

  // === Step 1: 动态上下文注入 ===
  const enrichedMessages = await contextInjector.inject({
    messages: params.messages,
    userId: params.userId,
    conversationId: params.conversationId,
  });

  // === Step 2: 上下文压缩(如果接近 token 上限)===
  const compactedMessages = await autoCompactIfNeeded(enrichedMessages);

  // === Step 3: 调用 Claude API流式===
  const stream = anthropic.messages.stream({
    model: 'claude-sonnet-4-20250514',
    system: params.systemPrompt,
    messages: compactedMessages,
    tools: params.tools,
    max_tokens: 4096,
  });

  // === Step 4: 流式收集响应 ===
  const assistantBlocks: ContentBlock[] = [];
  for await (const event of stream) {
    // 流式传输文本给前端
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      yield { type: 'text', content: event.delta.text };
    }
    // 收集所有 content blocks
    collectBlocks(event, assistantBlocks);
  }

  // 记录 token 使用
  const usage = stream.finalMessage().usage;
  totalCost += calculateCost(usage);
  yield { type: 'usage', usage, cost: totalCost };

  // === Step 5: 提取 tool_use blocks ===
  const toolUses = assistantBlocks.filter(b => b.type === 'tool_use');

  // === Step 6: 无工具调用 → 自然结束 ===
  if (toolUses.length === 0) {
    // 从回复中提取状态更新
    const stateUpdate = extractConsultingState(assistantBlocks);
    if (stateUpdate) {
      yield { type: 'state_update', state: stateUpdate };
    }
    return; // 递归终止
  }

  // === Step 7: 执行工具(并发队列)===
  const toolResults: ToolResult[] = [];

  for (const toolUse of toolUses) {
    // 通知前端 Agent 开始工作
    if (toolUse.name.startsWith('invoke_')) {
      yield { type: 'agent_start', agentName: toolUse.name.replace('invoke_', '') };
    }
  }

  // 通过 ToolExecutionQueue 并发执行
  const results = await toolExecutionQueue.executeAll(toolUses);

  for (const result of results) {
    toolResults.push(result);
    // 通知前端 Agent 完成
    if (result.toolName.startsWith('invoke_')) {
      yield { type: 'agent_complete', agentName: result.toolName.replace('invoke_', '') };
    }
  }

  // === Step 8: 递归 — 把工具结果喂回 Coordinator ===
  turnCount++;
  const newMessages = [
    ...params.messages,
    { role: 'assistant', content: assistantBlocks },
    ...toolResults.map(r => ({
      role: 'user',
      content: [{ type: 'tool_result', tool_use_id: r.id, content: r.output }]
    })),
  ];

  yield* agentLoop({
    ...params,
    messages: newMessages,
  });
}

5. 递归终止条件

条件 触发 行为
无 tool_use LLM 认为回复完成 自然终止,返回文本
maxTurns 超限 turnCount >= 15 强制终止yield error
maxBudgetUsd 超限 累计 API 成本超限 强制终止yield error
abortSignal 用户点击停止 立即终止
API 错误 Claude API 返回错误 尝试 fallback model否则终止

6. Coordinator System Prompt 结构

详见 11-prompt-templates.md,核心结构:

# 身份定义 (Identity)
你是 iConsulting 的资深香港移民顾问...

# 你的专家团队 (Your Expert Team)
你拥有 6 个专家助手,通过工具调用来获取他们的帮助...
  - Policy Expert: 调用时机、输入输出格式
  - Assessment Expert: ...
  - ...

# 对话策略 (Conversation Strategy)
## 咨询阶段(你自行判断当前处于哪个阶段)
  - 开场阶段:目标、行为、判断条件
  - 需求了解:...
  - 信息收集:...
  - 评估推荐:...
  - 异议处理:...
  - 转化促成:...

# 回复规范 (Response Guidelines)
  - 语气:专业但亲和
  - 长度:根据场景自适应
  - 结构:每次回复以推进性问题结尾
  - 禁忌不承诺成功率、不一次问超过2个问题

# 状态报告 (State Reporting)
  每次回复结束时附加 <consulting_state> 标签...

# 六大移民类别详解 (Immigration Categories)
  QMAS/GEP/IANG/TTPS/CIES/TECHTAS 的详细条件...

# 业务规则 (Business Rules)
  付费评估服务说明、专家对接流程...

7. 与 ConversationService 的接口

// CoordinatorAgentService 对外暴露的接口
// 与旧的 ClaudeAgentServiceV2 保持相同的 AsyncGenerator 模式

interface CoordinatorAgentService {
  sendMessage(params: {
    conversationContext: ConversationContext;
    userMessage: string;
    attachments?: Attachment[];
    userId: string;
    conversationId: string;
    deviceInfo?: DeviceInfo;
  }): AsyncGenerator<StreamEvent>;
}

ConversationService 只需将注入从 ClaudeAgentServiceV2 切换到 CoordinatorAgentService,内部聚合逻辑不变。

8. 错误处理

// Coordinator 的错误处理策略
try {
  yield* agentLoop(params);
} catch (error) {
  if (error instanceof RateLimitError) {
    // 等待后重试
    await sleep(error.retryAfter);
    yield* agentLoop(params);
  } else if (error instanceof ModelOverloadedError) {
    // 降级到 Haiku
    yield* agentLoop({ ...params, model: 'claude-haiku' });
  } else {
    // 返回友好错误消息
    yield {
      type: 'text',
      content: '抱歉,系统遇到了临时问题。请稍后重试,或联系我们的人工顾问。'
    };
    yield { type: 'error', code: 'INTERNAL_ERROR', message: error.message };
  }
}