From 849a4a3099f0339a6d0513e750a2c626beac027a Mon Sep 17 00:00:00 2001
From: hailin
Date: Fri, 23 Jan 2026 08:23:58 -0800
Subject: [PATCH] feat(conversation): add token usage tracking for API cost analysis

- Add TokenUsageEntity to store per-request token consumption
- Add TokenUsageService with cost calculation and statistics APIs
  - Record input/output/cache tokens per API call
  - Calculate estimated cost based on Claude pricing
  - Provide user/conversation/global stats aggregation
  - Support daily stats and top users ranking
- Integrate token tracking in ClaudeAgentService
  - Track latency, tool calls, response length
  - Accumulate tokens across tool loop iterations
- Add token_usages table to init-db.sql with proper indexes

This enables:
- Per-user token consumption tracking
- Cost analysis and optimization
- Future billing/quota features

Co-Authored-By: Claude Opus 4.5
---
Review notes (this section is ignored by git am):
- Added lines were edited during review, so the post-image blob ids on the
  "index" lines below no longer match the file contents.
- Context-line indentation was reconstructed; if a hunk is rejected, retry
  with `git apply --ignore-whitespace`.

 .claude/settings.local.json                   |  11 +-
 .../src/domain/entities/token-usage.entity.ts |  76 +++++
 .../claude/claude-agent.service.ts            |  47 ++-
 .../infrastructure/claude/claude.module.ts    |  13 +-
 .../claude/token-usage.service.ts             | 316 ++++++++++++++++++
 scripts/init-db.sql                           |  52 +++
 6 files changed, 508 insertions(+), 7 deletions(-)
 create mode 100644 packages/services/conversation-service/src/domain/entities/token-usage.entity.ts
 create mode 100644 packages/services/conversation-service/src/infrastructure/claude/token-usage.service.ts

diff --git a/.claude/settings.local.json b/.claude/settings.local.json
index f92271b..d2ac671 100644
--- a/.claude/settings.local.json
+++ b/.claude/settings.local.json
@@ -19,7 +19,16 @@
       "Bash(scp:*)",
       "Bash(timeout 20 cat:*)",
       "Bash(npm run build:*)",
-      "Bash(pnpm run build:*)"
+      "Bash(pnpm run build:*)",
+      "Bash(pnpm --filter admin-client build:*)",
+      "Bash(node:*)",
+      "Bash(npm run build:conversation:*)",
+      "Bash(npm run lint)",
+      "Bash(dir:*)",
+      "Bash(cmd /c \"dir %USERPROFILE%\\\\.ssh\")",
+      "Bash(git fetch:*)",
+      "Bash(TEST_USER_ID=\"a1b2c3d4-e5f6-7890-abcd-ef1234567890\":*)",
+      "Bash(git reset:*)"
     ]
   }
 }
