feat(conversation): add token usage tracking for API cost analysis

- Add TokenUsageEntity to store per-request token consumption
- Add TokenUsageService with cost calculation and statistics APIs
  - Record input/output/cache tokens per API call
  - Calculate estimated cost based on Claude pricing
  - Provide user/conversation/global stats aggregation
  - Support daily stats and top users ranking
- Integrate token tracking in ClaudeAgentService
  - Track latency, tool calls, response length
  - Accumulate tokens across tool loop iterations
- Add token_usages table to init-db.sql with proper indexes

This enables:
- Per-user token consumption tracking
- Cost analysis and optimization
- Future billing/quota features

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-01-23 08:23:58 -08:00
parent c768e2aa53
commit 849a4a3099
6 changed files with 488 additions and 7 deletions

View File

@ -19,7 +19,16 @@
"Bash(scp:*)",
"Bash(timeout 20 cat:*)",
"Bash(npm run build:*)",
"Bash(pnpm run build:*)"
"Bash(pnpm run build:*)",
"Bash(pnpm --filter admin-client build:*)",
"Bash(node:*)",
"Bash(npm run build:conversation:*)",
"Bash(npm run lint)",
"Bash(dir:*)",
"Bash(cmd /c \"dir %USERPROFILE%\\\\.ssh\")",
"Bash(git fetch:*)",
"Bash(TEST_USER_ID=\"a1b2c3d4-e5f6-7890-abcd-ef1234567890\":*)",
"Bash(git reset:*)"
]
}
}

View File

@ -0,0 +1,76 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
Index,
} from 'typeorm';
/**
* Token 使
* Claude API token
*/
@Entity('token_usages')
@Index('idx_token_usages_user', ['userId'])
@Index('idx_token_usages_conversation', ['conversationId'])
@Index('idx_token_usages_created', ['createdAt'])
@Index('idx_token_usages_model', ['model'])
export class TokenUsageEntity {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ name: 'user_id', type: 'uuid', nullable: true })
userId: string | null;
@Column({ name: 'conversation_id', type: 'uuid' })
conversationId: string;
@Column({ name: 'message_id', type: 'uuid', nullable: true })
messageId: string | null;
@Column({ length: 50 })
model: string;
// 输入 tokens
@Column({ name: 'input_tokens', default: 0 })
inputTokens: number;
// 输出 tokens
@Column({ name: 'output_tokens', default: 0 })
outputTokens: number;
// 缓存创建的 tokens (Prompt Caching)
@Column({ name: 'cache_creation_tokens', default: 0 })
cacheCreationTokens: number;
// 缓存命中的 tokens (Prompt Caching)
@Column({ name: 'cache_read_tokens', default: 0 })
cacheReadTokens: number;
// 总 tokens (input + output)
@Column({ name: 'total_tokens', default: 0 })
totalTokens: number;
// 估算成本 (美元)
@Column({ name: 'estimated_cost', type: 'decimal', precision: 10, scale: 6, default: 0 })
estimatedCost: number;
// 意图类型
@Column({ name: 'intent_type', length: 30, nullable: true })
intentType: string | null;
// 工具调用次数
@Column({ name: 'tool_calls', default: 0 })
toolCalls: number;
// 响应长度(字符数)
@Column({ name: 'response_length', default: 0 })
responseLength: number;
// 请求耗时(毫秒)
@Column({ name: 'latency_ms', default: 0 })
latencyMs: number;
@CreateDateColumn({ name: 'created_at' })
createdAt: Date;
}

View File

