import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';
import { ImmigrationToolsService } from './tools/immigration-tools.service';
import { TokenUsageService } from './token-usage.service';
import { buildSystemPrompt, SystemPromptConfig } from './prompts/system-prompt';
import { KnowledgeClientService } from '../knowledge/knowledge-client.service';
import { intentClassifier, IntentResult, IntentType } from './intent-classifier';
import { responseGate } from './response-gate';

export interface FileAttachment {
  id: string;
  originalName: string;
  mimeType: string;
  type: 'image' | 'document' | 'audio' | 'video' | 'other';
  size: number;
  downloadUrl?: string;
  thumbnailUrl?: string;
}

export interface ConversationContext {
  userId: string;
  conversationId: string;
  userMemory?: string[];
  previousMessages?: Array<{
    role: 'user' | 'assistant';
    content: string;
    attachments?: FileAttachment[];
  }>;
}

export interface StreamChunk {
  type: 'text' | 'tool_use' | 'tool_result' | 'end';
  content?: string;
  toolName?: string;
  toolInput?: Record<string, unknown>;
  toolResult?: unknown;
}

@Injectable()
export class ClaudeAgentService implements OnModuleInit {
  private client: Anthropic;
  private systemPromptConfig: SystemPromptConfig;

  constructor(
    private configService: ConfigService,
    private immigrationToolsService: ImmigrationToolsService,
    private knowledgeClient: KnowledgeClientService,
    private tokenUsageService: TokenUsageService,
  ) {}

  onModuleInit() {
    const baseUrl = this.configService.get<string>('ANTHROPIC_BASE_URL');
    const isProxyUrl =
      baseUrl && (baseUrl.includes('67.223.119.33') || baseUrl.match(/^\d+\.\d+\.\d+\.\d+/));

    // If using an IP-based proxy, disable TLS certificate verification
    if (isProxyUrl) {
      console.log(`Using Anthropic proxy (TLS verification disabled): ${baseUrl}`);
      process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
    }

    this.client = new Anthropic({
      apiKey: this.configService.get<string>('ANTHROPIC_API_KEY'),
      baseURL: baseUrl || undefined,
    });

    if (baseUrl && !isProxyUrl) {
      console.log(`Using Anthropic API base URL: ${baseUrl}`);
    }

    // Initialize with default config
    this.systemPromptConfig = {
      identity: '专业、友善、耐心的香港移民顾问',
      conversationStyle: '专业但不生硬,用简洁明了的语言解答',
    };
  }

  /**
   * Update system prompt configuration (for evolution)
   */
  updateSystemPromptConfig(config: Partial<SystemPromptConfig>) {
    this.systemPromptConfig = {
      ...this.systemPromptConfig,
      ...config,
    };
  }

  /**
   * Calculate max tokens based on intent classification.
   * Strictly caps response length; Chinese text runs at roughly 1.5 tokens per character.
   */
  private calculateMaxTokens(intent: IntentResult): number {
    // ~1.5 tokens per Chinese character; use 1.8 to leave some headroom
    const tokensPerChar = 1.8;
    const baseTokens = Math.round(intent.maxResponseLength * tokensPerChar);

    // Adjust by intent type and enforce strict upper bounds
    switch (intent.type) {
      case IntentType.CHAT:
        return Math.min(200, baseTokens); // Small talk: hard cap at 200 tokens
      case IntentType.SIMPLE_QUERY:
        return Math.min(600, baseTokens); // Simple query: cap at 600 tokens (~300 Chinese characters)
      case IntentType.CLARIFICATION:
        return Math.min(300, baseTokens); // Clarification: cap at 300 tokens
      case IntentType.CONFIRMATION:
        return Math.min(400, baseTokens); // Confirmation: cap at 400 tokens
      case IntentType.DEEP_CONSULTATION:
        return Math.min(1600, Math.max(800, baseTokens)); // Deep consultation: 800-1600 tokens
      case IntentType.ACTION_NEEDED:
        return Math.min(1000, Math.max(500, baseTokens)); // Action needed: 500-1000 tokens
      default:
        return 1024; // Default: 1024 tokens
    }
  }
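
  // Worked example (illustrative numbers only): a SIMPLE_QUERY intent with
  // maxResponseLength = 300 gives baseTokens = Math.round(300 * 1.8) = 540,
  // so max_tokens = Math.min(600, 540) = 540.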

  /**
   * Fetch and format approved system experiences for injection
   */
  private async getAccumulatedExperience(query: string): Promise<string> {
    try {
      const experiences = await this.knowledgeClient.searchExperiences({
        query,
        activeOnly: true,
        limit: 5,
      });

      if (experiences.length === 0) {
        return '暂无';
      }

      return experiences
        .map((exp, index) => `${index + 1}. [${exp.experienceType}] ${exp.content}`)
        .join('\n');
    } catch (error) {
      console.error('[ClaudeAgent] Failed to fetch experiences:', error);
      return '暂无';
    }
  }

  /**
   * Build multimodal content blocks for the Claude Vision API
   */
  private async buildMultimodalContent(
    text: string,
    attachments?: FileAttachment[],
  ): Promise<Anthropic.ContentBlockParam[]> {
    const content: Anthropic.ContentBlockParam[] = [];

    // Add image attachments first (Claude processes images before text)
    if (attachments && attachments.length > 0) {
      for (const attachment of attachments) {
        if (attachment.type === 'image' && attachment.downloadUrl) {
          try {
            // Fetch the image and convert it to base64
            const response = await fetch(attachment.downloadUrl);
            if (response.ok) {
              const buffer = await response.arrayBuffer();
              const base64Data = Buffer.from(buffer).toString('base64');

              // Determine media type
              const mediaType = attachment.mimeType as
                | 'image/jpeg'
                | 'image/png'
                | 'image/gif'
                | 'image/webp';

              content.push({
                type: 'image',
                source: {
                  type: 'base64',
                  media_type: mediaType,
                  data: base64Data,
                },
              });
            }
          } catch (error) {
            console.error(`Failed to fetch image ${attachment.originalName}:`, error);
          }
        } else if (attachment.type === 'document') {
          // For documents, add a text reference
          content.push({
            type: 'text',
            text: `[Attached document: ${attachment.originalName}]`,
          });
        }
      }
    }

    // Add the text message
    if (text) {
      content.push({
        type: 'text',
        text,
      });
    }

    return content;
  }
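
  // Illustrative result shape (placeholder values): a user turn with one PNG attachment and a
  // caption produces content blocks like
  //   [
  //     { type: 'image', source: { type: 'base64', media_type: 'image/png', data: '<base64>' } },
  //     { type: 'text', text: 'Please check this visa stamp.' },
  //   ]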

  /**
   * Send a message and get a streaming response with tool-loop support.
   * Uses Prompt Caching to reduce costs (~90% savings on the cached system prompt).
   * Supports multimodal messages with image attachments.
   * Implements a 3-layer architecture: Intent Classification -> ReAct Agent -> Response Gate.
   */
  async *sendMessage(
    message: string,
    context: ConversationContext,
    attachments?: FileAttachment[],
  ): AsyncGenerator<StreamChunk> {
    // ========== Layer 1: intent classification ==========
    const conversationHistory =
      context.previousMessages?.map(msg => ({
        role: msg.role,
        content: msg.content,
      })) || [];

    const intent = intentClassifier.classify(message, conversationHistory);
    console.log(
      `[ClaudeAgent] Intent classified: ${intent.type}, maxLength: ${intent.maxResponseLength}, needsTools: ${intent.needsTools}`,
    );

    // ========== Layer 2: ReAct agent ==========
    const tools = this.immigrationToolsService.getTools();

    // Fetch relevant system experiences and inject them into the prompt
    const accumulatedExperience = await this.getAccumulatedExperience(message);
    const dynamicConfig: SystemPromptConfig = {
      ...this.systemPromptConfig,
      accumulatedExperience,
      intentHint: intent, // Inject the intent classification result
    };
    const systemPrompt = buildSystemPrompt(dynamicConfig);

    // Build the messages array
    const messages: Anthropic.MessageParam[] = [];

    // Add previous messages if any (with multimodal support)
    if (context.previousMessages) {
      for (const msg of context.previousMessages) {
        if (msg.attachments && msg.attachments.length > 0 && msg.role === 'user') {
          // Build multimodal content for messages with attachments
          const multimodalContent = await this.buildMultimodalContent(msg.content, msg.attachments);
          messages.push({
            role: msg.role,
            content: multimodalContent,
          });
        } else {
          messages.push({
            role: msg.role,
            content: msg.content,
          });
        }
      }
    }

    // Add the current message (with multimodal support)
    if (attachments && attachments.length > 0) {
      const multimodalContent = await this.buildMultimodalContent(message, attachments);
      messages.push({
        role: 'user',
        content: multimodalContent,
      });
    } else {
      messages.push({
        role: 'user',
        content: message,
      });
    }

    // Tool loop - continue until we get a final response (no tool use)
    const maxIterations = 10; // Safety limit
    let iterations = 0;

    // Adjust max_tokens according to the intent classification
    const maxTokens = this.calculateMaxTokens(intent);

    // System prompt with cache_control for Prompt Caching.
    // Cache TTL is 5 minutes; cache hits cost only 10% of the normal input price.
    const systemWithCache: Anthropic.TextBlockParam[] = [
      {
        type: 'text',
        text: systemPrompt,
        cache_control: { type: 'ephemeral' },
      },
    ];

    // Collect the full response so it can be run through the response gate
    let fullResponseText = '';

    // Token usage accumulators
    const startTime = Date.now();
    let totalInputTokens = 0;
    let totalOutputTokens = 0;
    let totalCacheCreationTokens = 0;
    let totalCacheReadTokens = 0;
    let toolCallCount = 0;

    while (iterations < maxIterations) {
      iterations++;

      try {
        // Create a streaming message with the cached system prompt
        const stream = await this.client.messages.stream({
          model: 'claude-sonnet-4-20250514',
          max_tokens: maxTokens,
          system: systemWithCache,
          messages,
          tools: tools as Anthropic.Tool[],
        });

        let currentToolUse: {
          id: string;
          name: string;
          inputJson: string;
          input: Record<string, unknown>;
        } | null = null;

        // Collect all tool uses and text blocks in this response
        const toolUses: Array<{ id: string; name: string; input: Record<string, unknown> }> = [];
        const assistantContent: Anthropic.ContentBlockParam[] = [];
        let hasText = false;

        for await (const event of stream) {
          if (event.type === 'content_block_start') {
            if (event.content_block.type === 'tool_use') {
              currentToolUse = {
                id: event.content_block.id,
                name: event.content_block.name,
                inputJson: '',
                input: {},
              };
            }
          } else if (event.type === 'content_block_delta') {
            if (event.delta.type === 'text_delta') {
              hasText = true;
              fullResponseText += event.delta.text; // Collect the full response
              yield {
                type: 'text',
                content: event.delta.text,
              };
            } else if (event.delta.type === 'input_json_delta' && currentToolUse) {
              currentToolUse.inputJson += event.delta.partial_json || '';
            }
          } else if (event.type === 'content_block_stop') {
            if (currentToolUse) {
              // Parse the complete accumulated JSON
              try {
                currentToolUse.input = JSON.parse(currentToolUse.inputJson || '{}');
              } catch (e) {
                console.error('Failed to parse tool input JSON:', currentToolUse.inputJson, e);
                currentToolUse.input = {};
              }
              toolUses.push({
                id: currentToolUse.id,
                name: currentToolUse.name,
                input: currentToolUse.input,
              });
              yield {
                type: 'tool_use',
                toolName: currentToolUse.name,
                toolInput: currentToolUse.input,
              };
              currentToolUse = null;
            }
          }
        }

        // Get the final message to extract usage information
        const finalMsg = await stream.finalMessage();

        // Accumulate token usage
        if (finalMsg.usage) {
          totalInputTokens += finalMsg.usage.input_tokens || 0;
          totalOutputTokens += finalMsg.usage.output_tokens || 0;
          // Prompt Caching tokens (if the API returns them)
          const usage = finalMsg.usage as unknown as Record<string, number>;
          totalCacheCreationTokens += usage.cache_creation_input_tokens || 0;
          totalCacheReadTokens += usage.cache_read_input_tokens || 0;
        }

        // If there are no tool uses, we're done
        if (toolUses.length === 0) {
          // ========== Layer 3: response quality gate (logging only) ==========
          if (fullResponseText) {
            const gateResult = responseGate.check(fullResponseText, intent, message);
            console.log(
              `[ClaudeAgent] Response gate: passed=${gateResult.passed}, length=${fullResponseText.length}/${intent.maxResponseLength}`,
            );
            if (!gateResult.passed && gateResult.suggestions) {
              console.log(`[ClaudeAgent] Gate suggestions: ${gateResult.suggestions.join(', ')}`);
            }
          }

          // ========== Record token usage ==========
          const latencyMs = Date.now() - startTime;
          this.tokenUsageService
            .recordUsage({
              userId: context.userId,
              conversationId: context.conversationId,
              model: 'claude-sonnet-4-20250514',
              inputTokens: totalInputTokens,
              outputTokens: totalOutputTokens,
              cacheCreationTokens: totalCacheCreationTokens,
              cacheReadTokens: totalCacheReadTokens,
              intentType: intent.type,
              toolCalls: toolCallCount,
              responseLength: fullResponseText.length,
              latencyMs,
            })
            .catch(err => console.error('[ClaudeAgent] Failed to record token usage:', err));

          yield { type: 'end' };
          return;
        }

        // Accumulate the tool call count
        toolCallCount += toolUses.length;

        // Build assistant message content with tool uses
        for (const block of finalMsg.content) {
          if (block.type === 'text') {
            assistantContent.push({ type: 'text', text: block.text });
          } else if (block.type === 'tool_use') {
            assistantContent.push({
              type: 'tool_use',
              id: block.id,
              name: block.name,
              input: block.input as Record<string, unknown>,
            });
          }
        }

        // Add the assistant message with tool uses
        messages.push({
          role: 'assistant',
          content: assistantContent,
        });

        // Execute all tools and collect results
        const toolResults: Anthropic.ToolResultBlockParam[] = [];
        for (const toolUse of toolUses) {
          const result = await this.immigrationToolsService.executeTool(
            toolUse.name,
            toolUse.input,
            context,
          );

          yield {
            type: 'tool_result',
            toolName: toolUse.name,
            toolResult: result,
          };

          toolResults.push({
            type: 'tool_result',
            tool_use_id: toolUse.id,
            content: JSON.stringify(result),
          });
        }

        // Add a user message with the tool results
        messages.push({
          role: 'user',
          content: toolResults,
        });

        // Continue the loop to get Claude's response after tool execution
      } catch (error) {
        console.error('Claude API error:', error);
        throw error;
      }
    }

    console.error('Tool loop exceeded maximum iterations');
    yield { type: 'end' };
  }
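
  // Illustrative chunk sequence for a turn that calls one tool (tool name and values are
  // placeholders, listed in the order the generator yields them):
  //   { type: 'tool_use', toolName: 'someTool', toolInput: { ... } }
  //   { type: 'tool_result', toolName: 'someTool', toolResult: { ... } }
  //   { type: 'text', content: '...' }   // one chunk per streamed text delta
  //   { type: 'end' }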

  /**
   * Non-streaming message for simple queries.
   * Uses Prompt Caching for cost optimization.
   */
  async sendMessageSync(
    message: string,
    context: ConversationContext,
  ): Promise<string> {
    const tools = this.immigrationToolsService.getTools();

    // Fetch relevant system experiences and inject them into the prompt
    const accumulatedExperience = await this.getAccumulatedExperience(message);
    const dynamicConfig: SystemPromptConfig = {
      ...this.systemPromptConfig,
      accumulatedExperience,
    };
    const systemPrompt = buildSystemPrompt(dynamicConfig);

    const messages: Anthropic.MessageParam[] = [];

    if (context.previousMessages) {
      for (const msg of context.previousMessages) {
        messages.push({
          role: msg.role,
          content: msg.content,
        });
      }
    }

    messages.push({
      role: 'user',
      content: message,
    });

    // System prompt with cache_control for Prompt Caching
    const systemWithCache: Anthropic.TextBlockParam[] = [
      {
        type: 'text',
        text: systemPrompt,
        cache_control: { type: 'ephemeral' },
      },
    ];

    const response = await this.client.messages.create({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 4096,
      system: systemWithCache,
      messages,
      tools: tools as Anthropic.Tool[],
    });

    // Extract the text response
    let result = '';
    for (const block of response.content) {
      if (block.type === 'text') {
        result += block.text;
      }
    }

    return result;
  }

  /**
   * Analyze content (for the evolution service)
   */
  async analyze(prompt: string): Promise<string> {
    const response = await this.client.messages.create({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 8192,
      messages: [
        {
          role: 'user',
          content: prompt,
        },
      ],
    });

    let result = '';
    for (const block of response.content) {
      if (block.type === 'text') {
        result += block.text;
      }
    }

    return result;
  }
}
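
/*
 * Consumption sketch (an assumption, not part of this service): a hypothetical NestJS SSE
 * controller could bridge the async generator to the client. `ChatController`, the route, and
 * the query handling are illustrative; it assumes `Controller`, `Sse`, `Query`, and
 * `MessageEvent` from '@nestjs/common' plus `Observable`, `from`, and `map` from 'rxjs'
 * (rxjs `from` accepts async iterables).
 *
 *   @Controller('chat')
 *   export class ChatController {
 *     constructor(private readonly agent: ClaudeAgentService) {}
 *
 *     @Sse('stream')
 *     stream(@Query('q') q: string): Observable<MessageEvent> {
 *       const context = { userId: 'demo-user', conversationId: 'demo-conversation' };
 *       return from(this.agent.sendMessage(q, context)).pipe(
 *         map((chunk) => ({ data: chunk } as MessageEvent)),
 *       );
 *     }
 *   }
 */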