diff --git a/packages/services/conversation-service/src/domain/entities/token-usage.entity.ts b/packages/services/conversation-service/src/domain/entities/token-usage.entity.ts
new file mode 100644
index 0000000..1ef0cfb
--- /dev/null
+++ b/packages/services/conversation-service/src/domain/entities/token-usage.entity.ts
@@ -0,0 +1,76 @@
+import {
+  Entity,
+  PrimaryGeneratedColumn,
+  Column,
+  CreateDateColumn,
+  Index,
+} from 'typeorm';
+
+/**
+ * Token usage entity.
+ * Stores the token consumption of each recorded Claude API exchange.
+ */
+@Entity('token_usages')
+@Index('idx_token_usages_user', ['userId'])
+@Index('idx_token_usages_conversation', ['conversationId'])
+@Index('idx_token_usages_created', ['createdAt'])
+@Index('idx_token_usages_model', ['model'])
+export class TokenUsageEntity {
+  @PrimaryGeneratedColumn('uuid')
+  id: string;
+
+  @Column({ name: 'user_id', type: 'uuid', nullable: true })
+  userId: string | null;
+
+  @Column({ name: 'conversation_id', type: 'uuid' })
+  conversationId: string;
+
+  @Column({ name: 'message_id', type: 'uuid', nullable: true })
+  messageId: string | null;
+
+  @Column({ length: 50 })
+  model: string;
+
+  // Input tokens
+  @Column({ name: 'input_tokens', default: 0 })
+  inputTokens: number;
+
+  // Output tokens
+  @Column({ name: 'output_tokens', default: 0 })
+  outputTokens: number;
+
+  // Tokens written to the prompt cache (Prompt Caching)
+  @Column({ name: 'cache_creation_tokens', default: 0 })
+  cacheCreationTokens: number;
+
+  // Tokens read from the prompt cache (Prompt Caching)
+  @Column({ name: 'cache_read_tokens', default: 0 })
+  cacheReadTokens: number;
+
+  // Total tokens (input + output)
+  @Column({ name: 'total_tokens', default: 0 })
+  totalTokens: number;
+
+  // Estimated cost in USD; readers wrap it in Number() since decimals may load as strings
+  @Column({ name: 'estimated_cost', type: 'decimal', precision: 10, scale: 6, default: 0 })
+  estimatedCost: number;
+
+  // Intent type; explicit column type because `string | null` is reflected as Object
+  @Column({ name: 'intent_type', type: 'varchar', length: 30, nullable: true })
+  intentType: string | null;
+
+  // Number of tool calls
+  @Column({ name: 'tool_calls', default: 0 })
+  toolCalls: number;
+
+  // Response length (characters)
+  @Column({ name: 'response_length', default: 0 })
+  responseLength: number;
+
+  // Request latency (milliseconds)
+  @Column({ name: 'latency_ms', default: 0 })
+  latencyMs: number;
+
+  @CreateDateColumn({ name: 'created_at' })
+  createdAt: Date;
+}
diff --git a/packages/services/conversation-service/src/infrastructure/claude/claude-agent.service.ts b/packages/services/conversation-service/src/infrastructure/claude/claude-agent.service.ts
index d6e6cbf..6a23155 100644
--- a/packages/services/conversation-service/src/infrastructure/claude/claude-agent.service.ts
+++ b/packages/services/conversation-service/src/infrastructure/claude/claude-agent.service.ts
@@ -2,6 +2,7 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
 import Anthropic from '@anthropic-ai/sdk';
 import { ImmigrationToolsService } from './tools/immigration-tools.service';
+import { TokenUsageService } from './token-usage.service';
 import { buildSystemPrompt, SystemPromptConfig } from './prompts/system-prompt';
 import { KnowledgeClientService } from '../knowledge/knowledge-client.service';
 import { intentClassifier, IntentResult, IntentType } from './intent-classifier';
