# 08 - 动态上下文注入系统 (Context Injection System) ## 1. 设计哲学 借鉴 Claude Code 的 attachment 系统:在每次 Claude API 调用之前,**动态组装最相关的上下文**注入到 messages 中。 核心原则: - **按需注入**:不是所有上下文每次都注入,根据对话阶段和用户消息动态选择 - **优先级驱动**:当 context window 接近上限时,按优先级淘汰低优先级上下文 - **缓存友好**:高频不变的上下文利用 Prompt Caching 降低成本 - **格式统一**:所有上下文以统一的 XML-like 标签注入,方便 LLM 识别 ## 2. 上下文类型总览 | 序号 | 类型标识 | 名称 | 数据源 | 优先级 | Cache TTL | 预估 Tokens | |------|----------|------|--------|--------|-----------|-------------| | 1 | `user_memory` | 用户历史记忆 | knowledge-service `/api/v1/memory/user/{id}/top` | P0 (最高) | 60s | 200-500 | | 2 | `collected_info` | 已收集的用户信息 | 本地 ConsultingState.collectedInfo | P0 | 0 (实时) | 100-300 | | 3 | `conversation_stats` | 对话统计 | 本地计算 (messages + timestamps) | P1 | 0 (实时) | 50-80 | | 4 | `assessment_result` | 评估结果 | 本地 ConsultingState.assessmentResult | P0 | 0 (实时) | 200-400 | | 5 | `relevant_knowledge` | 相关知识预检索 | knowledge-service `/api/v1/knowledge/retrieve/prompt` | P1 | 30s | 300-800 | | 6 | `similar_experiences` | 系统经验 | knowledge-service `/api/v1/memory/experience/search` | P2 | 120s | 150-400 | | 7 | `device_context` | 设备/地区信息 | ConversationGateway 传入的 deviceInfo | P2 | 对话生命周期 | 30-60 | | 8 | `active_agents_history` | 最近 Agent 调用记录 | 本地 agentLoop 内存 | P1 | 0 (实时) | 100-300 | **优先级说明**: - **P0 (Critical)**:绝不丢弃,即使 context window 紧张也必须保留 - **P1 (Important)**:正常保留,极端情况下可压缩 - **P2 (Nice-to-have)**:context window 紧张时首先丢弃 ## 3. TypeScript 类型定义 ```typescript // context.types.ts /** * 上下文类型枚举 */ export enum ContextType { USER_MEMORY = 'user_memory', COLLECTED_INFO = 'collected_info', CONVERSATION_STATS = 'conversation_stats', ASSESSMENT_RESULT = 'assessment_result', RELEVANT_KNOWLEDGE = 'relevant_knowledge', SIMILAR_EXPERIENCES = 'similar_experiences', DEVICE_CONTEXT = 'device_context', ACTIVE_AGENTS_HISTORY = 'active_agents_history', } /** * 上下文优先级 */ export enum ContextPriority { CRITICAL = 0, // P0: 绝不丢弃 IMPORTANT = 1, // P1: 正常保留 NICE_TO_HAVE = 2, // P2: 可丢弃 } /** * 单条上下文块 */ export interface ContextBlock { type: ContextType; priority: ContextPriority; content: string; // 格式化后的文本内容 estimatedTokens: number; // 预估 token 数 timestamp: number; // 生成时间(用于缓存判断) cacheTTL: number; // 缓存有效期 (ms), 0 = 不缓存 isEmpty: boolean; // 是否为空(空的不注入) } /** * 上下文注入请求 */ export interface ContextInjectionRequest { userId: string; conversationId: string; messages: Anthropic.MessageParam[]; // 当前对话历史 currentUserMessage: string; // 最新用户消息 consultingState?: ConsultingState; // 当前咨询状态 deviceInfo?: DeviceInfo; // 设备信息 agentHistory?: AgentExecutionRecord[]; // 最近 Agent 调用记录 maxContextTokens?: number; // 上下文 token 上限 (default: 180000) } /** * 上下文注入结果 */ export interface ContextInjectionResult { messages: Anthropic.MessageParam[]; // 注入上下文后的 messages injectedContexts: ContextType[]; // 实际注入了哪些上下文 droppedContexts: ContextType[]; // 被丢弃的上下文(因 token 限制) totalContextTokens: number; // 注入的上下文总 token 数 compacted: boolean; // 是否触发了压缩 } /** * Agent 执行记录(用于 active_agents_history) */ export interface AgentExecutionRecord { agentName: string; invokedAt: number; // timestamp durationMs: number; inputSummary: string; // 调用参数摘要 outputSummary: string; // 返回结果摘要(截断到 200 字符) tokensUsed: number; } /** * 上下文缓存条目 */ interface ContextCacheEntry { block: ContextBlock; createdAt: number; expiresAt: number; // createdAt + cacheTTL } ``` ## 4. 每种上下文的详细设计 ### 4.1 user_memory — 用户历史记忆 **数据源**:knowledge-service 的 Memory API ```typescript // API 调用 const memories = await knowledgeClient.getUserTopMemories(userId, 10); // 返回: UserMemory[] — 按 importance 排序的用户记忆 // 备选:语义搜索(与当前消息相关的记忆) const relevantMemories = await knowledgeClient.searchUserMemories({ userId, query: currentUserMessage, limit: 5, }); ``` **注入格式**: ```xml 以下是该用户的历史记录摘要(来自之前的对话): - [FACT] 用户35岁,本科学历,毕业于浙江大学 (重要度: 90) - [FACT] 在互联网行业工作8年,目前年薪60万 (重要度: 85) - [INTENT] 用户对高才通B类感兴趣 (重要度: 80) - [PREFERENCE] 用户偏好文字交流,回复较简短 (重要度: 60) - [FACT] 已婚,有一个3岁的孩子 (重要度: 70) ``` **Freshness Policy**: - Cache TTL: 60s(用户记忆在一次对话中变化不频繁) - 首次调用时强制刷新 - 当 Memory Manager Agent 保存了新记忆后,立即 invalidate 缓存 **逻辑**: ```typescript async function buildUserMemoryContext( userId: string, currentMessage: string, cache: Map, ): Promise { const cacheKey = `user_memory:${userId}`; const cached = cache.get(cacheKey); if (cached && Date.now() < cached.expiresAt) { return cached.block; } // 并行获取:Top 记忆 + 语义相关记忆 const [topMemories, relevantMemories] = await Promise.all([ knowledgeClient.getUserTopMemories(userId, 5), knowledgeClient.searchUserMemories({ userId, query: currentMessage, limit: 3 }), ]); // 合并去重 const allMemories = deduplicateMemories([...topMemories, ...relevantMemories]); if (allMemories.length === 0) { return { type: ContextType.USER_MEMORY, priority: ContextPriority.CRITICAL, content: '', estimatedTokens: 0, timestamp: Date.now(), cacheTTL: 60000, isEmpty: true }; } const content = formatMemoryBlock(allMemories); const block: ContextBlock = { type: ContextType.USER_MEMORY, priority: ContextPriority.CRITICAL, content, estimatedTokens: estimateTokens(content), timestamp: Date.now(), cacheTTL: 60000, isEmpty: false, }; cache.set(cacheKey, { block, createdAt: Date.now(), expiresAt: Date.now() + 60000 }); return block; } ``` ### 4.2 collected_info — 已收集的用户信息 **数据源**:本地 `ConsultingState.collectedInfo`(存储在 Conversation 实体中) ```typescript // 直接从 state 读取,无需 API 调用 const collectedInfo = consultingState?.collectedInfo || {}; ``` **注入格式**: ```xml 本次对话已收集的用户信息: - 年龄: 35 - 学历: 本科 - 毕业院校: 浙江大学 - 工作年限: 8年 - 行业: 互联网/IT - 年收入: 约60万人民币 - 婚姻状况: 已婚 尚未收集: 语言能力、是否有香港联系、资产情况 ``` **Freshness Policy**: - Cache TTL: 0(实时,每次从 state 读取) - 零开销:纯本地数据,无 API 调用 **逻辑**: ```typescript function buildCollectedInfoContext(consultingState?: ConsultingState): ContextBlock { const info = consultingState?.collectedInfo || {}; const entries = Object.entries(info); if (entries.length === 0) { return emptyBlock(ContextType.COLLECTED_INFO, ContextPriority.CRITICAL); } // 映射 key 到中文标签 const labelMap: Record = { age: '年龄', education: '学历', university: '毕业院校', workYears: '工作年限', industry: '行业', annualIncome: '年收入', maritalStatus: '婚姻状况', children: '子女', languageAbility: '语言能力', hasHKConnection: '香港联系', assets: '资产情况', nationality: '国籍', }; const collected = entries .map(([key, value]) => `- ${labelMap[key] || key}: ${value}`) .join('\n'); // 计算未收集的核心字段 const coreKeys = ['age', 'education', 'workYears', 'annualIncome', 'languageAbility', 'hasHKConnection']; const uncollected = coreKeys .filter(key => info[key] === undefined) .map(key => labelMap[key] || key) .join('、'); const content = `\n本次对话已收集的用户信息:\n${collected}\n${uncollected ? `尚未收集: ${uncollected}` : '核心信息已收集完毕'}\n`; return { type: ContextType.COLLECTED_INFO, priority: ContextPriority.CRITICAL, content, estimatedTokens: estimateTokens(content), timestamp: Date.now(), cacheTTL: 0, isEmpty: false, }; } ``` ### 4.3 conversation_stats — 对话统计 **数据源**:本地计算 ```typescript // 从 messages 数组和 conversation 实体计算 const stats = { totalTurns: messages.filter(m => m.role === 'user').length, duration: Date.now() - conversation.createdAt, currentStage: consultingState?.currentStageId, stageHistory: consultingState?.stageHistory, messagesInCurrentStage: consultingState?.stageTurnCount, }; ``` **注入格式**: ```xml 对话统计: - 总轮次: 8轮 (用户发言8次) - 对话时长: 12分钟 - 当前阶段: 信息收集 (第3轮) - 阶段历程: 开场破冰(2轮) → 需求了解(3轮) → 信息收集(进行中) - 用户回复特点: 平均每次30字,回复较简短 ``` **Freshness Policy**: - Cache TTL: 0(实时计算,无 API 开销) **逻辑**: ```typescript function buildConversationStatsContext( messages: Anthropic.MessageParam[], consultingState?: ConsultingState, conversationCreatedAt?: Date, ): ContextBlock { const userMessages = messages.filter(m => m.role === 'user'); const totalTurns = userMessages.length; // 计算对话时长 const durationMs = conversationCreatedAt ? Date.now() - conversationCreatedAt.getTime() : 0; const durationMinutes = Math.round(durationMs / 60000); // 分析用户回复特点 const avgUserMsgLength = userMessages.reduce((sum, m) => { const text = typeof m.content === 'string' ? m.content : ''; return sum + text.length; }, 0) / Math.max(userMessages.length, 1); const replyStyle = avgUserMsgLength < 20 ? '极简短,可能在移动设备' : avgUserMsgLength < 50 ? '较简短,需引导展开' : avgUserMsgLength < 150 ? '中等长度,表达较充分' : '详细表达,用户积极参与'; // 构建阶段历程 const stageNames: Record = { greeting: '开场破冰', needs_discovery: '需求了解', info_collection: '信息收集', assessment: '评估', recommendation: '推荐', objection_handling: '异议处理', conversion: '转化促成', handoff: '专家对接', }; let stageHistoryStr = '无'; if (consultingState?.stageHistory) { stageHistoryStr = consultingState.stageHistory .map(h => `${stageNames[h.stageId] || h.stageId}(${h.turnsInStage}轮)`) .join(' → '); } const parts = [ '', '对话统计:', `- 总轮次: ${totalTurns}轮`, `- 对话时长: ${durationMinutes}分钟`, `- 当前阶段: ${stageNames[consultingState?.currentStageId || ''] || '未知'} (第${(consultingState?.stageTurnCount || 0) + 1}轮)`, `- 阶段历程: ${stageHistoryStr}`, `- 用户回复特点: 平均每次${Math.round(avgUserMsgLength)}字,${replyStyle}`, '', ]; const content = parts.join('\n'); return { type: ContextType.CONVERSATION_STATS, priority: ContextPriority.IMPORTANT, content, estimatedTokens: estimateTokens(content), timestamp: Date.now(), cacheTTL: 0, isEmpty: false, }; } ``` ### 4.4 assessment_result — 评估结果 **数据源**:本地 `ConsultingState.assessmentResult` **注入格式**: ```xml 用户移民资格初步评估结果: 推荐方案: 高才通B类、优才计划 匹配度: 75分/100分 优势: - 名校毕业+工作经验充足 - 工作经验丰富(8年) - 年龄优势明显(35岁) 注意事项: - 年薪未达高才通A类标准 - 建议准备详细的工作业绩证明 提示: 这是基于初步信息的预评估。付费详细评估(¥99)将提供完整的评分报告和申请建议。 ``` **Freshness Policy**: - Cache TTL: 0(实时,一旦生成不会变化直到重新评估) - 仅当 `consultingState.assessmentResult` 存在时注入 **逻辑**: ```typescript function buildAssessmentResultContext(consultingState?: ConsultingState): ContextBlock { const result = consultingState?.assessmentResult; if (!result) { return emptyBlock(ContextType.ASSESSMENT_RESULT, ContextPriority.CRITICAL); } const parts = [ '', '用户移民资格初步评估结果:', `推荐方案: ${result.recommendedPrograms.join('、')}`, `匹配度: ${result.suitabilityScore}分/100分`, ]; if (result.highlights.length > 0) { parts.push('优势:'); result.highlights.forEach(h => parts.push(` - ${h}`)); } if (result.concerns.length > 0) { parts.push('注意事项:'); result.concerns.forEach(c => parts.push(` - ${c}`)); } parts.push('提示: 这是基于初步信息的预评估。付费详细评估(¥99)将提供完整的评分报告和申请建议。'); parts.push(''); const content = parts.join('\n'); return { type: ContextType.ASSESSMENT_RESULT, priority: ContextPriority.CRITICAL, content, estimatedTokens: estimateTokens(content), timestamp: Date.now(), cacheTTL: 0, isEmpty: false, }; } ``` ### 4.5 relevant_knowledge — 相关知识预检索 **数据源**:knowledge-service RAG API ```typescript // 基于用户最新消息做语义检索 const knowledge = await knowledgeClient.retrieveForPrompt({ query: currentUserMessage, userId, category: detectCategory(currentUserMessage), // 可选的类别过滤 }); ``` **注入格式**: ```xml 以下是与用户最新问题相关的知识库内容(仅供参考,请结合实际情况回答): 来源:高才通计划申请指南 (相似度: 0.92) > 高才通计划(TTPS)B类要求:申请人须持有全球百强大学颁授的学士学位, > 并在申请前五年内累积至少三年工作经验。百强大学名单以最新公布的 > QS/Times/USNEWS/SJTU 四大排名综合为准... 来源:优才计划评分标准 (相似度: 0.85) > 综合计分制下,学历最高可得45分(博士/双硕士),年龄18-39岁可得30分... ``` **Freshness Policy**: - Cache TTL: 30s(同一用户短时间内的消息通常主题相近) - Cache Key: `relevant_knowledge:${userId}:${hashOf(currentMessage)}` - 仅当用户消息涉及具体政策/条件/流程时注入 **条件注入逻辑**: ```typescript async function buildRelevantKnowledgeContext( userId: string, currentMessage: string, cache: Map, ): Promise { // 简单的关键词检测,判断是否需要知识检索 const needsKnowledge = shouldRetrieveKnowledge(currentMessage); if (!needsKnowledge) { return emptyBlock(ContextType.RELEVANT_KNOWLEDGE, ContextPriority.IMPORTANT); } const cacheKey = `relevant_knowledge:${userId}:${simpleHash(currentMessage)}`; const cached = cache.get(cacheKey); if (cached && Date.now() < cached.expiresAt) { return cached.block; } const knowledge = await knowledgeClient.retrieveForPrompt({ query: currentMessage, userId, }); if (!knowledge) { return emptyBlock(ContextType.RELEVANT_KNOWLEDGE, ContextPriority.IMPORTANT); } const content = `\n以下是与用户最新问题相关的知识库内容(仅供参考,请结合实际情况回答):\n\n${knowledge}\n`; const block: ContextBlock = { type: ContextType.RELEVANT_KNOWLEDGE, priority: ContextPriority.IMPORTANT, content, estimatedTokens: estimateTokens(content), timestamp: Date.now(), cacheTTL: 30000, isEmpty: false, }; cache.set(cacheKey, { block, createdAt: Date.now(), expiresAt: Date.now() + 30000 }); return block; } /** * 判断是否需要知识检索 * 简单的关键词 + 模式匹配,避免每次都调 RAG API */ function shouldRetrieveKnowledge(message: string): boolean { const policyKeywords = [ '高才通', 'TTPS', '优才', 'QMAS', '专才', 'GEP', '留学', 'IANG', '投资', 'CIES', '科技', 'TECHTAS', '条件', '要求', '资格', '申请', '流程', '材料', '费用', '时间', '打分', '评分', '签证', '续签', '永居', '居留', '配偶', '受养人', '政策', '最新', ]; const lowerMessage = message.toLowerCase(); return policyKeywords.some(kw => lowerMessage.includes(kw.toLowerCase())); } ``` ### 4.6 similar_experiences — 系统经验 **数据源**:knowledge-service Experience API(来自 evolution-service 的经验积累) ```typescript const experiences = await knowledgeClient.searchExperiences({ query: currentUserMessage, activeOnly: true, limit: 3, }); ``` **注入格式**: ```xml 系统积累的相关经验(可参考但不必逐字照搬): 1. [对话策略] 当用户表示"我还在考虑"时,不要急于推销,先询问具体顾虑点, 再针对性解答。成功率比直接推进高40%。 2. [案例参考] 类似背景的用户(35岁+互联网+本科)通过优才计划成功的案例较多, 建议重点介绍优才综合计分制的优势。 3. [异议处理] 对于"费用太贵"的异议,可以从投资回报角度切入: 香港税收优势每年可节省X万... ``` **Freshness Policy**: - Cache TTL: 120s(系统经验更新频率低) - 仅在信息收集、评估推荐、异议处理阶段注入 ### 4.7 device_context — 设备/地区信息 **数据源**:ConversationGateway 在 WebSocket 连接时传入的 deviceInfo ```typescript // 从 conversation 实体的 deviceInfo 字段获取 const deviceInfo = conversation.deviceInfo; ``` **注入格式**: ```xml 用户环境信息: - 设备类型: 移动设备 (iPhone) - 时区: UTC+8 (中国标准时间) - 地区: 上海 (根据IP推断) - 当前时间: 2025-01-15 周三 下午2:30 提示: 用户在手机上咨询,回复可适当简短。下午时段用户可能在工作间隙咨询。 ``` **Freshness Policy**: - Cache TTL: 整个对话生命周期(连接后设备信息不变) - 仅在首次消息或每5轮注入一次(避免浪费 token) ### 4.8 active_agents_history — 最近 Agent 调用记录 **数据源**:agentLoop 内存中维护的 `AgentExecutionRecord[]` **注入格式**: ```xml 最近调用的专家 Agent(本次对话中): 1. [2分钟前] Policy Expert - 查询高才通B类条件 → 返回了详细的学历和工作经验要求 2. [1分钟前] Memory Manager - 保存用户信息(年龄35,浙大本科) → 已成功保存 注意: 避免重复调用相同 Agent 查询相同信息。如需进一步了解,可直接使用已获取的信息。 ``` **Freshness Policy**: - Cache TTL: 0(实时,每次 API 调用都从内存读取最新记录) - 仅保留最近 5 条记录 - 每条记录包含 agent 名称、时间、输入/输出摘要 ## 5. ContextInjector 服务实现 ```typescript // context-injector.service.ts import { Injectable } from '@nestjs/common'; import { KnowledgeClientService } from '../knowledge/knowledge-client.service'; import { ContextType, ContextPriority, ContextBlock, ContextInjectionRequest, ContextInjectionResult, ContextCacheEntry, } from './context.types'; import Anthropic from '@anthropic-ai/sdk'; /** * Token 估算常量 * 中文约 1.5-2 tokens/字符,英文约 0.25 tokens/word * 保守取 2 tokens/字符 */ const TOKENS_PER_CHAR = 2; /** * Claude Sonnet 的 context window 上限 */ const MAX_CONTEXT_WINDOW = 200000; /** * 为 output 预留的 tokens */ const RESERVED_OUTPUT_TOKENS = 4096; /** * 为 system prompt 预留的 tokens(coordinator prompt 约 8000 tokens) */ const RESERVED_SYSTEM_TOKENS = 10000; /** * 上下文注入触发的压缩阈值(占 context window 的百分比) */ const COMPACTION_THRESHOLD = 0.80; @Injectable() export class ContextInjectorService { /** * 内存缓存 — 按 conversationId 隔离 * Map> */ private cacheStore = new Map>(); constructor( private knowledgeClient: KnowledgeClientService, ) {} /** * 主入口:注入上下文 * * 流程: * 1. 并行获取所有上下文块 * 2. 过滤空块 * 3. 按优先级排序 * 4. 检查 token 预算,必要时丢弃低优先级块 * 5. 组装成 context message 注入到 messages 数组 * 6. 如果总 token 仍超限,触发 auto-compaction */ async inject(request: ContextInjectionRequest): Promise { const cache = this.getOrCreateCache(request.conversationId); const maxTokens = request.maxContextTokens || MAX_CONTEXT_WINDOW; const availableForContext = maxTokens - RESERVED_OUTPUT_TOKENS - RESERVED_SYSTEM_TOKENS; // ======== Step 1: 并行获取所有上下文 ======== const contextBlocks = await this.fetchAllContexts(request, cache); // ======== Step 2: 过滤空块 ======== const nonEmptyBlocks = contextBlocks.filter(b => !b.isEmpty); // ======== Step 3: 按优先级排序(P0 → P1 → P2)======== nonEmptyBlocks.sort((a, b) => a.priority - b.priority); // ======== Step 4: Token 预算裁剪 ======== const existingMessageTokens = this.estimateMessagesTokens(request.messages); let remainingTokenBudget = availableForContext - existingMessageTokens; const injected: ContextBlock[] = []; const dropped: ContextType[] = []; for (const block of nonEmptyBlocks) { if (block.estimatedTokens <= remainingTokenBudget) { injected.push(block); remainingTokenBudget -= block.estimatedTokens; } else if (block.priority === ContextPriority.CRITICAL) { // P0 级别:即使超限也要注入,但触发压缩 injected.push(block); remainingTokenBudget -= block.estimatedTokens; } else { dropped.push(block.type); } } // ======== Step 5: 组装 context message ======== const contextMessage = this.buildContextMessage(injected); const enrichedMessages = this.injectContextMessage(request.messages, contextMessage); // ======== Step 6: Auto-compaction ======== const totalTokens = existingMessageTokens + injected.reduce((sum, b) => sum + b.estimatedTokens, 0); const utilizationRatio = totalTokens / availableForContext; let compacted = false; let finalMessages = enrichedMessages; if (utilizationRatio > COMPACTION_THRESHOLD) { finalMessages = await this.autoCompact(enrichedMessages, availableForContext); compacted = true; } return { messages: finalMessages, injectedContexts: injected.map(b => b.type), droppedContexts: dropped, totalContextTokens: injected.reduce((sum, b) => sum + b.estimatedTokens, 0), compacted, }; } /** * 并行获取所有上下文块 */ private async fetchAllContexts( request: ContextInjectionRequest, cache: Map, ): Promise { // 所有获取操作并行执行,互不依赖 const [ userMemory, collectedInfo, conversationStats, assessmentResult, relevantKnowledge, similarExperiences, deviceContext, agentsHistory, ] = await Promise.all([ this.buildUserMemoryContext(request.userId, request.currentUserMessage, cache), this.buildCollectedInfoContext(request.consultingState), this.buildConversationStatsContext(request.messages, request.consultingState), this.buildAssessmentResultContext(request.consultingState), this.buildRelevantKnowledgeContext(request.userId, request.currentUserMessage, cache), this.buildSimilarExperiencesContext(request.currentUserMessage, request.consultingState, cache), this.buildDeviceContext(request.deviceInfo, request.messages.length), this.buildAgentsHistoryContext(request.agentHistory), ]); return [ userMemory, collectedInfo, conversationStats, assessmentResult, relevantKnowledge, similarExperiences, deviceContext, agentsHistory, ]; } /** * 将上下文块组装成一条 context message * * 设计选择:将所有上下文放在一条 system-level 的 user message 中, * 放在对话历史的最前面(在 previousMessages 之前)。 * * 这样做的好处: * 1. 不污染 system prompt(system prompt 走 cache_control) * 2. 上下文随每次调用动态变化,不影响缓存命中率 * 3. LLM 能清晰区分 "上下文信息" 和 "用户对话" */ private buildContextMessage(blocks: ContextBlock[]): string { if (blocks.length === 0) return ''; const parts = [ '=== 动态上下文信息(系统自动注入,非用户输入) ===', '', ]; for (const block of blocks) { parts.push(block.content); parts.push(''); } parts.push('=== 以上为系统上下文,以下为用户对话 ==='); return parts.join('\n'); } /** * 将 context message 注入到 messages 数组 * * 注入位置:作为第一条 user message,紧接着一条空的 assistant 确认 * 这样不会打断后续的 user/assistant 交替模式 */ private injectContextMessage( messages: Anthropic.MessageParam[], contextMessage: string, ): Anthropic.MessageParam[] { if (!contextMessage) return messages; // 方案 A:注入为 messages 数组的前置消息对 // 必须保持 user → assistant 交替 return [ { role: 'user', content: contextMessage }, { role: 'assistant', content: '我已了解以上上下文信息,将在回复中参考使用。' }, ...messages, ]; } // ========== 各类上下文构建方法(详见 4.1-4.8)========== // 这里省略具体实现,见上方各小节 private async buildUserMemoryContext(userId: string, currentMessage: string, cache: Map): Promise { /* ... */ } private buildCollectedInfoContext(state?: any): ContextBlock { /* ... */ } private buildConversationStatsContext(messages: any[], state?: any): ContextBlock { /* ... */ } private buildAssessmentResultContext(state?: any): ContextBlock { /* ... */ } private async buildRelevantKnowledgeContext(userId: string, message: string, cache: Map): Promise { /* ... */ } private async buildSimilarExperiencesContext(message: string, state: any, cache: Map): Promise { /* ... */ } private buildDeviceContext(deviceInfo?: any, messageCount?: number): ContextBlock { /* ... */ } private buildAgentsHistoryContext(history?: any[]): ContextBlock { /* ... */ } // ========== 工具方法 ========== /** * 估算 messages 数组的 token 数 */ private estimateMessagesTokens(messages: Anthropic.MessageParam[]): number { let total = 0; for (const msg of messages) { if (typeof msg.content === 'string') { total += msg.content.length * TOKENS_PER_CHAR; } else if (Array.isArray(msg.content)) { for (const block of msg.content) { if (block.type === 'text') { total += (block as any).text.length * TOKENS_PER_CHAR; } else if (block.type === 'image') { total += 1000; // 图片约 1000 tokens } } } } return total; } private getOrCreateCache(conversationId: string): Map { if (!this.cacheStore.has(conversationId)) { this.cacheStore.set(conversationId, new Map()); } return this.cacheStore.get(conversationId)!; } /** * 清理对话缓存(对话结束时调用) */ clearCache(conversationId: string): void { this.cacheStore.delete(conversationId); } /** * 使特定上下文类型的缓存失效 * 例如 Memory Manager 保存了新记忆后,invalidate user_memory 缓存 */ invalidateContext(conversationId: string, contextType: ContextType): void { const cache = this.cacheStore.get(conversationId); if (!cache) return; // 删除该类型的所有缓存条目 for (const [key, entry] of cache.entries()) { if (key.startsWith(contextType)) { cache.delete(key); } } } } ``` ## 6. Auto-Compaction 策略 当消息总 token 数接近 context window 上限时(超过 80%),自动触发压缩。 ### 6.1 压缩算法 ```typescript /** * Auto-compaction: 自动压缩对话历史 * * 策略(按优先级执行,每步检查是否降到阈值以下): * 1. 移除 tool_result 中的详细内容,保留摘要 * 2. 压缩早期对话为摘要(保留最近 10 轮完整) * 3. 截断过长的单条消息(超过 2000 字符的) * 4. 移除 P2 上下文块 * 5. 压缩 P1 上下文块 */ async autoCompact( messages: Anthropic.MessageParam[], tokenBudget: number, ): Promise { let currentTokens = this.estimateMessagesTokens(messages); let compactedMessages = [...messages]; // ---- 策略 1: 压缩 tool_result 内容 ---- if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) { compactedMessages = this.compactToolResults(compactedMessages); currentTokens = this.estimateMessagesTokens(compactedMessages); } // ---- 策略 2: 早期对话摘要化 ---- if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) { compactedMessages = await this.summarizeEarlyMessages(compactedMessages, 10); currentTokens = this.estimateMessagesTokens(compactedMessages); } // ---- 策略 3: 截断过长消息 ---- if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) { compactedMessages = this.truncateLongMessages(compactedMessages, 2000); currentTokens = this.estimateMessagesTokens(compactedMessages); } return compactedMessages; } /** * 压缩 tool_result:将详细的 JSON 结果替换为一行摘要 */ private compactToolResults(messages: Anthropic.MessageParam[]): Anthropic.MessageParam[] { return messages.map(msg => { if (msg.role !== 'user' || !Array.isArray(msg.content)) return msg; const compactedContent = (msg.content as any[]).map(block => { if (block.type === 'tool_result') { const originalContent = typeof block.content === 'string' ? block.content : JSON.stringify(block.content); // 如果 tool_result 超过 500 字符,压缩为摘要 if (originalContent.length > 500) { return { ...block, content: `[已压缩] ${originalContent.slice(0, 200)}... (原始长度: ${originalContent.length}字符)`, }; } } return block; }); return { ...msg, content: compactedContent }; }); } /** * 将早期对话(保留最近 N 轮完整)压缩为摘要 * 使用 Haiku 模型做快速摘要,成本极低 */ private async summarizeEarlyMessages( messages: Anthropic.MessageParam[], keepRecentTurns: number, ): Promise { // 找到上下文注入消息对(前2条)和对话消息 const contextPair = messages.slice(0, 2); // context injection pair const dialogMessages = messages.slice(2); // 计算需要保留的最近消息数(每轮 = 1 user + 1 assistant = 2条) const keepCount = keepRecentTurns * 2; if (dialogMessages.length <= keepCount) { return messages; // 没有足够的早期消息需要压缩 } const earlyMessages = dialogMessages.slice(0, -keepCount); const recentMessages = dialogMessages.slice(-keepCount); // 用 Haiku 快速摘要早期对话 const earlyText = earlyMessages .map(m => `${m.role}: ${typeof m.content === 'string' ? m.content : '[complex content]'}`) .join('\n'); const summaryResponse = await this.anthropic.messages.create({ model: 'claude-haiku-4-20250514', max_tokens: 500, messages: [{ role: 'user', content: `请用200字以内概括以下对话的要点(保留关键信息如用户背景、已讨论的移民方案等):\n\n${earlyText}`, }], }); const summary = summaryResponse.content[0].type === 'text' ? summaryResponse.content[0].text : ''; // 组装:上下文 + 摘要 + 最近消息 return [ ...contextPair, { role: 'user', content: `[以下是之前对话的摘要]\n${summary}\n[摘要结束,以下是最近的对话]` }, { role: 'assistant', content: '好的,我了解了之前的讨论内容,我们继续。' }, ...recentMessages, ]; } ``` ### 6.2 Compaction 触发时机 ``` 消息 token 数 / 可用 token 预算: 0% ──────── 50% ──────── 80% ──────── 95% ──── 100% │ 正常区域 │ 压缩区域 │ 危险区域 │ │ │ │ │ │ 不做任何处理 │ 触发策略1-3 │ 触发策略4-5│ │ │ 压缩日志 │ 强制压缩 │ │ │ 通知前端 │ 可能截断 │ ``` ## 7. 完整数据流 ``` 用户发送消息 │ ▼ ConversationService.sendMessage() │ ▼ CoordinatorAgentService.sendMessage() │ ▼ ┌───────────────────────────┐ │ ContextInjector.inject() │ │ │ │ ┌─────────────────────┐ │ │ │ Promise.all([ │ │ │ │ fetchUserMemory, │──┼──→ knowledge-service (HTTP) │ │ fetchKnowledge, │──┼──→ knowledge-service (HTTP) │ │ fetchExperiences, │──┼──→ knowledge-service (HTTP) │ │ buildLocalCtx... │ │ (本地计算,无 I/O) │ │ ]) │ │ │ └──────────┬──────────┘ │ │ │ │ │ ┌──────────▼──────────┐ │ │ │ Token Budget Check │ │ │ │ 排序 → 裁剪 → 注入 │ │ │ └──────────┬──────────┘ │ │ │ │ │ ┌──────────▼──────────┐ │ │ │ Auto-Compaction │ │ │ │ (如果 > 80%) │ │ │ └──────────┬──────────┘ │ │ │ │ │ Return: enrichedMessages │ └─────────────┬─────────────┘ │ ▼ Claude API 调用 (system prompt + enrichedMessages + tools) ``` ## 8. 缓存失效策略 | 事件 | 失效的缓存 | 方式 | |------|-----------|------| | Memory Manager 保存了新记忆 | `user_memory:${userId}` | `invalidateContext()` 主动失效 | | 评估完成 | `assessment_result` | 直接更新 state,无需缓存 | | 对话结束 | 该对话的所有缓存 | `clearCache(conversationId)` | | 用户长时间不活跃 (>5min) | 所有外部数据缓存 | TTL 自动过期 | | 知识库更新 | `relevant_knowledge` | TTL 自动过期(30s)| ## 9. 性能指标 | 指标 | 目标 | 备注 | |------|------|------| | 上下文注入耗时 | < 200ms | 并行获取 + 缓存命中时 < 10ms | | 缓存命中率 | > 70% | 同一对话中连续消息通常命中 | | Auto-compaction 耗时 | < 2s | 使用 Haiku 做摘要时 | | 上下文 token 占比 | 5-15% | 占总 context window 的比例 | | API 调用次数 | 0-3 per injection | 缓存命中时为 0 | ## 10. 与旧架构的对比 | 方面 | 旧架构 (ClaudeAgentServiceV2) | 新架构 (ContextInjector) | |------|-------------------------------|--------------------------| | 上下文来源 | `buildSystemPrompt()` 固定拼接 | 8 种动态上下文,按需注入 | | 缓存 | 无 | 多级缓存,TTL 策略 | | Token 管理 | `calculateMaxTokens()` 限制输出 | 完整的输入+输出 token 预算管理 | | 压缩 | 固定取 last 20 messages | 自动压缩,摘要化早期对话 | | 用户记忆 | `context.userMemory` 字符串数组 | 结构化 UserMemory,语义搜索 | | 知识注入 | 工具调用时才检索 | 预检索,主动注入相关知识 | | 经验利用 | `getAccumulatedExperience()` 简单拼接 | 语义匹配,按场景注入 | | 设备适配 | `buildDeviceContext()` 仅首次 | 持续参考,影响回复风格 |