37 KiB
37 KiB
08 - 动态上下文注入系统 (Context Injection System)
1. 设计哲学
借鉴 Claude Code 的 attachment 系统:在每次 Claude API 调用之前,动态组装最相关的上下文注入到 messages 中。
核心原则:
- 按需注入:不是所有上下文每次都注入,根据对话阶段和用户消息动态选择
- 优先级驱动:当 context window 接近上限时,按优先级淘汰低优先级上下文
- 缓存友好:高频不变的上下文利用 Prompt Caching 降低成本
- 格式统一:所有上下文以统一的 XML-like 标签注入,方便 LLM 识别
2. 上下文类型总览
| 序号 | 类型标识 | 名称 | 数据源 | 优先级 | Cache TTL | 预估 Tokens |
|---|---|---|---|---|---|---|
| 1 | user_memory |
用户历史记忆 | knowledge-service /api/v1/memory/user/{id}/top |
P0 (最高) | 60s | 200-500 |
| 2 | collected_info |
已收集的用户信息 | 本地 ConsultingState.collectedInfo | P0 | 0 (实时) | 100-300 |
| 3 | conversation_stats |
对话统计 | 本地计算 (messages + timestamps) | P1 | 0 (实时) | 50-80 |
| 4 | assessment_result |
评估结果 | 本地 ConsultingState.assessmentResult | P0 | 0 (实时) | 200-400 |
| 5 | relevant_knowledge |
相关知识预检索 | knowledge-service /api/v1/knowledge/retrieve/prompt |
P1 | 30s | 300-800 |
| 6 | similar_experiences |
系统经验 | knowledge-service /api/v1/memory/experience/search |
P2 | 120s | 150-400 |
| 7 | device_context |
设备/地区信息 | ConversationGateway 传入的 deviceInfo | P2 | 对话生命周期 | 30-60 |
| 8 | active_agents_history |
最近 Agent 调用记录 | 本地 agentLoop 内存 | P1 | 0 (实时) | 100-300 |
优先级说明:
- P0 (Critical):绝不丢弃,即使 context window 紧张也必须保留
- P1 (Important):正常保留,极端情况下可压缩
- P2 (Nice-to-have):context window 紧张时首先丢弃
3. TypeScript 类型定义
// context.types.ts
/**
* 上下文类型枚举
*/
export enum ContextType {
USER_MEMORY = 'user_memory',
COLLECTED_INFO = 'collected_info',
CONVERSATION_STATS = 'conversation_stats',
ASSESSMENT_RESULT = 'assessment_result',
RELEVANT_KNOWLEDGE = 'relevant_knowledge',
SIMILAR_EXPERIENCES = 'similar_experiences',
DEVICE_CONTEXT = 'device_context',
ACTIVE_AGENTS_HISTORY = 'active_agents_history',
}
/**
* 上下文优先级
*/
export enum ContextPriority {
CRITICAL = 0, // P0: 绝不丢弃
IMPORTANT = 1, // P1: 正常保留
NICE_TO_HAVE = 2, // P2: 可丢弃
}
/**
* 单条上下文块
*/
export interface ContextBlock {
type: ContextType;
priority: ContextPriority;
content: string; // 格式化后的文本内容
estimatedTokens: number; // 预估 token 数
timestamp: number; // 生成时间(用于缓存判断)
cacheTTL: number; // 缓存有效期 (ms), 0 = 不缓存
isEmpty: boolean; // 是否为空(空的不注入)
}
/**
* 上下文注入请求
*/
export interface ContextInjectionRequest {
userId: string;
conversationId: string;
messages: Anthropic.MessageParam[]; // 当前对话历史
currentUserMessage: string; // 最新用户消息
consultingState?: ConsultingState; // 当前咨询状态
deviceInfo?: DeviceInfo; // 设备信息
agentHistory?: AgentExecutionRecord[]; // 最近 Agent 调用记录
maxContextTokens?: number; // 上下文 token 上限 (default: 180000)
}
/**
* 上下文注入结果
*/
export interface ContextInjectionResult {
messages: Anthropic.MessageParam[]; // 注入上下文后的 messages
injectedContexts: ContextType[]; // 实际注入了哪些上下文
droppedContexts: ContextType[]; // 被丢弃的上下文(因 token 限制)
totalContextTokens: number; // 注入的上下文总 token 数
compacted: boolean; // 是否触发了压缩
}
/**
* Agent 执行记录(用于 active_agents_history)
*/
export interface AgentExecutionRecord {
agentName: string;
invokedAt: number; // timestamp
durationMs: number;
inputSummary: string; // 调用参数摘要
outputSummary: string; // 返回结果摘要(截断到 200 字符)
tokensUsed: number;
}
/**
* 上下文缓存条目
*/
interface ContextCacheEntry {
block: ContextBlock;
createdAt: number;
expiresAt: number; // createdAt + cacheTTL
}
4. 每种上下文的详细设计
4.1 user_memory — 用户历史记忆
数据源:knowledge-service 的 Memory API
// API 调用
const memories = await knowledgeClient.getUserTopMemories(userId, 10);
// 返回: UserMemory[] — 按 importance 排序的用户记忆
// 备选:语义搜索(与当前消息相关的记忆)
const relevantMemories = await knowledgeClient.searchUserMemories({
userId,
query: currentUserMessage,
limit: 5,
});
注入格式:
<user_memory>
以下是该用户的历史记录摘要(来自之前的对话):
- [FACT] 用户35岁,本科学历,毕业于浙江大学 (重要度: 90)
- [FACT] 在互联网行业工作8年,目前年薪60万 (重要度: 85)
- [INTENT] 用户对高才通B类感兴趣 (重要度: 80)
- [PREFERENCE] 用户偏好文字交流,回复较简短 (重要度: 60)
- [FACT] 已婚,有一个3岁的孩子 (重要度: 70)
</user_memory>
Freshness Policy:
- Cache TTL: 60s(用户记忆在一次对话中变化不频繁)
- 首次调用时强制刷新
- 当 Memory Manager Agent 保存了新记忆后,立即 invalidate 缓存
逻辑:
async function buildUserMemoryContext(
userId: string,
currentMessage: string,
cache: Map<string, ContextCacheEntry>,
): Promise<ContextBlock> {
const cacheKey = `user_memory:${userId}`;
const cached = cache.get(cacheKey);
if (cached && Date.now() < cached.expiresAt) {
return cached.block;
}
// 并行获取:Top 记忆 + 语义相关记忆
const [topMemories, relevantMemories] = await Promise.all([
knowledgeClient.getUserTopMemories(userId, 5),
knowledgeClient.searchUserMemories({ userId, query: currentMessage, limit: 3 }),
]);
// 合并去重
const allMemories = deduplicateMemories([...topMemories, ...relevantMemories]);
if (allMemories.length === 0) {
return { type: ContextType.USER_MEMORY, priority: ContextPriority.CRITICAL, content: '', estimatedTokens: 0, timestamp: Date.now(), cacheTTL: 60000, isEmpty: true };
}
const content = formatMemoryBlock(allMemories);
const block: ContextBlock = {
type: ContextType.USER_MEMORY,
priority: ContextPriority.CRITICAL,
content,
estimatedTokens: estimateTokens(content),
timestamp: Date.now(),
cacheTTL: 60000,
isEmpty: false,
};
cache.set(cacheKey, { block, createdAt: Date.now(), expiresAt: Date.now() + 60000 });
return block;
}
4.2 collected_info — 已收集的用户信息
数据源:本地 ConsultingState.collectedInfo(存储在 Conversation 实体中)
// 直接从 state 读取,无需 API 调用
const collectedInfo = consultingState?.collectedInfo || {};
注入格式:
<collected_info>
本次对话已收集的用户信息:
- 年龄: 35
- 学历: 本科
- 毕业院校: 浙江大学
- 工作年限: 8年
- 行业: 互联网/IT
- 年收入: 约60万人民币
- 婚姻状况: 已婚
尚未收集: 语言能力、是否有香港联系、资产情况
</collected_info>
Freshness Policy:
- Cache TTL: 0(实时,每次从 state 读取)
- 零开销:纯本地数据,无 API 调用
逻辑:
function buildCollectedInfoContext(consultingState?: ConsultingState): ContextBlock {
const info = consultingState?.collectedInfo || {};
const entries = Object.entries(info);
if (entries.length === 0) {
return emptyBlock(ContextType.COLLECTED_INFO, ContextPriority.CRITICAL);
}
// 映射 key 到中文标签
const labelMap: Record<string, string> = {
age: '年龄', education: '学历', university: '毕业院校',
workYears: '工作年限', industry: '行业', annualIncome: '年收入',
maritalStatus: '婚姻状况', children: '子女', languageAbility: '语言能力',
hasHKConnection: '香港联系', assets: '资产情况', nationality: '国籍',
};
const collected = entries
.map(([key, value]) => `- ${labelMap[key] || key}: ${value}`)
.join('\n');
// 计算未收集的核心字段
const coreKeys = ['age', 'education', 'workYears', 'annualIncome', 'languageAbility', 'hasHKConnection'];
const uncollected = coreKeys
.filter(key => info[key] === undefined)
.map(key => labelMap[key] || key)
.join('、');
const content = `<collected_info>\n本次对话已收集的用户信息:\n${collected}\n${uncollected ? `尚未收集: ${uncollected}` : '核心信息已收集完毕'}\n</collected_info>`;
return {
type: ContextType.COLLECTED_INFO,
priority: ContextPriority.CRITICAL,
content,
estimatedTokens: estimateTokens(content),
timestamp: Date.now(),
cacheTTL: 0,
isEmpty: false,
};
}
4.3 conversation_stats — 对话统计
数据源:本地计算
// 从 messages 数组和 conversation 实体计算
const stats = {
totalTurns: messages.filter(m => m.role === 'user').length,
duration: Date.now() - conversation.createdAt,
currentStage: consultingState?.currentStageId,
stageHistory: consultingState?.stageHistory,
messagesInCurrentStage: consultingState?.stageTurnCount,
};
注入格式:
<conversation_stats>
对话统计:
- 总轮次: 8轮 (用户发言8次)
- 对话时长: 12分钟
- 当前阶段: 信息收集 (第3轮)
- 阶段历程: 开场破冰(2轮) → 需求了解(3轮) → 信息收集(进行中)
- 用户回复特点: 平均每次30字,回复较简短
</conversation_stats>
Freshness Policy:
- Cache TTL: 0(实时计算,无 API 开销)
逻辑:
function buildConversationStatsContext(
messages: Anthropic.MessageParam[],
consultingState?: ConsultingState,
conversationCreatedAt?: Date,
): ContextBlock {
const userMessages = messages.filter(m => m.role === 'user');
const totalTurns = userMessages.length;
// 计算对话时长
const durationMs = conversationCreatedAt
? Date.now() - conversationCreatedAt.getTime()
: 0;
const durationMinutes = Math.round(durationMs / 60000);
// 分析用户回复特点
const avgUserMsgLength = userMessages.reduce((sum, m) => {
const text = typeof m.content === 'string' ? m.content : '';
return sum + text.length;
}, 0) / Math.max(userMessages.length, 1);
const replyStyle = avgUserMsgLength < 20 ? '极简短,可能在移动设备'
: avgUserMsgLength < 50 ? '较简短,需引导展开'
: avgUserMsgLength < 150 ? '中等长度,表达较充分'
: '详细表达,用户积极参与';
// 构建阶段历程
const stageNames: Record<string, string> = {
greeting: '开场破冰', needs_discovery: '需求了解', info_collection: '信息收集',
assessment: '评估', recommendation: '推荐', objection_handling: '异议处理',
conversion: '转化促成', handoff: '专家对接',
};
let stageHistoryStr = '无';
if (consultingState?.stageHistory) {
stageHistoryStr = consultingState.stageHistory
.map(h => `${stageNames[h.stageId] || h.stageId}(${h.turnsInStage}轮)`)
.join(' → ');
}
const parts = [
'<conversation_stats>',
'对话统计:',
`- 总轮次: ${totalTurns}轮`,
`- 对话时长: ${durationMinutes}分钟`,
`- 当前阶段: ${stageNames[consultingState?.currentStageId || ''] || '未知'} (第${(consultingState?.stageTurnCount || 0) + 1}轮)`,
`- 阶段历程: ${stageHistoryStr}`,
`- 用户回复特点: 平均每次${Math.round(avgUserMsgLength)}字,${replyStyle}`,
'</conversation_stats>',
];
const content = parts.join('\n');
return {
type: ContextType.CONVERSATION_STATS,
priority: ContextPriority.IMPORTANT,
content,
estimatedTokens: estimateTokens(content),
timestamp: Date.now(),
cacheTTL: 0,
isEmpty: false,
};
}
4.4 assessment_result — 评估结果
数据源:本地 ConsultingState.assessmentResult
注入格式:
<assessment_result>
用户移民资格初步评估结果:
推荐方案: 高才通B类、优才计划
匹配度: 75分/100分
优势:
- 名校毕业+工作经验充足
- 工作经验丰富(8年)
- 年龄优势明显(35岁)
注意事项:
- 年薪未达高才通A类标准
- 建议准备详细的工作业绩证明
提示: 这是基于初步信息的预评估。付费详细评估(¥99)将提供完整的评分报告和申请建议。
</assessment_result>
Freshness Policy:
- Cache TTL: 0(实时,一旦生成不会变化直到重新评估)
- 仅当
consultingState.assessmentResult存在时注入
逻辑:
function buildAssessmentResultContext(consultingState?: ConsultingState): ContextBlock {
const result = consultingState?.assessmentResult;
if (!result) {
return emptyBlock(ContextType.ASSESSMENT_RESULT, ContextPriority.CRITICAL);
}
const parts = [
'<assessment_result>',
'用户移民资格初步评估结果:',
`推荐方案: ${result.recommendedPrograms.join('、')}`,
`匹配度: ${result.suitabilityScore}分/100分`,
];
if (result.highlights.length > 0) {
parts.push('优势:');
result.highlights.forEach(h => parts.push(` - ${h}`));
}
if (result.concerns.length > 0) {
parts.push('注意事项:');
result.concerns.forEach(c => parts.push(` - ${c}`));
}
parts.push('提示: 这是基于初步信息的预评估。付费详细评估(¥99)将提供完整的评分报告和申请建议。');
parts.push('</assessment_result>');
const content = parts.join('\n');
return {
type: ContextType.ASSESSMENT_RESULT,
priority: ContextPriority.CRITICAL,
content,
estimatedTokens: estimateTokens(content),
timestamp: Date.now(),
cacheTTL: 0,
isEmpty: false,
};
}
4.5 relevant_knowledge — 相关知识预检索
数据源:knowledge-service RAG API
// 基于用户最新消息做语义检索
const knowledge = await knowledgeClient.retrieveForPrompt({
query: currentUserMessage,
userId,
category: detectCategory(currentUserMessage), // 可选的类别过滤
});
注入格式:
<relevant_knowledge>
以下是与用户最新问题相关的知识库内容(仅供参考,请结合实际情况回答):
来源:高才通计划申请指南 (相似度: 0.92)
> 高才通计划(TTPS)B类要求:申请人须持有全球百强大学颁授的学士学位,
> 并在申请前五年内累积至少三年工作经验。百强大学名单以最新公布的
> QS/Times/USNEWS/SJTU 四大排名综合为准...
来源:优才计划评分标准 (相似度: 0.85)
> 综合计分制下,学历最高可得45分(博士/双硕士),年龄18-39岁可得30分...
</relevant_knowledge>
Freshness Policy:
- Cache TTL: 30s(同一用户短时间内的消息通常主题相近)
- Cache Key:
relevant_knowledge:${userId}:${hashOf(currentMessage)} - 仅当用户消息涉及具体政策/条件/流程时注入
条件注入逻辑:
async function buildRelevantKnowledgeContext(
userId: string,
currentMessage: string,
cache: Map<string, ContextCacheEntry>,
): Promise<ContextBlock> {
// 简单的关键词检测,判断是否需要知识检索
const needsKnowledge = shouldRetrieveKnowledge(currentMessage);
if (!needsKnowledge) {
return emptyBlock(ContextType.RELEVANT_KNOWLEDGE, ContextPriority.IMPORTANT);
}
const cacheKey = `relevant_knowledge:${userId}:${simpleHash(currentMessage)}`;
const cached = cache.get(cacheKey);
if (cached && Date.now() < cached.expiresAt) {
return cached.block;
}
const knowledge = await knowledgeClient.retrieveForPrompt({
query: currentMessage,
userId,
});
if (!knowledge) {
return emptyBlock(ContextType.RELEVANT_KNOWLEDGE, ContextPriority.IMPORTANT);
}
const content = `<relevant_knowledge>\n以下是与用户最新问题相关的知识库内容(仅供参考,请结合实际情况回答):\n\n${knowledge}\n</relevant_knowledge>`;
const block: ContextBlock = {
type: ContextType.RELEVANT_KNOWLEDGE,
priority: ContextPriority.IMPORTANT,
content,
estimatedTokens: estimateTokens(content),
timestamp: Date.now(),
cacheTTL: 30000,
isEmpty: false,
};
cache.set(cacheKey, { block, createdAt: Date.now(), expiresAt: Date.now() + 30000 });
return block;
}
/**
* 判断是否需要知识检索
* 简单的关键词 + 模式匹配,避免每次都调 RAG API
*/
function shouldRetrieveKnowledge(message: string): boolean {
const policyKeywords = [
'高才通', 'TTPS', '优才', 'QMAS', '专才', 'GEP', '留学', 'IANG',
'投资', 'CIES', '科技', 'TECHTAS', '条件', '要求', '资格', '申请',
'流程', '材料', '费用', '时间', '打分', '评分', '签证', '续签',
'永居', '居留', '配偶', '受养人', '政策', '最新',
];
const lowerMessage = message.toLowerCase();
return policyKeywords.some(kw => lowerMessage.includes(kw.toLowerCase()));
}
4.6 similar_experiences — 系统经验
数据源:knowledge-service Experience API(来自 evolution-service 的经验积累)
const experiences = await knowledgeClient.searchExperiences({
query: currentUserMessage,
activeOnly: true,
limit: 3,
});
注入格式:
<similar_experiences>
系统积累的相关经验(可参考但不必逐字照搬):
1. [对话策略] 当用户表示"我还在考虑"时,不要急于推销,先询问具体顾虑点,
再针对性解答。成功率比直接推进高40%。
2. [案例参考] 类似背景的用户(35岁+互联网+本科)通过优才计划成功的案例较多,
建议重点介绍优才综合计分制的优势。
3. [异议处理] 对于"费用太贵"的异议,可以从投资回报角度切入:
香港税收优势每年可节省X万...
</similar_experiences>
Freshness Policy:
- Cache TTL: 120s(系统经验更新频率低)
- 仅在信息收集、评估推荐、异议处理阶段注入
4.7 device_context — 设备/地区信息
数据源:ConversationGateway 在 WebSocket 连接时传入的 deviceInfo
// 从 conversation 实体的 deviceInfo 字段获取
const deviceInfo = conversation.deviceInfo;
注入格式:
<device_context>
用户环境信息:
- 设备类型: 移动设备 (iPhone)
- 时区: UTC+8 (中国标准时间)
- 地区: 上海 (根据IP推断)
- 当前时间: 2025-01-15 周三 下午2:30
提示: 用户在手机上咨询,回复可适当简短。下午时段用户可能在工作间隙咨询。
</device_context>
Freshness Policy:
- Cache TTL: 整个对话生命周期(连接后设备信息不变)
- 仅在首次消息或每5轮注入一次(避免浪费 token)
4.8 active_agents_history — 最近 Agent 调用记录
数据源:agentLoop 内存中维护的 AgentExecutionRecord[]
注入格式:
<active_agents_history>
最近调用的专家 Agent(本次对话中):
1. [2分钟前] Policy Expert - 查询高才通B类条件
→ 返回了详细的学历和工作经验要求
2. [1分钟前] Memory Manager - 保存用户信息(年龄35,浙大本科)
→ 已成功保存
注意: 避免重复调用相同 Agent 查询相同信息。如需进一步了解,可直接使用已获取的信息。
</active_agents_history>
Freshness Policy:
- Cache TTL: 0(实时,每次 API 调用都从内存读取最新记录)
- 仅保留最近 5 条记录
- 每条记录包含 agent 名称、时间、输入/输出摘要
5. ContextInjector 服务实现
// context-injector.service.ts
import { Injectable } from '@nestjs/common';
import { KnowledgeClientService } from '../knowledge/knowledge-client.service';
import {
ContextType, ContextPriority, ContextBlock,
ContextInjectionRequest, ContextInjectionResult,
ContextCacheEntry,
} from './context.types';
import Anthropic from '@anthropic-ai/sdk';
/**
* Token 估算常量
* 中文约 1.5-2 tokens/字符,英文约 0.25 tokens/word
* 保守取 2 tokens/字符
*/
const TOKENS_PER_CHAR = 2;
/**
* Claude Sonnet 的 context window 上限
*/
const MAX_CONTEXT_WINDOW = 200000;
/**
* 为 output 预留的 tokens
*/
const RESERVED_OUTPUT_TOKENS = 4096;
/**
* 为 system prompt 预留的 tokens(coordinator prompt 约 8000 tokens)
*/
const RESERVED_SYSTEM_TOKENS = 10000;
/**
* 上下文注入触发的压缩阈值(占 context window 的百分比)
*/
const COMPACTION_THRESHOLD = 0.80;
@Injectable()
export class ContextInjectorService {
/**
* 内存缓存 — 按 conversationId 隔离
* Map<conversationId, Map<cacheKey, ContextCacheEntry>>
*/
private cacheStore = new Map<string, Map<string, ContextCacheEntry>>();
constructor(
private knowledgeClient: KnowledgeClientService,
) {}
/**
* 主入口:注入上下文
*
* 流程:
* 1. 并行获取所有上下文块
* 2. 过滤空块
* 3. 按优先级排序
* 4. 检查 token 预算,必要时丢弃低优先级块
* 5. 组装成 context message 注入到 messages 数组
* 6. 如果总 token 仍超限,触发 auto-compaction
*/
async inject(request: ContextInjectionRequest): Promise<ContextInjectionResult> {
const cache = this.getOrCreateCache(request.conversationId);
const maxTokens = request.maxContextTokens || MAX_CONTEXT_WINDOW;
const availableForContext = maxTokens - RESERVED_OUTPUT_TOKENS - RESERVED_SYSTEM_TOKENS;
// ======== Step 1: 并行获取所有上下文 ========
const contextBlocks = await this.fetchAllContexts(request, cache);
// ======== Step 2: 过滤空块 ========
const nonEmptyBlocks = contextBlocks.filter(b => !b.isEmpty);
// ======== Step 3: 按优先级排序(P0 → P1 → P2)========
nonEmptyBlocks.sort((a, b) => a.priority - b.priority);
// ======== Step 4: Token 预算裁剪 ========
const existingMessageTokens = this.estimateMessagesTokens(request.messages);
let remainingTokenBudget = availableForContext - existingMessageTokens;
const injected: ContextBlock[] = [];
const dropped: ContextType[] = [];
for (const block of nonEmptyBlocks) {
if (block.estimatedTokens <= remainingTokenBudget) {
injected.push(block);
remainingTokenBudget -= block.estimatedTokens;
} else if (block.priority === ContextPriority.CRITICAL) {
// P0 级别:即使超限也要注入,但触发压缩
injected.push(block);
remainingTokenBudget -= block.estimatedTokens;
} else {
dropped.push(block.type);
}
}
// ======== Step 5: 组装 context message ========
const contextMessage = this.buildContextMessage(injected);
const enrichedMessages = this.injectContextMessage(request.messages, contextMessage);
// ======== Step 6: Auto-compaction ========
const totalTokens = existingMessageTokens + injected.reduce((sum, b) => sum + b.estimatedTokens, 0);
const utilizationRatio = totalTokens / availableForContext;
let compacted = false;
let finalMessages = enrichedMessages;
if (utilizationRatio > COMPACTION_THRESHOLD) {
finalMessages = await this.autoCompact(enrichedMessages, availableForContext);
compacted = true;
}
return {
messages: finalMessages,
injectedContexts: injected.map(b => b.type),
droppedContexts: dropped,
totalContextTokens: injected.reduce((sum, b) => sum + b.estimatedTokens, 0),
compacted,
};
}
/**
* 并行获取所有上下文块
*/
private async fetchAllContexts(
request: ContextInjectionRequest,
cache: Map<string, ContextCacheEntry>,
): Promise<ContextBlock[]> {
// 所有获取操作并行执行,互不依赖
const [
userMemory,
collectedInfo,
conversationStats,
assessmentResult,
relevantKnowledge,
similarExperiences,
deviceContext,
agentsHistory,
] = await Promise.all([
this.buildUserMemoryContext(request.userId, request.currentUserMessage, cache),
this.buildCollectedInfoContext(request.consultingState),
this.buildConversationStatsContext(request.messages, request.consultingState),
this.buildAssessmentResultContext(request.consultingState),
this.buildRelevantKnowledgeContext(request.userId, request.currentUserMessage, cache),
this.buildSimilarExperiencesContext(request.currentUserMessage, request.consultingState, cache),
this.buildDeviceContext(request.deviceInfo, request.messages.length),
this.buildAgentsHistoryContext(request.agentHistory),
]);
return [
userMemory,
collectedInfo,
conversationStats,
assessmentResult,
relevantKnowledge,
similarExperiences,
deviceContext,
agentsHistory,
];
}
/**
* 将上下文块组装成一条 context message
*
* 设计选择:将所有上下文放在一条 system-level 的 user message 中,
* 放在对话历史的最前面(在 previousMessages 之前)。
*
* 这样做的好处:
* 1. 不污染 system prompt(system prompt 走 cache_control)
* 2. 上下文随每次调用动态变化,不影响缓存命中率
* 3. LLM 能清晰区分 "上下文信息" 和 "用户对话"
*/
private buildContextMessage(blocks: ContextBlock[]): string {
if (blocks.length === 0) return '';
const parts = [
'=== 动态上下文信息(系统自动注入,非用户输入) ===',
'',
];
for (const block of blocks) {
parts.push(block.content);
parts.push('');
}
parts.push('=== 以上为系统上下文,以下为用户对话 ===');
return parts.join('\n');
}
/**
* 将 context message 注入到 messages 数组
*
* 注入位置:作为第一条 user message,紧接着一条空的 assistant 确认
* 这样不会打断后续的 user/assistant 交替模式
*/
private injectContextMessage(
messages: Anthropic.MessageParam[],
contextMessage: string,
): Anthropic.MessageParam[] {
if (!contextMessage) return messages;
// 方案 A:注入为 messages 数组的前置消息对
// 必须保持 user → assistant 交替
return [
{ role: 'user', content: contextMessage },
{ role: 'assistant', content: '我已了解以上上下文信息,将在回复中参考使用。' },
...messages,
];
}
// ========== 各类上下文构建方法(详见 4.1-4.8)==========
// 这里省略具体实现,见上方各小节
private async buildUserMemoryContext(userId: string, currentMessage: string, cache: Map<string, ContextCacheEntry>): Promise<ContextBlock> { /* ... */ }
private buildCollectedInfoContext(state?: any): ContextBlock { /* ... */ }
private buildConversationStatsContext(messages: any[], state?: any): ContextBlock { /* ... */ }
private buildAssessmentResultContext(state?: any): ContextBlock { /* ... */ }
private async buildRelevantKnowledgeContext(userId: string, message: string, cache: Map<string, ContextCacheEntry>): Promise<ContextBlock> { /* ... */ }
private async buildSimilarExperiencesContext(message: string, state: any, cache: Map<string, ContextCacheEntry>): Promise<ContextBlock> { /* ... */ }
private buildDeviceContext(deviceInfo?: any, messageCount?: number): ContextBlock { /* ... */ }
private buildAgentsHistoryContext(history?: any[]): ContextBlock { /* ... */ }
// ========== 工具方法 ==========
/**
* 估算 messages 数组的 token 数
*/
private estimateMessagesTokens(messages: Anthropic.MessageParam[]): number {
let total = 0;
for (const msg of messages) {
if (typeof msg.content === 'string') {
total += msg.content.length * TOKENS_PER_CHAR;
} else if (Array.isArray(msg.content)) {
for (const block of msg.content) {
if (block.type === 'text') {
total += (block as any).text.length * TOKENS_PER_CHAR;
} else if (block.type === 'image') {
total += 1000; // 图片约 1000 tokens
}
}
}
}
return total;
}
private getOrCreateCache(conversationId: string): Map<string, ContextCacheEntry> {
if (!this.cacheStore.has(conversationId)) {
this.cacheStore.set(conversationId, new Map());
}
return this.cacheStore.get(conversationId)!;
}
/**
* 清理对话缓存(对话结束时调用)
*/
clearCache(conversationId: string): void {
this.cacheStore.delete(conversationId);
}
/**
* 使特定上下文类型的缓存失效
* 例如 Memory Manager 保存了新记忆后,invalidate user_memory 缓存
*/
invalidateContext(conversationId: string, contextType: ContextType): void {
const cache = this.cacheStore.get(conversationId);
if (!cache) return;
// 删除该类型的所有缓存条目
for (const [key, entry] of cache.entries()) {
if (key.startsWith(contextType)) {
cache.delete(key);
}
}
}
}
6. Auto-Compaction 策略
当消息总 token 数接近 context window 上限时(超过 80%),自动触发压缩。
6.1 压缩算法
/**
* Auto-compaction: 自动压缩对话历史
*
* 策略(按优先级执行,每步检查是否降到阈值以下):
* 1. 移除 tool_result 中的详细内容,保留摘要
* 2. 压缩早期对话为摘要(保留最近 10 轮完整)
* 3. 截断过长的单条消息(超过 2000 字符的)
* 4. 移除 P2 上下文块
* 5. 压缩 P1 上下文块
*/
async autoCompact(
messages: Anthropic.MessageParam[],
tokenBudget: number,
): Promise<Anthropic.MessageParam[]> {
let currentTokens = this.estimateMessagesTokens(messages);
let compactedMessages = [...messages];
// ---- 策略 1: 压缩 tool_result 内容 ----
if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) {
compactedMessages = this.compactToolResults(compactedMessages);
currentTokens = this.estimateMessagesTokens(compactedMessages);
}
// ---- 策略 2: 早期对话摘要化 ----
if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) {
compactedMessages = await this.summarizeEarlyMessages(compactedMessages, 10);
currentTokens = this.estimateMessagesTokens(compactedMessages);
}
// ---- 策略 3: 截断过长消息 ----
if (currentTokens > tokenBudget * COMPACTION_THRESHOLD) {
compactedMessages = this.truncateLongMessages(compactedMessages, 2000);
currentTokens = this.estimateMessagesTokens(compactedMessages);
}
return compactedMessages;
}
/**
* 压缩 tool_result:将详细的 JSON 结果替换为一行摘要
*/
private compactToolResults(messages: Anthropic.MessageParam[]): Anthropic.MessageParam[] {
return messages.map(msg => {
if (msg.role !== 'user' || !Array.isArray(msg.content)) return msg;
const compactedContent = (msg.content as any[]).map(block => {
if (block.type === 'tool_result') {
const originalContent = typeof block.content === 'string'
? block.content
: JSON.stringify(block.content);
// 如果 tool_result 超过 500 字符,压缩为摘要
if (originalContent.length > 500) {
return {
...block,
content: `[已压缩] ${originalContent.slice(0, 200)}... (原始长度: ${originalContent.length}字符)`,
};
}
}
return block;
});
return { ...msg, content: compactedContent };
});
}
/**
* 将早期对话(保留最近 N 轮完整)压缩为摘要
* 使用 Haiku 模型做快速摘要,成本极低
*/
private async summarizeEarlyMessages(
messages: Anthropic.MessageParam[],
keepRecentTurns: number,
): Promise<Anthropic.MessageParam[]> {
// 找到上下文注入消息对(前2条)和对话消息
const contextPair = messages.slice(0, 2); // context injection pair
const dialogMessages = messages.slice(2);
// 计算需要保留的最近消息数(每轮 = 1 user + 1 assistant = 2条)
const keepCount = keepRecentTurns * 2;
if (dialogMessages.length <= keepCount) {
return messages; // 没有足够的早期消息需要压缩
}
const earlyMessages = dialogMessages.slice(0, -keepCount);
const recentMessages = dialogMessages.slice(-keepCount);
// 用 Haiku 快速摘要早期对话
const earlyText = earlyMessages
.map(m => `${m.role}: ${typeof m.content === 'string' ? m.content : '[complex content]'}`)
.join('\n');
const summaryResponse = await this.anthropic.messages.create({
model: 'claude-haiku-4-20250514',
max_tokens: 500,
messages: [{
role: 'user',
content: `请用200字以内概括以下对话的要点(保留关键信息如用户背景、已讨论的移民方案等):\n\n${earlyText}`,
}],
});
const summary = summaryResponse.content[0].type === 'text'
? summaryResponse.content[0].text
: '';
// 组装:上下文 + 摘要 + 最近消息
return [
...contextPair,
{ role: 'user', content: `[以下是之前对话的摘要]\n${summary}\n[摘要结束,以下是最近的对话]` },
{ role: 'assistant', content: '好的,我了解了之前的讨论内容,我们继续。' },
...recentMessages,
];
}
6.2 Compaction 触发时机
消息 token 数 / 可用 token 预算:
0% ──────── 50% ──────── 80% ──────── 95% ──── 100%
│ 正常区域 │ 压缩区域 │ 危险区域 │
│ │ │ │
│ 不做任何处理 │ 触发策略1-3 │ 触发策略4-5│
│ │ 压缩日志 │ 强制压缩 │
│ │ 通知前端 │ 可能截断 │
7. 完整数据流
用户发送消息
│
▼
ConversationService.sendMessage()
│
▼
CoordinatorAgentService.sendMessage()
│
▼
┌───────────────────────────┐
│ ContextInjector.inject() │
│ │
│ ┌─────────────────────┐ │
│ │ Promise.all([ │ │
│ │ fetchUserMemory, │──┼──→ knowledge-service (HTTP)
│ │ fetchKnowledge, │──┼──→ knowledge-service (HTTP)
│ │ fetchExperiences, │──┼──→ knowledge-service (HTTP)
│ │ buildLocalCtx... │ │ (本地计算,无 I/O)
│ │ ]) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Token Budget Check │ │
│ │ 排序 → 裁剪 → 注入 │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ Auto-Compaction │ │
│ │ (如果 > 80%) │ │
│ └──────────┬──────────┘ │
│ │ │
│ Return: enrichedMessages │
└─────────────┬─────────────┘
│
▼
Claude API 调用
(system prompt + enrichedMessages + tools)
8. 缓存失效策略
| 事件 | 失效的缓存 | 方式 |
|---|---|---|
| Memory Manager 保存了新记忆 | user_memory:${userId} |
invalidateContext() 主动失效 |
| 评估完成 | assessment_result |
直接更新 state,无需缓存 |
| 对话结束 | 该对话的所有缓存 | clearCache(conversationId) |
| 用户长时间不活跃 (>5min) | 所有外部数据缓存 | TTL 自动过期 |
| 知识库更新 | relevant_knowledge |
TTL 自动过期(30s) |
9. 性能指标
| 指标 | 目标 | 备注 |
|---|---|---|
| 上下文注入耗时 | < 200ms | 并行获取 + 缓存命中时 < 10ms |
| 缓存命中率 | > 70% | 同一对话中连续消息通常命中 |
| Auto-compaction 耗时 | < 2s | 使用 Haiku 做摘要时 |
| 上下文 token 占比 | 5-15% | 占总 context window 的比例 |
| API 调用次数 | 0-3 per injection | 缓存命中时为 0 |
10. 与旧架构的对比
| 方面 | 旧架构 (ClaudeAgentServiceV2) | 新架构 (ContextInjector) |
|---|---|---|
| 上下文来源 | buildSystemPrompt() 固定拼接 |
8 种动态上下文,按需注入 |
| 缓存 | 无 | 多级缓存,TTL 策略 |
| Token 管理 | calculateMaxTokens() 限制输出 |
完整的输入+输出 token 预算管理 |
| 压缩 | 固定取 last 20 messages | 自动压缩,摘要化早期对话 |
| 用户记忆 | context.userMemory 字符串数组 |
结构化 UserMemory,语义搜索 |
| 知识注入 | 工具调用时才检索 | 预检索,主动注入相关知识 |
| 经验利用 | getAccumulatedExperience() 简单拼接 |
语义匹配,按场景注入 |
| 设备适配 | buildDeviceContext() 仅首次 |
持续参考,影响回复风格 |