@@ -45,6 +46,7 @@ export class ClaudeAgentService implements OnModuleInit {
     private configService: ConfigService,
     private immigrationToolsService: ImmigrationToolsService,
     private knowledgeClient: KnowledgeClientService,
+    private tokenUsageService: TokenUsageService,
   ) {}
 
   onModuleInit() {
@@ -279,6 +281,14 @@ export class ClaudeAgentService implements OnModuleInit {
     // 用于收集完整响应以进行门控检查
     let fullResponseText = '';
 
+    // Token usage accumulated across tool-loop iterations
+    const startTime = Date.now();
+    let totalInputTokens = 0;
+    let totalOutputTokens = 0;
+    let totalCacheCreationTokens = 0;
+    let totalCacheReadTokens = 0;
+    let toolCallCount = 0;
+
     while (iterations < maxIterations) {
       iterations++;
 
@@ -352,6 +362,19 @@ export class ClaudeAgentService implements OnModuleInit {
         }
       }
 
+      // Fetch the final message once; its usage and content are both read below
+      const finalMsg = await stream.finalMessage();
+
+      // Accumulate token usage for this iteration
+      if (finalMsg.usage) {
+        totalInputTokens += finalMsg.usage.input_tokens || 0;
+        totalOutputTokens += finalMsg.usage.output_tokens || 0;
+        // Prompt-caching token counts (only when the API returns them)
+        const usage = finalMsg.usage as Record<string, any>;
+        totalCacheCreationTokens += usage.cache_creation_input_tokens || 0;
+        totalCacheReadTokens += usage.cache_read_input_tokens || 0;
+      }
+
       // If no tool uses, we're done
       if (toolUses.length === 0) {
         // ========== 第三层:回复质量门控(日志记录) ==========
@@ -362,14 +385,32 @@ export class ClaudeAgentService implements OnModuleInit {
             console.log(`[ClaudeAgent] Gate suggestions: ${gateResult.suggestions.join(', ')}`);
           }
         }
+
+        // ========== Record token usage (fire-and-forget; failures are only logged) ==========
+        const latencyMs = Date.now() - startTime;
+        this.tokenUsageService.recordUsage({
+          userId: context.userId,
+          conversationId: context.conversationId,
+          model: finalMsg.model,
+          inputTokens: totalInputTokens,
+          outputTokens: totalOutputTokens,
+          cacheCreationTokens: totalCacheCreationTokens,
+          cacheReadTokens: totalCacheReadTokens,
+          intentType: intent.type,
+          toolCalls: toolCallCount,
+          responseLength: fullResponseText.length,
+          latencyMs,
+        }).catch(err => console.error('[ClaudeAgent] Failed to record token usage:', err));
+
         yield { type: 'end' };
         return;
       }
 
+      // Accumulate the tool-call count
+      toolCallCount += toolUses.length;
+
       // Build assistant message content with tool uses
-      // First get the final message to extract text content
-      const finalMessage = await stream.finalMessage();
-      for (const block of finalMessage.content) {
+      for (const block of finalMsg.content) {
         if (block.type === 'text') {
           assistantContent.push({ type: 'text', text: block.text });
         } else if (block.type === 'tool_use') {
diff --git a/packages/services/conversation-service/src/infrastructure/claude/claude.module.ts b/packages/services/conversation-service/src/infrastructure/claude/claude.module.ts
index 6a0999c..e82661a 100644
--- a/packages/services/conversation-service/src/infrastructure/claude/claude.module.ts
+++ b/packages/services/conversation-service/src/infrastructure/claude/claude.module.ts
@@ -1,13 +1,20 @@
 import { Module, Global } from '@nestjs/common';
 import { ConfigModule } from '@nestjs/config';
+import { TypeOrmModule } from '@nestjs/typeorm';
 import { ClaudeAgentService } from './claude-agent.service';
 import { ImmigrationToolsService } from './tools/immigration-tools.service';
+import { TokenUsageService } from './token-usage.service';
+import { TokenUsageEntity } from '../../domain/entities/token-usage.entity';
 import { KnowledgeModule } from '../knowledge/knowledge.module';
 
 @Global()
 @Module({
-  imports: [ConfigModule, KnowledgeModule],
-  providers: [ClaudeAgentService, ImmigrationToolsService],
-  exports: [ClaudeAgentService, ImmigrationToolsService],
+  imports: [
+    ConfigModule,
+    KnowledgeModule,
+    TypeOrmModule.forFeature([TokenUsageEntity]),
+  ],
+  providers: [ClaudeAgentService, ImmigrationToolsService, TokenUsageService],
+  exports: [ClaudeAgentService, ImmigrationToolsService, TokenUsageService],
 })
 export class ClaudeModule {}
diff --git a/packages/services/conversation-service/src/infrastructure/claude/token-usage.service.ts b/packages/services/conversation-service/src/infrastructure/claude/token-usage.service.ts
new file mode 100644
index 0000000..bb0fe4d
--- /dev/null
+++ b/packages/services/conversation-service/src/infrastructure/claude/token-usage.service.ts
@@ -0,0 +1,316 @@
+import { Injectable } from '@nestjs/common';
+import { InjectRepository } from '@nestjs/typeorm';
+import { Repository, Between, MoreThanOrEqual } from 'typeorm';
+import { TokenUsageEntity } from '../../domain/entities/token-usage.entity';
+
+/** USD price per single token for each billing category of one model. */
+interface ModelPricing {
+  input: number;
+  output: number;
+  cacheWrite: number;
+  cacheRead: number;
+}
+
+/**
+ * Claude API pricing for claude-sonnet-4-20250514, converted to USD per token:
+ * - Input: $3/MTok
+ * - Output: $15/MTok
+ * - Cache write: $3.75/MTok
+ * - Cache read: $0.30/MTok
+ * NOTE(review): rates are hard-coded; confirm them against the current
+ * Anthropic price list before relying on the estimates for billing.
+ */
+const DEFAULT_PRICING: ModelPricing = {
+  input: 3 / 1_000_000,
+  output: 15 / 1_000_000,
+  cacheWrite: 3.75 / 1_000_000,
+  cacheRead: 0.30 / 1_000_000,
+};
+
+/** Per-model price table; models missing here fall back to DEFAULT_PRICING. */
+const PRICING: Record<string, ModelPricing> = {
+  'claude-sonnet-4-20250514': DEFAULT_PRICING,
+};
+
+/** Token counts and request metadata for one recorded Claude API exchange. */
+export interface TokenUsageInput {
+  userId?: string;
+  conversationId: string;
+  messageId?: string;
+  model: string;
+  inputTokens: number;
+  outputTokens: number;
+  cacheCreationTokens?: number;
+  cacheReadTokens?: number;
+  intentType?: string;
+  toolCalls?: number;
+  responseLength?: number;
+  latencyMs?: number;
+}
+
+/** Aggregate produced by calculateStats over a set of usage rows. */
+export interface UsageStats {
+  totalRequests: number;
+  totalInputTokens: number;
+  totalOutputTokens: number;
+  totalCacheReadTokens: number;
+  totalCacheCreationTokens: number;
+  totalTokens: number;
+  totalCost: number;
+  avgInputTokens: number;
+  avgOutputTokens: number;
+  avgLatencyMs: number;
+  /** Percentage (0-100) of prompt tokens served from the prompt cache. */
+  cacheHitRate: number;
+}
+
+@Injectable()
+export class TokenUsageService {
+  constructor(
+    @InjectRepository(TokenUsageEntity)
+    private tokenUsageRepository: Repository<TokenUsageEntity>,
+  ) {}
+
+  /**
+   * Estimate the USD cost of one request from its token counts.
+   * Unknown models are priced with DEFAULT_PRICING.
+   */
+  private calculateCost(
+    model: string,
+    inputTokens: number,
+    outputTokens: number,
+    cacheCreationTokens: number,
+    cacheReadTokens: number,
+  ): number {
+    const pricing = PRICING[model] ?? DEFAULT_PRICING;
+
+    // The Anthropic usage object reports cache reads and cache writes
+    // separately from input_tokens, so inputTokens is billed in full.
+    // Subtracting cacheReadTokens here would under-bill and turn negative
+    // whenever most of the prompt is served from the cache.
+    return (
+      inputTokens * pricing.input +
+      outputTokens * pricing.output +
+      cacheCreationTokens * pricing.cacheWrite +
+      cacheReadTokens * pricing.cacheRead
+    );
+  }
+
+  /**
+   * Persist the token usage of one API exchange and return the saved row.
+   * totalTokens is inputTokens + outputTokens and excludes cache tokens.
+   */
+  async recordUsage(input: TokenUsageInput): Promise<TokenUsageEntity> {
+    const cacheCreationTokens = input.cacheCreationTokens || 0;
+    const cacheReadTokens = input.cacheReadTokens || 0;
+    const totalTokens = input.inputTokens + input.outputTokens;
+
+    const estimatedCost = this.calculateCost(
+      input.model,
+      input.inputTokens,
+      input.outputTokens,
+      cacheCreationTokens,
+      cacheReadTokens,
+    );
+
+    const entity = this.tokenUsageRepository.create({
+      userId: input.userId || null,
+      conversationId: input.conversationId,
+      messageId: input.messageId || null,
+      model: input.model,
+      inputTokens: input.inputTokens,
+      outputTokens: input.outputTokens,
+      cacheCreationTokens,
+      cacheReadTokens,
+      totalTokens,
+      estimatedCost,
+      intentType: input.intentType || null,
+      toolCalls: input.toolCalls || 0,
+      responseLength: input.responseLength || 0,
+      latencyMs: input.latencyMs || 0,
+    });
+
+    const saved = await this.tokenUsageRepository.save(entity);
+
+    console.log(
+      `[TokenUsage] Recorded: in=${input.inputTokens}, out=${input.outputTokens}, ` +
+      `cache_read=${cacheReadTokens}, cost=$${estimatedCost.toFixed(6)}, ` +
+      `intent=${input.intentType}, latency=${input.latencyMs}ms`
+    );
+
+    return saved;
+  }
+
+  /**
+   * Aggregate one user's usage over the last `days` days.
+   */
+  async getUserStats(userId: string, days: number = 30): Promise<UsageStats> {
+    const since = new Date();
+    since.setDate(since.getDate() - days);
+
+    const records = await this.tokenUsageRepository.find({
+      where: {
+        userId,
+        createdAt: MoreThanOrEqual(since),
+      },
+    });
+
+    return this.calculateStats(records);
+  }
+
+  /**
+   * Aggregate every recorded request of one conversation.
+   */
+  async getConversationStats(conversationId: string): Promise<UsageStats> {
+    const records = await this.tokenUsageRepository.find({
+      where: { conversationId },
+    });
+
+    return this.calculateStats(records);
+  }
+
+  /**
+   * Aggregate all usage over the last `days` days (admin view), plus the
+   * number of distinct non-null user ids seen in that window.
+   */
+  async getGlobalStats(days: number = 30): Promise<UsageStats & { uniqueUsers: number }> {
+    const since = new Date();
+    since.setDate(since.getDate() - days);
+
+    const records = await this.tokenUsageRepository.find({
+      where: {
+        createdAt: MoreThanOrEqual(since),
+      },
+    });
+
+    const stats = this.calculateStats(records);
+    const uniqueUsers = new Set(records.filter(r => r.userId).map(r => r.userId)).size;
+
+    return { ...stats, uniqueUsers };
+  }
+
+  /**
+   * Per-day request, token and cost totals for the last `days` days, oldest
+   * day first. Days are bucketed by the UTC date of createdAt.
+   */
+  async getDailyStats(days: number = 7): Promise<Array<{
+    date: string;
+    requests: number;
+    totalTokens: number;
+    totalCost: number;
+  }>> {
+    const since = new Date();
+    since.setDate(since.getDate() - days);
+
+    const records = await this.tokenUsageRepository.find({
+      where: {
+        createdAt: MoreThanOrEqual(since),
+      },
+      order: { createdAt: 'ASC' },
+    });
+
+    // Group rows by calendar day (YYYY-MM-DD, UTC).
+    const byDate = new Map<string, TokenUsageEntity[]>();
+    for (const record of records) {
+      const date = record.createdAt.toISOString().split('T')[0];
+      if (!byDate.has(date)) {
+        byDate.set(date, []);
+      }
+      byDate.get(date)!.push(record);
+    }
+
+    return Array.from(byDate.entries()).map(([date, dayRecords]) => ({
+      date,
+      requests: dayRecords.length,
+      totalTokens: dayRecords.reduce((sum, r) => sum + r.totalTokens, 0),
+      totalCost: dayRecords.reduce((sum, r) => sum + Number(r.estimatedCost), 0),
+    }));
+  }
+
+  /**
+   * Rank users by total tokens consumed over the last `days` days.
+   * Rows without a user id are excluded.
+   */
+  async getTopUsers(days: number = 30, limit: number = 10): Promise<Array<{
+    userId: string;
+    totalTokens: number;
+    totalCost: number;
+    requestCount: number;
+  }>> {
+    const since = new Date();
+    since.setDate(since.getDate() - days);
+
+    const result = await this.tokenUsageRepository
+      .createQueryBuilder('usage')
+      .select('usage.user_id', 'userId')
+      .addSelect('SUM(usage.total_tokens)', 'totalTokens')
+      .addSelect('SUM(usage.estimated_cost)', 'totalCost')
+      .addSelect('COUNT(*)', 'requestCount')
+      .where('usage.created_at >= :since', { since })
+      .andWhere('usage.user_id IS NOT NULL')
+      .groupBy('usage.user_id')
+      .orderBy('SUM(usage.total_tokens)', 'DESC')
+      .limit(limit)
+      .getRawMany();
+
+    // Raw aggregates are parsed explicitly; unparsable values become 0.
+    return result.map(r => ({
+      userId: r.userId,
+      totalTokens: parseInt(r.totalTokens, 10) || 0,
+      totalCost: parseFloat(r.totalCost) || 0,
+      requestCount: parseInt(r.requestCount, 10) || 0,
+    }));
+  }
+
+  /**
+   * Fold usage rows into totals, per-request averages and cache hit rate.
+   * An empty list yields an all-zero result.
+   */
+  private calculateStats(records: TokenUsageEntity[]): UsageStats {
+    if (records.length === 0) {
+      return {
+        totalRequests: 0,
+        totalInputTokens: 0,
+        totalOutputTokens: 0,
+        totalCacheReadTokens: 0,
+        totalCacheCreationTokens: 0,
+        totalTokens: 0,
+        totalCost: 0,
+        avgInputTokens: 0,
+        avgOutputTokens: 0,
+        avgLatencyMs: 0,
+        cacheHitRate: 0,
+      };
+    }
+
+    const totalInputTokens = records.reduce((sum, r) => sum + r.inputTokens, 0);
+    const totalOutputTokens = records.reduce((sum, r) => sum + r.outputTokens, 0);
+    const totalCacheReadTokens = records.reduce((sum, r) => sum + r.cacheReadTokens, 0);
+    const totalCacheCreationTokens = records.reduce((sum, r) => sum + r.cacheCreationTokens, 0);
+    const totalTokens = records.reduce((sum, r) => sum + r.totalTokens, 0);
+    const totalCost = records.reduce((sum, r) => sum + Number(r.estimatedCost), 0);
+    const totalLatency = records.reduce((sum, r) => sum + r.latencyMs, 0);
+
+    // Cache hit rate = cache-read tokens / all prompt tokens. The prompt
+    // total is input + cache-read + cache-creation because the three counts
+    // are reported separately; this keeps the rate within 0-100.
+    const promptTokens = totalInputTokens + totalCacheReadTokens + totalCacheCreationTokens;
+    const cacheHitRate = promptTokens > 0
+      ? (totalCacheReadTokens / promptTokens) * 100
+      : 0;
+
+    return {
+      totalRequests: records.length,
+      totalInputTokens,
+      totalOutputTokens,
+      totalCacheReadTokens,
+      totalCacheCreationTokens,
+      totalTokens,
+      totalCost,
+      avgInputTokens: Math.round(totalInputTokens / records.length),
+      avgOutputTokens: Math.round(totalOutputTokens / records.length),
+      avgLatencyMs: Math.round(totalLatency / records.length),
+      cacheHitRate: Math.round(cacheHitRate * 100) / 100,
+    };
+  }
+}
diff --git a/scripts/init-db.sql b/scripts/init-db.sql
index 682f315..3d429ae 100644
--- a/scripts/init-db.sql
+++ b/scripts/init-db.sql
@@ -1503,6 +1503,58 @@ CREATE INDEX idx_files_type ON files(type);
 CREATE TRIGGER update_files_updated_at BEFORE UPDATE ON files
     FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
 
+-- ===========================================
+-- Token usage table (token_usages)
+-- Records token consumption per AI API call, for cost analysis and business modelling
+-- ===========================================
+CREATE TABLE token_usages (
+    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+    -- Owning user ID (nullable, to support anonymous users)
+    user_id UUID,
+    -- Owning conversation ID
+    conversation_id UUID NOT NULL,
+    -- Related message ID (optional)
+    message_id UUID,
+    -- Model used
+    model VARCHAR(50) NOT NULL,
+    -- Input tokens
+    input_tokens INT NOT NULL DEFAULT 0,
+    -- Output tokens
+    output_tokens INT NOT NULL DEFAULT 0,
+    -- Tokens written to the prompt cache (Prompt Caching)
+    cache_creation_tokens INT NOT NULL DEFAULT 0,
+    -- Tokens read from the prompt cache (Prompt Caching)
+    cache_read_tokens INT NOT NULL DEFAULT 0,
+    -- Total tokens (input + output)
+    total_tokens INT NOT NULL DEFAULT 0,
+    -- Estimated cost (USD)
+    estimated_cost DECIMAL(10, 6) NOT NULL DEFAULT 0,
+    -- Intent type
+    intent_type VARCHAR(30),
+    -- Number of tool calls
+    tool_calls INT NOT NULL DEFAULT 0,
+    -- Response length (characters)
+    response_length INT NOT NULL DEFAULT 0,
+    -- Request latency (milliseconds)
+    latency_ms INT NOT NULL DEFAULT 0,
+    -- Creation time
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+COMMENT ON TABLE token_usages IS 'Token使用统计表 - 记录每次AI API调用的token消耗';
+COMMENT ON COLUMN token_usages.cache_read_tokens IS 'Prompt Caching命中的tokens,成本仅为正常输入的10%';
+COMMENT ON COLUMN token_usages.estimated_cost IS '根据当前定价估算的成本(美元)';
+COMMENT ON COLUMN token_usages.intent_type IS '意图分类类型: SIMPLE_QUERY, DEEP_CONSULTATION, ACTION_NEEDED, CHAT, CLARIFICATION, CONFIRMATION';
+COMMENT ON COLUMN token_usages.latency_ms IS 'API请求的端到端延迟,单位毫秒';
+
+CREATE INDEX idx_token_usages_user ON token_usages(user_id);
+CREATE INDEX idx_token_usages_conversation ON token_usages(conversation_id);
+CREATE INDEX idx_token_usages_created ON token_usages(created_at DESC);
+CREATE INDEX idx_token_usages_model ON token_usages(model);
+CREATE INDEX idx_token_usages_intent ON token_usages(intent_type);
+-- BRIN index to speed up time-range queries
+CREATE INDEX idx_token_usages_created_brin ON token_usages USING BRIN(created_at);
+
 -- ===========================================
 -- 结束
 -- ===========================================