diff --git a/packages/admin-client/src/features/conversations/application/useConversations.ts b/packages/admin-client/src/features/conversations/application/useConversations.ts index 20af5dc..3d7669e 100644 --- a/packages/admin-client/src/features/conversations/application/useConversations.ts +++ b/packages/admin-client/src/features/conversations/application/useConversations.ts @@ -63,4 +63,5 @@ export type { PaginatedConversations, ConversationStatistics, ConversationQueryParams, + TokenDetails, } from '../infrastructure/conversations.api'; diff --git a/packages/admin-client/src/features/conversations/infrastructure/conversations.api.ts b/packages/admin-client/src/features/conversations/infrastructure/conversations.api.ts index d87bf71..269ddb1 100644 --- a/packages/admin-client/src/features/conversations/infrastructure/conversations.api.ts +++ b/packages/admin-client/src/features/conversations/infrastructure/conversations.api.ts @@ -11,6 +11,14 @@ export interface DeviceInfo { region?: string; } +export interface TokenDetails { + cacheCreationTokens: number; + cacheReadTokens: number; + totalTokens: number; + estimatedCost: number; + apiCallCount: number; +} + export interface ConversationDto { id: string; userId: string; @@ -23,6 +31,7 @@ export interface ConversationDto { assistantMessageCount: number; totalInputTokens: number; totalOutputTokens: number; + tokenDetails?: TokenDetails; rating: number | null; feedback: string | null; hasConverted: boolean; diff --git a/packages/admin-client/src/features/conversations/presentation/pages/ConversationsPage.tsx b/packages/admin-client/src/features/conversations/presentation/pages/ConversationsPage.tsx index ec9ae6d..4829850 100644 --- a/packages/admin-client/src/features/conversations/presentation/pages/ConversationsPage.tsx +++ b/packages/admin-client/src/features/conversations/presentation/pages/ConversationsPage.tsx @@ -419,25 +419,67 @@ export function ConversationsPage() { Token 使用 - - + + - + + + + + + + {conversationDetail.tokenDetails && ( + <> + + + + + + + + + + + + + + + + + )} {/* Messages */} diff --git a/packages/services/conversation-service/src/adapters/inbound/admin-conversation.controller.ts b/packages/services/conversation-service/src/adapters/inbound/admin-conversation.controller.ts index 6b1d22f..ec4bb3d 100644 --- a/packages/services/conversation-service/src/adapters/inbound/admin-conversation.controller.ts +++ b/packages/services/conversation-service/src/adapters/inbound/admin-conversation.controller.ts @@ -11,6 +11,7 @@ import { Repository } from 'typeorm'; import * as jwt from 'jsonwebtoken'; import { ConversationORM } from '../../infrastructure/database/postgres/entities/conversation.orm'; import { MessageORM } from '../../infrastructure/database/postgres/entities/message.orm'; +import { TokenUsageORM } from '../../infrastructure/database/postgres/entities/token-usage.orm'; interface AdminPayload { id: string; @@ -18,6 +19,16 @@ interface AdminPayload { role: string; } +interface TokenAggregation { + totalInputTokens: number; + totalOutputTokens: number; + totalCacheCreationTokens: number; + totalCacheReadTokens: number; + totalTokens: number; + totalEstimatedCost: number; + apiCallCount: number; +} + /** * 管理员对话 API - 供 admin-client 使用 * 需要管理员 JWT 认证 @@ -29,6 +40,8 @@ export class AdminConversationController { private conversationRepo: Repository, @InjectRepository(MessageORM) private messageRepo: Repository, + @InjectRepository(TokenUsageORM) + private tokenUsageRepo: Repository, ) {} /** @@ -49,6 +62,35 @@ export class AdminConversationController { } } + /** + * 从 token_usage 表聚合准确的 token 使用数据 + */ + private async aggregateTokenUsage(conversationId: string): Promise { + const result = await this.tokenUsageRepo + .createQueryBuilder('t') + .select([ + 'SUM(t.input_tokens) as "totalInputTokens"', + 'SUM(t.output_tokens) as "totalOutputTokens"', + 'SUM(t.cache_creation_tokens) as "totalCacheCreationTokens"', + 'SUM(t.cache_read_tokens) as "totalCacheReadTokens"', + 'SUM(t.total_tokens) as "totalTokens"', + 'SUM(t.estimated_cost) as "totalEstimatedCost"', + 'COUNT(*) as "apiCallCount"', + ]) + .where('t.conversation_id = :conversationId', { conversationId }) + .getRawOne(); + + return { + totalInputTokens: parseInt(result?.totalInputTokens || '0'), + totalOutputTokens: parseInt(result?.totalOutputTokens || '0'), + totalCacheCreationTokens: parseInt(result?.totalCacheCreationTokens || '0'), + totalCacheReadTokens: parseInt(result?.totalCacheReadTokens || '0'), + totalTokens: parseInt(result?.totalTokens || '0'), + totalEstimatedCost: parseFloat(result?.totalEstimatedCost || '0'), + apiCallCount: parseInt(result?.apiCallCount || '0'), + }; + } + /** * 获取所有对话列表(分页) */ @@ -189,6 +231,7 @@ export class AdminConversationController { /** * 获取单个对话详情 + * 包含从 token_usage 表聚合的准确 token 数据 */ @Get(':id') async getConversation( @@ -208,6 +251,9 @@ export class AdminConversationController { }; } + // 从 token_usage 表获取准确的 token 统计(而不是仅依赖 conversation 实体) + const tokenStats = await this.aggregateTokenUsage(id); + return { success: true, data: { @@ -220,8 +266,17 @@ export class AdminConversationController { messageCount: conversation.messageCount, userMessageCount: conversation.userMessageCount, assistantMessageCount: conversation.assistantMessageCount, - totalInputTokens: conversation.totalInputTokens, - totalOutputTokens: conversation.totalOutputTokens, + // 使用聚合的准确 token 数据 + totalInputTokens: tokenStats.totalInputTokens, + totalOutputTokens: tokenStats.totalOutputTokens, + // 额外的 token 详情 + tokenDetails: { + cacheCreationTokens: tokenStats.totalCacheCreationTokens, + cacheReadTokens: tokenStats.totalCacheReadTokens, + totalTokens: tokenStats.totalTokens, + estimatedCost: tokenStats.totalEstimatedCost, + apiCallCount: tokenStats.apiCallCount, + }, rating: conversation.rating, feedback: conversation.feedback, hasConverted: conversation.hasConverted, diff --git a/packages/services/conversation-service/src/application/services/conversation.service.ts b/packages/services/conversation-service/src/application/services/conversation.service.ts index 13a2bd3..5642a7e 100644 --- a/packages/services/conversation-service/src/application/services/conversation.service.ts +++ b/packages/services/conversation-service/src/application/services/conversation.service.ts @@ -160,36 +160,48 @@ export class ConversationService { let updatedState: ConversationContext['consultingState'] | undefined; let inputTokens = 0; let outputTokens = 0; + let streamError: Error | null = null; // Stream response from Claude (with attachments for multimodal support) - for await (const chunk of this.claudeAgentService.sendMessage( - params.content, - context, - params.attachments, - )) { - if (chunk.type === 'text' && chunk.content) { - fullResponse += chunk.content; - } else if (chunk.type === 'tool_use') { - toolCalls.push({ - name: chunk.toolName!, - input: chunk.toolInput!, - result: null, - }); - } else if (chunk.type === 'tool_result') { - const lastToolCall = toolCalls[toolCalls.length - 1]; - if (lastToolCall) { - lastToolCall.result = chunk.toolResult; + try { + for await (const chunk of this.claudeAgentService.sendMessage( + params.content, + context, + params.attachments, + )) { + if (chunk.type === 'text' && chunk.content) { + fullResponse += chunk.content; + } else if (chunk.type === 'tool_use') { + toolCalls.push({ + name: chunk.toolName!, + input: chunk.toolInput!, + result: null, + }); + } else if (chunk.type === 'tool_result') { + const lastToolCall = toolCalls[toolCalls.length - 1]; + if (lastToolCall) { + lastToolCall.result = chunk.toolResult; + } + } else if (chunk.type === 'state_update' && chunk.newState) { + // V2: Capture updated consulting state + updatedState = chunk.newState; + } else if (chunk.type === 'end') { + // Capture token usage from end chunk + inputTokens = chunk.inputTokens || 0; + outputTokens = chunk.outputTokens || 0; + } else if (chunk.type === 'error') { + // Capture partial token usage from error chunk + inputTokens = chunk.inputTokens || 0; + outputTokens = chunk.outputTokens || 0; + console.warn(`[ConversationService] Stream error, captured partial tokens: in=${inputTokens}, out=${outputTokens}`); } - } else if (chunk.type === 'state_update' && chunk.newState) { - // V2: Capture updated consulting state - updatedState = chunk.newState; - } else if (chunk.type === 'end') { - // Capture token usage from end chunk - inputTokens = chunk.inputTokens || 0; - outputTokens = chunk.outputTokens || 0; - } - yield chunk; + yield chunk; + } + } catch (error) { + // Capture the error but continue to save partial data + streamError = error instanceof Error ? error : new Error(String(error)); + console.error('[ConversationService] Stream error:', streamError.message); } // V2: Save updated consulting state to conversation @@ -199,20 +211,24 @@ export class ConversationService { conversation.updateConsultingState(stateForDb); } - // Save assistant response - const assistantMessage = MessageEntity.create({ - id: uuidv4(), - conversationId: params.conversationId, - role: MessageRole.ASSISTANT, - type: MessageType.TEXT, - content: fullResponse, - metadata: toolCalls.length > 0 ? { toolCalls } : undefined, - }); - await this.messageRepo.save(assistantMessage); + // Save assistant response (even partial response on error) + if (fullResponse) { + const assistantMessage = MessageEntity.create({ + id: uuidv4(), + conversationId: params.conversationId, + role: MessageRole.ASSISTANT, + type: MessageType.TEXT, + content: fullResponse, + metadata: toolCalls.length > 0 ? { toolCalls } : undefined, + }); + await this.messageRepo.save(assistantMessage); + } - // Update conversation statistics + // Update conversation statistics (always update tokens, even on error) conversation.incrementMessageCount('user'); - conversation.incrementMessageCount('assistant'); + if (fullResponse) { + conversation.incrementMessageCount('assistant'); + } conversation.addTokens(inputTokens, outputTokens); // Update conversation title if first message @@ -224,6 +240,11 @@ export class ConversationService { // Save all updates to conversation await this.conversationRepo.update(conversation); + + // Re-throw error after saving partial data + if (streamError) { + throw streamError; + } } /** diff --git a/packages/services/conversation-service/src/infrastructure/claude/claude-agent-v2.service.ts b/packages/services/conversation-service/src/infrastructure/claude/claude-agent-v2.service.ts index 46c13dc..cd30779 100644 --- a/packages/services/conversation-service/src/infrastructure/claude/claude-agent-v2.service.ts +++ b/packages/services/conversation-service/src/infrastructure/claude/claude-agent-v2.service.ts @@ -58,7 +58,7 @@ export interface ConversationContext { } export interface StreamChunk { - type: 'text' | 'tool_use' | 'tool_result' | 'end' | 'stage_change' | 'state_update'; + type: 'text' | 'tool_use' | 'tool_result' | 'end' | 'stage_change' | 'state_update' | 'error'; content?: string; toolName?: string; toolInput?: Record; @@ -66,9 +66,11 @@ export interface StreamChunk { // V2新增 stageName?: string; newState?: ConsultingState; - // Token usage (returned with 'end' chunk) + // Token usage (returned with 'end' or 'error' chunk) inputTokens?: number; outputTokens?: number; + // Error info (only for 'error' type) + errorMessage?: string; } @Injectable() @@ -401,6 +403,32 @@ export class ClaudeAgentServiceV2 implements OnModuleInit { } catch (error) { console.error('[ClaudeAgentV2] Claude API error:', error); + + // Record partial token usage even on error (for cost tracking) + if (totalInputTokens > 0 || totalOutputTokens > 0) { + const latencyMs = Date.now() - startTime; + this.tokenUsageService.recordUsage({ + userId: context.userId, + conversationId: context.conversationId, + model: 'claude-sonnet-4-20250514', + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + cacheCreationTokens: totalCacheCreationTokens, + cacheReadTokens: totalCacheReadTokens, + intentType: intent?.type || 'UNKNOWN', + toolCalls: toolCallCount, + responseLength: fullResponseText.length, + latencyMs, + }).catch(err => console.error('[ClaudeAgentV2] Failed to record partial token usage:', err)); + } + + // Yield error chunk with partial tokens before throwing + yield { + type: 'error', + inputTokens: totalInputTokens, + outputTokens: totalOutputTokens, + errorMessage: error instanceof Error ? error.message : 'Unknown error', + }; throw error; } }