@ -2,6 +2,7 @@ import { Injectable, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import Anthropic from '@anthropic-ai/sdk';
import { ImmigrationToolsService } from './tools/immigration-tools.service';
import { TokenUsageService } from './token-usage.service';
import { buildSystemPrompt, SystemPromptConfig } from './prompts/system-prompt';
import { KnowledgeClientService } from '../knowledge/knowledge-client.service';
import { intentClassifier, IntentResult, IntentType } from './intent-classifier';
@ -45,6 +46,7 @@ export class ClaudeAgentService implements OnModuleInit {
private configService: ConfigService,
private immigrationToolsService: ImmigrationToolsService,
private knowledgeClient: KnowledgeClientService,
private tokenUsageService: TokenUsageService,
) {}
onModuleInit() {
@ -279,6 +281,14 @@ export class ClaudeAgentService implements OnModuleInit {
// 用于收集完整响应以进行门控检查
let fullResponseText = '';
// Token 使用量累积
const startTime = Date.now();
let totalInputTokens = 0;
let totalOutputTokens = 0;
let totalCacheCreationTokens = 0;
let totalCacheReadTokens = 0;
let toolCallCount = 0;
while (iterations < maxIterations) {
iterations++;
@ -352,6 +362,19 @@ export class ClaudeAgentService implements OnModuleInit {
}
}
// 获取最终消息以提取 usage 信息
const finalMsg = await stream.finalMessage();
// 累积 token 使用量
if (finalMsg.usage) {
totalInputTokens += finalMsg.usage.input_tokens || 0;
totalOutputTokens += finalMsg.usage.output_tokens || 0;
// Prompt Caching 的 tokens (如果 API 返回)
const usage = finalMsg.usage as Record<string, number>;
totalCacheCreationTokens += usage.cache_creation_input_tokens || 0;
totalCacheReadTokens += usage.cache_read_input_tokens || 0;
}
// If no tool uses, we're done
if (toolUses.length === 0) {
// ========== 第三层:回复质量门控(日志记录) ==========
@ -362,14 +385,32 @@ export class ClaudeAgentService implements OnModuleInit {
console.log(`[ClaudeAgent] Gate suggestions: ${gateResult.suggestions.join(', ')}`);
}
}
// ========== 记录 Token 使用量 ==========
const latencyMs = Date.now() - startTime;
this.tokenUsageService.recordUsage({
userId: context.userId,
conversationId: context.conversationId,
model: 'claude-sonnet-4-20250514',
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
cacheCreationTokens: totalCacheCreationTokens,
cacheReadTokens: totalCacheReadTokens,
intentType: intent.type,
toolCalls: toolCallCount,
responseLength: fullResponseText.length,
latencyMs,
}).catch(err => console.error('[ClaudeAgent] Failed to record token usage:', err));
yield { type: 'end' };
return;
}
// 累积工具调用次数
toolCallCount += toolUses.length;
// Build assistant message content with tool uses
// First get the final message to extract text content
const finalMessage = await stream.finalMessage();
for (const block of finalMessage.content) {
for (const block of finalMsg.content) {
if (block.type === 'text') {
assistantContent.push({ type: 'text', text: block.text });
} else if (block.type === 'tool_use') {

View File

@ -1,13 +1,20 @@
import { Module, Global } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ClaudeAgentService } from './claude-agent.service';
import { ImmigrationToolsService } from './tools/immigration-tools.service';
import { TokenUsageService } from './token-usage.service';
import { TokenUsageEntity } from '../../domain/entities/token-usage.entity';
import { KnowledgeModule } from '../knowledge/knowledge.module';
@Global()
@Module({
imports: [ConfigModule, KnowledgeModule],
providers: [ClaudeAgentService, ImmigrationToolsService],
exports: [ClaudeAgentService, ImmigrationToolsService],
imports: [
ConfigModule,
KnowledgeModule,
TypeOrmModule.forFeature([TokenUsageEntity]),
],
providers: [ClaudeAgentService, ImmigrationToolsService, TokenUsageService],
exports: [ClaudeAgentService, ImmigrationToolsService, TokenUsageService],
})
export class ClaudeModule {}

View File

@ -0,0 +1,296 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, Between, MoreThanOrEqual } from 'typeorm';
import { TokenUsageEntity } from '../../domain/entities/token-usage.entity';
/**
* Claude API ( 2024)
* claude-sonnet-4-20250514:
* - Input: $3/MTok
* - Output: $15/MTok
* - Cache write: $3.75/MTok
* - Cache read: $0.30/MTok
*/
const PRICING = {
'claude-sonnet-4-20250514': {
input: 3 / 1_000_000,
output: 15 / 1_000_000,
cacheWrite: 3.75 / 1_000_000,
cacheRead: 0.30 / 1_000_000,
},
default: {
input: 3 / 1_000_000,
output: 15 / 1_000_000,
cacheWrite: 3.75 / 1_000_000,
cacheRead: 0.30 / 1_000_000,
},
};
export interface TokenUsageInput {
userId?: string;
conversationId: string;
messageId?: string;
model: string;
inputTokens: number;
outputTokens: number;
cacheCreationTokens?: number;
cacheReadTokens?: number;
intentType?: string;
toolCalls?: number;
responseLength?: number;
latencyMs?: number;
}
export interface UsageStats {
totalRequests: number;
totalInputTokens: number;
totalOutputTokens: number;
totalCacheReadTokens: number;
totalCacheCreationTokens: number;
totalTokens: number;
totalCost: number;
avgInputTokens: number;
avgOutputTokens: number;
avgLatencyMs: number;
cacheHitRate: number;
}
@Injectable()
export class TokenUsageService {
constructor(
@InjectRepository(TokenUsageEntity)
private tokenUsageRepository: Repository<TokenUsageEntity>,
) {}
/**
*
*/
private calculateCost(
model: string,
inputTokens: number,
outputTokens: number,
cacheCreationTokens: number,
cacheReadTokens: number,
): number {
const pricing = PRICING[model] || PRICING.default;
// 缓存命中的 tokens 不计入普通输入
const regularInputTokens = inputTokens - cacheReadTokens;
return (
regularInputTokens * pricing.input +
outputTokens * pricing.output +
cacheCreationTokens * pricing.cacheWrite +
cacheReadTokens * pricing.cacheRead
);
}
/**
* API token 使
*/
async recordUsage(input: TokenUsageInput): Promise<TokenUsageEntity> {
const cacheCreationTokens = input.cacheCreationTokens || 0;
const cacheReadTokens = input.cacheReadTokens || 0;
const totalTokens = input.inputTokens + input.outputTokens;
const estimatedCost = this.calculateCost(
input.model,
input.inputTokens,
input.outputTokens,
cacheCreationTokens,
cacheReadTokens,
);
const entity = this.tokenUsageRepository.create({
userId: input.userId || null,
conversationId: input.conversationId,
messageId: input.messageId || null,
model: input.model,
inputTokens: input.inputTokens,
outputTokens: input.outputTokens,
cacheCreationTokens,
cacheReadTokens,
totalTokens,
estimatedCost,
intentType: input.intentType || null,
toolCalls: input.toolCalls || 0,
responseLength: input.responseLength || 0,
latencyMs: input.latencyMs || 0,
});
const saved = await this.tokenUsageRepository.save(entity);
console.log(
`[TokenUsage] Recorded: in=${input.inputTokens}, out=${input.outputTokens}, ` +
`cache_read=${cacheReadTokens}, cost=$${estimatedCost.toFixed(6)}, ` +
`intent=${input.intentType}, latency=${input.latencyMs}ms`
);
return saved;
}
/**
* token 使
*/
async getUserStats(userId: string, days: number = 30): Promise<UsageStats> {
const since = new Date();
since.setDate(since.getDate() - days);
const records = await this.tokenUsageRepository.find({
where: {
userId,
createdAt: MoreThanOrEqual(since),
},
});
return this.calculateStats(records);
}
/**
* token 使
*/
async getConversationStats(conversationId: string): Promise<UsageStats> {
const records = await this.tokenUsageRepository.find({
where: { conversationId },
});
return this.calculateStats(records);
}
/**
* ()
*/
async getGlobalStats(days: number = 30): Promise<UsageStats & { uniqueUsers: number }> {
const since = new Date();
since.setDate(since.getDate() - days);
const records = await this.tokenUsageRepository.find({
where: {
createdAt: MoreThanOrEqual(since),
},
});
const stats = this.calculateStats(records);
const uniqueUsers = new Set(records.filter(r => r.userId).map(r => r.userId)).size;
return { ...stats, uniqueUsers };
}
/**
*
*/
async getDailyStats(days: number = 7): Promise<Array<{
date: string;
requests: number;
totalTokens: number;
totalCost: number;
}>> {
const since = new Date();
since.setDate(since.getDate() - days);
const records = await this.tokenUsageRepository.find({
where: {
createdAt: MoreThanOrEqual(since),
},
order: { createdAt: 'ASC' },
});
// 按日期分组
const byDate = new Map<string, TokenUsageEntity[]>();
for (const record of records) {
const date = record.createdAt.toISOString().split('T')[0];
if (!byDate.has(date)) {
byDate.set(date, []);
}
byDate.get(date)!.push(record);
}
return Array.from(byDate.entries()).map(([date, dayRecords]) => ({
date,
requests: dayRecords.length,
totalTokens: dayRecords.reduce((sum, r) => sum + r.totalTokens, 0),
totalCost: dayRecords.reduce((sum, r) => sum + Number(r.estimatedCost), 0),
}));
}
/**
* ( token )
*/
async getTopUsers(days: number = 30, limit: number = 10): Promise<Array<{
userId: string;
totalTokens: number;
totalCost: number;
requestCount: number;
}>> {
const since = new Date();
since.setDate(since.getDate() - days);
const result = await this.tokenUsageRepository
.createQueryBuilder('usage')
.select('usage.user_id', 'userId')
.addSelect('SUM(usage.total_tokens)', 'totalTokens')
.addSelect('SUM(usage.estimated_cost)', 'totalCost')
.addSelect('COUNT(*)', 'requestCount')
.where('usage.created_at >= :since', { since })
.andWhere('usage.user_id IS NOT NULL')
.groupBy('usage.user_id')
.orderBy('SUM(usage.total_tokens)', 'DESC')
.limit(limit)
.getRawMany();
return result.map(r => ({
userId: r.userId,
totalTokens: parseInt(r.totalTokens) || 0,
totalCost: parseFloat(r.totalCost) || 0,
requestCount: parseInt(r.requestCount) || 0,
}));
}
/**
*
*/
private calculateStats(records: TokenUsageEntity[]): UsageStats {
if (records.length === 0) {
return {
totalRequests: 0,
totalInputTokens: 0,
totalOutputTokens: 0,
totalCacheReadTokens: 0,
totalCacheCreationTokens: 0,
totalTokens: 0,
totalCost: 0,
avgInputTokens: 0,
avgOutputTokens: 0,
avgLatencyMs: 0,
cacheHitRate: 0,
};
}
const totalInputTokens = records.reduce((sum, r) => sum + r.inputTokens, 0);
const totalOutputTokens = records.reduce((sum, r) => sum + r.outputTokens, 0);
const totalCacheReadTokens = records.reduce((sum, r) => sum + r.cacheReadTokens, 0);
const totalCacheCreationTokens = records.reduce((sum, r) => sum + r.cacheCreationTokens, 0);
const totalTokens = records.reduce((sum, r) => sum + r.totalTokens, 0);
const totalCost = records.reduce((sum, r) => sum + Number(r.estimatedCost), 0);
const totalLatency = records.reduce((sum, r) => sum + r.latencyMs, 0);
// 缓存命中率 = 缓存读取的 tokens / 总输入 tokens
const cacheHitRate = totalInputTokens > 0
? (totalCacheReadTokens / totalInputTokens) * 100
: 0;
return {
totalRequests: records.length,
totalInputTokens,
totalOutputTokens,
totalCacheReadTokens,
totalCacheCreationTokens,
totalTokens,
totalCost,
avgInputTokens: Math.round(totalInputTokens / records.length),
avgOutputTokens: Math.round(totalOutputTokens / records.length),
avgLatencyMs: Math.round(totalLatency / records.length),
cacheHitRate: Math.round(cacheHitRate * 100) / 100,
};
}
}

View File

@ -1503,6 +1503,58 @@ CREATE INDEX idx_files_type ON files(type);
CREATE TRIGGER update_files_updated_at BEFORE UPDATE ON files FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-- ===========================================
-- Token使用统计表 (token_usages)
-- 记录每次AI API调用的token消耗用于成本分析和商业模式
-- ===========================================
CREATE TABLE token_usages (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
-- 所属用户ID可为空支持匿名用户
user_id UUID,
-- 所属对话ID
conversation_id UUID NOT NULL,
-- 关联的消息ID可选
message_id UUID,
-- 使用的模型
model VARCHAR(50) NOT NULL,
-- 输入tokens
input_tokens INT NOT NULL DEFAULT 0,
-- 输出tokens
output_tokens INT NOT NULL DEFAULT 0,
-- 缓存创建的tokens (Prompt Caching)
cache_creation_tokens INT NOT NULL DEFAULT 0,
-- 缓存命中的tokens (Prompt Caching)
cache_read_tokens INT NOT NULL DEFAULT 0,
-- 总tokens (input + output)
total_tokens INT NOT NULL DEFAULT 0,
-- 估算成本(美元)
estimated_cost DECIMAL(10, 6) NOT NULL DEFAULT 0,
-- 意图类型
intent_type VARCHAR(30),
-- 工具调用次数
tool_calls INT NOT NULL DEFAULT 0,
-- 响应长度(字符数)
response_length INT NOT NULL DEFAULT 0,
-- 请求耗时(毫秒)
latency_ms INT NOT NULL DEFAULT 0,
-- 创建时间
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
COMMENT ON TABLE token_usages IS 'Token使用统计表 - 记录每次AI API调用的token消耗';
COMMENT ON COLUMN token_usages.cache_read_tokens IS 'Prompt Caching命中的tokens成本仅为正常输入的10%';
COMMENT ON COLUMN token_usages.estimated_cost IS '根据当前定价估算的成本(美元)';
COMMENT ON COLUMN token_usages.intent_type IS '意图分类类型: SIMPLE_QUERY, DEEP_CONSULTATION, ACTION_NEEDED, CHAT, CLARIFICATION, CONFIRMATION';
COMMENT ON COLUMN token_usages.latency_ms IS 'API请求的端到端延迟单位毫秒';
CREATE INDEX idx_token_usages_user ON token_usages(user_id);
CREATE INDEX idx_token_usages_conversation ON token_usages(conversation_id);
CREATE INDEX idx_token_usages_created ON token_usages(created_at DESC);
CREATE INDEX idx_token_usages_model ON token_usages(model);
CREATE INDEX idx_token_usages_intent ON token_usages(intent_type);
-- BRIN索引用于时间范围查询优化
CREATE INDEX idx_token_usages_created_brin ON token_usages USING BRIN(created_at);
-- ===========================================
-- 结束
-- ===========================================