fix(conversation): improve token tracking accuracy
- Add 'error' chunk type to StreamChunk for partial token capture - Record partial tokens to token_usage table even on API errors - Capture error chunk tokens in conversation.service.ts - Save partial response and tokens before re-throwing errors - Add token aggregation from token_usage table for accurate stats - Display detailed token info in admin (cache tokens, cost, API calls) - Export TokenDetails type for frontend consumption Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
ae99b78579
commit
7acdf78e0c
|
|
@ -63,4 +63,5 @@ export type {
|
|||
PaginatedConversations,
|
||||
ConversationStatistics,
|
||||
ConversationQueryParams,
|
||||
TokenDetails,
|
||||
} from '../infrastructure/conversations.api';
|
||||
|
|
|
|||
|
|
@ -11,6 +11,14 @@ export interface DeviceInfo {
|
|||
region?: string;
|
||||
}
|
||||
|
||||
export interface TokenDetails {
|
||||
cacheCreationTokens: number;
|
||||
cacheReadTokens: number;
|
||||
totalTokens: number;
|
||||
estimatedCost: number;
|
||||
apiCallCount: number;
|
||||
}
|
||||
|
||||
export interface ConversationDto {
|
||||
id: string;
|
||||
userId: string;
|
||||
|
|
@ -23,6 +31,7 @@ export interface ConversationDto {
|
|||
assistantMessageCount: number;
|
||||
totalInputTokens: number;
|
||||
totalOutputTokens: number;
|
||||
tokenDetails?: TokenDetails;
|
||||
rating: number | null;
|
||||
feedback: string | null;
|
||||
hasConverted: boolean;
|
||||
|
|
|
|||
|
|
@ -419,25 +419,67 @@ export function ConversationsPage() {
|
|||
Token 使用
|
||||
</Space>
|
||||
</Title>
|
||||
<Row gutter={16} className="mb-4">
|
||||
<Col span={12}>
|
||||
<Row gutter={[8, 8]} className="mb-4">
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="输入 Tokens"
|
||||
value={conversationDetail.totalInputTokens}
|
||||
valueStyle={{ fontSize: 18 }}
|
||||
valueStyle={{ fontSize: 16 }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
<Col span={12}>
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="输出 Tokens"
|
||||
value={conversationDetail.totalOutputTokens}
|
||||
valueStyle={{ fontSize: 18 }}
|
||||
valueStyle={{ fontSize: 16 }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="API 调用"
|
||||
value={conversationDetail.tokenDetails?.apiCallCount || 0}
|
||||
suffix="次"
|
||||
valueStyle={{ fontSize: 16 }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
{conversationDetail.tokenDetails && (
|
||||
<>
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="Cache 创建"
|
||||
value={conversationDetail.tokenDetails.cacheCreationTokens}
|
||||
valueStyle={{ fontSize: 14, color: '#faad14' }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="Cache 读取"
|
||||
value={conversationDetail.tokenDetails.cacheReadTokens}
|
||||
valueStyle={{ fontSize: 14, color: '#52c41a' }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
<Col span={8}>
|
||||
<Card size="small">
|
||||
<Statistic
|
||||
title="预估成本"
|
||||
value={conversationDetail.tokenDetails.estimatedCost.toFixed(4)}
|
||||
prefix="$"
|
||||
valueStyle={{ fontSize: 14, color: '#1890ff' }}
|
||||
/>
|
||||
</Card>
|
||||
</Col>
|
||||
</>
|
||||
)}
|
||||
</Row>
|
||||
|
||||
{/* Messages */}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import { Repository } from 'typeorm';
|
|||
import * as jwt from 'jsonwebtoken';
|
||||
import { ConversationORM } from '../../infrastructure/database/postgres/entities/conversation.orm';
|
||||
import { MessageORM } from '../../infrastructure/database/postgres/entities/message.orm';
|
||||
import { TokenUsageORM } from '../../infrastructure/database/postgres/entities/token-usage.orm';
|
||||
|
||||
interface AdminPayload {
|
||||
id: string;
|
||||
|
|
@ -18,6 +19,16 @@ interface AdminPayload {
|
|||
role: string;
|
||||
}
|
||||
|
||||
interface TokenAggregation {
|
||||
totalInputTokens: number;
|
||||
totalOutputTokens: number;
|
||||
totalCacheCreationTokens: number;
|
||||
totalCacheReadTokens: number;
|
||||
totalTokens: number;
|
||||
totalEstimatedCost: number;
|
||||
apiCallCount: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* 管理员对话 API - 供 admin-client 使用
|
||||
* 需要管理员 JWT 认证
|
||||
|
|
@ -29,6 +40,8 @@ export class AdminConversationController {
|
|||
private conversationRepo: Repository<ConversationORM>,
|
||||
@InjectRepository(MessageORM)
|
||||
private messageRepo: Repository<MessageORM>,
|
||||
@InjectRepository(TokenUsageORM)
|
||||
private tokenUsageRepo: Repository<TokenUsageORM>,
|
||||
) {}
|
||||
|
||||
/**
|
||||
|
|
@ -49,6 +62,35 @@ export class AdminConversationController {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 token_usage 表聚合准确的 token 使用数据
|
||||
*/
|
||||
private async aggregateTokenUsage(conversationId: string): Promise<TokenAggregation> {
|
||||
const result = await this.tokenUsageRepo
|
||||
.createQueryBuilder('t')
|
||||
.select([
|
||||
'SUM(t.input_tokens) as "totalInputTokens"',
|
||||
'SUM(t.output_tokens) as "totalOutputTokens"',
|
||||
'SUM(t.cache_creation_tokens) as "totalCacheCreationTokens"',
|
||||
'SUM(t.cache_read_tokens) as "totalCacheReadTokens"',
|
||||
'SUM(t.total_tokens) as "totalTokens"',
|
||||
'SUM(t.estimated_cost) as "totalEstimatedCost"',
|
||||
'COUNT(*) as "apiCallCount"',
|
||||
])
|
||||
.where('t.conversation_id = :conversationId', { conversationId })
|
||||
.getRawOne();
|
||||
|
||||
return {
|
||||
totalInputTokens: parseInt(result?.totalInputTokens || '0'),
|
||||
totalOutputTokens: parseInt(result?.totalOutputTokens || '0'),
|
||||
totalCacheCreationTokens: parseInt(result?.totalCacheCreationTokens || '0'),
|
||||
totalCacheReadTokens: parseInt(result?.totalCacheReadTokens || '0'),
|
||||
totalTokens: parseInt(result?.totalTokens || '0'),
|
||||
totalEstimatedCost: parseFloat(result?.totalEstimatedCost || '0'),
|
||||
apiCallCount: parseInt(result?.apiCallCount || '0'),
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取所有对话列表(分页)
|
||||
*/
|
||||
|
|
@ -189,6 +231,7 @@ export class AdminConversationController {
|
|||
|
||||
/**
|
||||
* 获取单个对话详情
|
||||
* 包含从 token_usage 表聚合的准确 token 数据
|
||||
*/
|
||||
@Get(':id')
|
||||
async getConversation(
|
||||
|
|
@ -208,6 +251,9 @@ export class AdminConversationController {
|
|||
};
|
||||
}
|
||||
|
||||
// 从 token_usage 表获取准确的 token 统计(而不是仅依赖 conversation 实体)
|
||||
const tokenStats = await this.aggregateTokenUsage(id);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
data: {
|
||||
|
|
@ -220,8 +266,17 @@ export class AdminConversationController {
|
|||
messageCount: conversation.messageCount,
|
||||
userMessageCount: conversation.userMessageCount,
|
||||
assistantMessageCount: conversation.assistantMessageCount,
|
||||
totalInputTokens: conversation.totalInputTokens,
|
||||
totalOutputTokens: conversation.totalOutputTokens,
|
||||
// 使用聚合的准确 token 数据
|
||||
totalInputTokens: tokenStats.totalInputTokens,
|
||||
totalOutputTokens: tokenStats.totalOutputTokens,
|
||||
// 额外的 token 详情
|
||||
tokenDetails: {
|
||||
cacheCreationTokens: tokenStats.totalCacheCreationTokens,
|
||||
cacheReadTokens: tokenStats.totalCacheReadTokens,
|
||||
totalTokens: tokenStats.totalTokens,
|
||||
estimatedCost: tokenStats.totalEstimatedCost,
|
||||
apiCallCount: tokenStats.apiCallCount,
|
||||
},
|
||||
rating: conversation.rating,
|
||||
feedback: conversation.feedback,
|
||||
hasConverted: conversation.hasConverted,
|
||||
|
|
|
|||
|
|
@ -160,36 +160,48 @@ export class ConversationService {
|
|||
let updatedState: ConversationContext['consultingState'] | undefined;
|
||||
let inputTokens = 0;
|
||||
let outputTokens = 0;
|
||||
let streamError: Error | null = null;
|
||||
|
||||
// Stream response from Claude (with attachments for multimodal support)
|
||||
for await (const chunk of this.claudeAgentService.sendMessage(
|
||||
params.content,
|
||||
context,
|
||||
params.attachments,
|
||||
)) {
|
||||
if (chunk.type === 'text' && chunk.content) {
|
||||
fullResponse += chunk.content;
|
||||
} else if (chunk.type === 'tool_use') {
|
||||
toolCalls.push({
|
||||
name: chunk.toolName!,
|
||||
input: chunk.toolInput!,
|
||||
result: null,
|
||||
});
|
||||
} else if (chunk.type === 'tool_result') {
|
||||
const lastToolCall = toolCalls[toolCalls.length - 1];
|
||||
if (lastToolCall) {
|
||||
lastToolCall.result = chunk.toolResult;
|
||||
try {
|
||||
for await (const chunk of this.claudeAgentService.sendMessage(
|
||||
params.content,
|
||||
context,
|
||||
params.attachments,
|
||||
)) {
|
||||
if (chunk.type === 'text' && chunk.content) {
|
||||
fullResponse += chunk.content;
|
||||
} else if (chunk.type === 'tool_use') {
|
||||
toolCalls.push({
|
||||
name: chunk.toolName!,
|
||||
input: chunk.toolInput!,
|
||||
result: null,
|
||||
});
|
||||
} else if (chunk.type === 'tool_result') {
|
||||
const lastToolCall = toolCalls[toolCalls.length - 1];
|
||||
if (lastToolCall) {
|
||||
lastToolCall.result = chunk.toolResult;
|
||||
}
|
||||
} else if (chunk.type === 'state_update' && chunk.newState) {
|
||||
// V2: Capture updated consulting state
|
||||
updatedState = chunk.newState;
|
||||
} else if (chunk.type === 'end') {
|
||||
// Capture token usage from end chunk
|
||||
inputTokens = chunk.inputTokens || 0;
|
||||
outputTokens = chunk.outputTokens || 0;
|
||||
} else if (chunk.type === 'error') {
|
||||
// Capture partial token usage from error chunk
|
||||
inputTokens = chunk.inputTokens || 0;
|
||||
outputTokens = chunk.outputTokens || 0;
|
||||
console.warn(`[ConversationService] Stream error, captured partial tokens: in=${inputTokens}, out=${outputTokens}`);
|
||||
}
|
||||
} else if (chunk.type === 'state_update' && chunk.newState) {
|
||||
// V2: Capture updated consulting state
|
||||
updatedState = chunk.newState;
|
||||
} else if (chunk.type === 'end') {
|
||||
// Capture token usage from end chunk
|
||||
inputTokens = chunk.inputTokens || 0;
|
||||
outputTokens = chunk.outputTokens || 0;
|
||||
}
|
||||
|
||||
yield chunk;
|
||||
yield chunk;
|
||||
}
|
||||
} catch (error) {
|
||||
// Capture the error but continue to save partial data
|
||||
streamError = error instanceof Error ? error : new Error(String(error));
|
||||
console.error('[ConversationService] Stream error:', streamError.message);
|
||||
}
|
||||
|
||||
// V2: Save updated consulting state to conversation
|
||||
|
|
@ -199,20 +211,24 @@ export class ConversationService {
|
|||
conversation.updateConsultingState(stateForDb);
|
||||
}
|
||||
|
||||
// Save assistant response
|
||||
const assistantMessage = MessageEntity.create({
|
||||
id: uuidv4(),
|
||||
conversationId: params.conversationId,
|
||||
role: MessageRole.ASSISTANT,
|
||||
type: MessageType.TEXT,
|
||||
content: fullResponse,
|
||||
metadata: toolCalls.length > 0 ? { toolCalls } : undefined,
|
||||
});
|
||||
await this.messageRepo.save(assistantMessage);
|
||||
// Save assistant response (even partial response on error)
|
||||
if (fullResponse) {
|
||||
const assistantMessage = MessageEntity.create({
|
||||
id: uuidv4(),
|
||||
conversationId: params.conversationId,
|
||||
role: MessageRole.ASSISTANT,
|
||||
type: MessageType.TEXT,
|
||||
content: fullResponse,
|
||||
metadata: toolCalls.length > 0 ? { toolCalls } : undefined,
|
||||
});
|
||||
await this.messageRepo.save(assistantMessage);
|
||||
}
|
||||
|
||||
// Update conversation statistics
|
||||
// Update conversation statistics (always update tokens, even on error)
|
||||
conversation.incrementMessageCount('user');
|
||||
conversation.incrementMessageCount('assistant');
|
||||
if (fullResponse) {
|
||||
conversation.incrementMessageCount('assistant');
|
||||
}
|
||||
conversation.addTokens(inputTokens, outputTokens);
|
||||
|
||||
// Update conversation title if first message
|
||||
|
|
@ -224,6 +240,11 @@ export class ConversationService {
|
|||
|
||||
// Save all updates to conversation
|
||||
await this.conversationRepo.update(conversation);
|
||||
|
||||
// Re-throw error after saving partial data
|
||||
if (streamError) {
|
||||
throw streamError;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ export interface ConversationContext {
|
|||
}
|
||||
|
||||
export interface StreamChunk {
|
||||
type: 'text' | 'tool_use' | 'tool_result' | 'end' | 'stage_change' | 'state_update';
|
||||
type: 'text' | 'tool_use' | 'tool_result' | 'end' | 'stage_change' | 'state_update' | 'error';
|
||||
content?: string;
|
||||
toolName?: string;
|
||||
toolInput?: Record<string, unknown>;
|
||||
|
|
@ -66,9 +66,11 @@ export interface StreamChunk {
|
|||
// V2新增
|
||||
stageName?: string;
|
||||
newState?: ConsultingState;
|
||||
// Token usage (returned with 'end' chunk)
|
||||
// Token usage (returned with 'end' or 'error' chunk)
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
// Error info (only for 'error' type)
|
||||
errorMessage?: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
|
|
@ -401,6 +403,32 @@ export class ClaudeAgentServiceV2 implements OnModuleInit {
|
|||
|
||||
} catch (error) {
|
||||
console.error('[ClaudeAgentV2] Claude API error:', error);
|
||||
|
||||
// Record partial token usage even on error (for cost tracking)
|
||||
if (totalInputTokens > 0 || totalOutputTokens > 0) {
|
||||
const latencyMs = Date.now() - startTime;
|
||||
this.tokenUsageService.recordUsage({
|
||||
userId: context.userId,
|
||||
conversationId: context.conversationId,
|
||||
model: 'claude-sonnet-4-20250514',
|
||||
inputTokens: totalInputTokens,
|
||||
outputTokens: totalOutputTokens,
|
||||
cacheCreationTokens: totalCacheCreationTokens,
|
||||
cacheReadTokens: totalCacheReadTokens,
|
||||
intentType: intent?.type || 'UNKNOWN',
|
||||
toolCalls: toolCallCount,
|
||||
responseLength: fullResponseText.length,
|
||||
latencyMs,
|
||||
}).catch(err => console.error('[ClaudeAgentV2] Failed to record partial token usage:', err));
|
||||
}
|
||||
|
||||
// Yield error chunk with partial tokens before throwing
|
||||
yield {
|
||||
type: 'error',
|
||||
inputTokens: totalInputTokens,
|
||||
outputTokens: totalOutputTokens,
|
||||
errorMessage: error instanceof Error ? error.message : 'Unknown error',
|
||||
};
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue