// iconsulting/packages/services/llm-gateway/src/logging/usage-tracker.ts

import { query } from '../db';
import { v4 as uuidv4 } from 'uuid';
import { UsageLogEntry } from '../types';
// ─── Cost estimation (USD per million tokens, approximate) ───
const COST_TABLE: Record<string, { input: number; output: number }> = {
  'claude-sonnet-4-5': { input: 3.0, output: 15.0 },
  'claude-haiku-4-5': { input: 0.8, output: 4.0 },
  'claude-opus-4': { input: 15.0, output: 75.0 },
  'text-embedding-3-small': { input: 0.02, output: 0 },
  'text-embedding-3-large': { input: 0.13, output: 0 },
};
function estimateCost(model: string, inputTokens: number, outputTokens: number): number | null {
  // Find matching cost entry by prefix
  for (const [prefix, costs] of Object.entries(COST_TABLE)) {
    if (model.startsWith(prefix)) {
      return (inputTokens * costs.input + outputTokens * costs.output) / 1_000_000;
    }
  }
  return null;
}
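// Illustrative example: a dated model id such as 'claude-sonnet-4-5-20250929'
// matches the 'claude-sonnet-4-5' prefix, so estimateCost(id, 1000, 500)
// returns (1000 * 3.0 + 500 * 15.0) / 1_000_000 = 0.0105 USD.
// An unknown model returns null rather than a guessed cost.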
// ─── Batch insert queue (async, non-blocking) ───
const pendingLogs: UsageLogEntry[] = [];
let flushTimer: ReturnType<typeof setTimeout> | null = null;
const FLUSH_INTERVAL_MS = 5_000;
const BATCH_SIZE = 50;
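// Flushing has two triggers: recordUsage() flushes as soon as BATCH_SIZE
// entries are queued, and the timer flushes whatever is pending every
// FLUSH_INTERVAL_MS, so a quiet gateway still persists logs within ~5s.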
async function flushLogs(): Promise<void> {
  if (pendingLogs.length === 0) return;
  const batch = pendingLogs.splice(0, BATCH_SIZE);
  try {
    const values: any[] = [];
    const placeholders: string[] = [];
    for (let i = 0; i < batch.length; i++) {
      const entry = batch[i];
      const offset = i * 10;
      placeholders.push(
        `($${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}, $${offset + 5}, $${offset + 6}, $${offset + 7}, $${offset + 8}, $${offset + 9}, $${offset + 10})`,
      );
      values.push(
        uuidv4(),
        entry.apiKeyId,
        entry.model,
        entry.provider,
        entry.inputTokens,
        entry.outputTokens,
        entry.totalTokens,
        entry.costUsd,
        entry.durationMs,
        entry.statusCode,
      );
    }
    await query(
      `INSERT INTO gateway_usage_logs (id, api_key_id, model, provider, input_tokens, output_tokens, total_tokens, cost_usd, duration_ms, status_code)
       VALUES ${placeholders.join(', ')}`,
      values,
    );
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    console.error('[UsageTracker] Failed to flush logs:', message);
    // Re-queue the failed batch at the front, but stop growing the queue
    // past roughly 500 pending entries so an unreachable DB can't exhaust memory.
    if (pendingLogs.length < 500) {
      pendingLogs.unshift(...batch);
    }
  }
}
function scheduleFlush(): void {
  if (flushTimer) return;
  flushTimer = setTimeout(async () => {
    flushTimer = null;
    await flushLogs();
    if (pendingLogs.length > 0) scheduleFlush();
  }, FLUSH_INTERVAL_MS);
}
// ─── Public API ───
export function recordUsage(entry: UsageLogEntry): void {
  // Only estimate when the caller didn't supply a cost (an explicit 0 is kept).
  if (entry.costUsd == null) {
    entry.costUsd = estimateCost(entry.model, entry.inputTokens, entry.outputTokens);
  }
  pendingLogs.push(entry);
  if (pendingLogs.length >= BATCH_SIZE) {
    void flushLogs(); // fire-and-forget; errors are handled inside flushLogs
  } else {
    scheduleFlush();
  }
}
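// Illustrative call with assumed values; costUsd is left null so recordUsage
// derives it from the COST_TABLE prefix match:
//
//   recordUsage({
//     apiKeyId: 'key_123', model: 'claude-haiku-4-5', provider: 'anthropic',
//     inputTokens: 1200, outputTokens: 300, totalTokens: 1500,
//     costUsd: null, durationMs: 840, statusCode: 200,
//   });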
export function recordFromAnthropicResponse(
  apiKeyId: string,
  model: string,
  usage: { input_tokens?: number; output_tokens?: number } | undefined,
  statusCode: number,
  durationMs: number,
): void {
  recordUsage({
    apiKeyId,
    model,
    provider: 'anthropic',
    inputTokens: usage?.input_tokens || 0,
    outputTokens: usage?.output_tokens || 0,
    totalTokens: (usage?.input_tokens || 0) + (usage?.output_tokens || 0),
    costUsd: null,
    durationMs,
    statusCode,
  });
}
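// Illustrative call site (client and variable names are assumptions). The
// Anthropic Messages API reports usage as { input_tokens, output_tokens }:
//
//   const started = Date.now();
//   const res = await anthropic.messages.create({ model, max_tokens: 1024, messages });
//   recordFromAnthropicResponse(apiKeyId, res.model, res.usage, 200, Date.now() - started);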
export function recordFromOpenAIResponse(
  apiKeyId: string,
  model: string,
  usage: { prompt_tokens?: number; completion_tokens?: number; total_tokens?: number } | undefined,
  statusCode: number,
  durationMs: number,
): void {
  recordUsage({
    apiKeyId,
    model,
    provider: 'openai',
    inputTokens: usage?.prompt_tokens || 0,
    // Embedding responses carry no completion tokens, so this stays 0 for them.
    outputTokens: usage?.completion_tokens || 0,
    totalTokens: usage?.total_tokens ?? (usage?.prompt_tokens || 0) + (usage?.completion_tokens || 0),
    costUsd: null,
    durationMs,
    statusCode,
  });
}
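// Illustrative call site (client and variable names are assumptions). Embedding
// responses report usage as { prompt_tokens, total_tokens } with no completion tokens:
//
//   const started = Date.now();
//   const res = await openai.embeddings.create({ model: 'text-embedding-3-small', input: texts });
//   recordFromOpenAIResponse(apiKeyId, res.model, res.usage, 200, Date.now() - started);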
export async function shutdown(): Promise<void> {
  if (flushTimer) clearTimeout(flushTimer);
  // Drain all queued batches, not just the first; stop if a flush fails
  // and re-queues its batch, to avoid spinning on a dead DB.
  while (pendingLogs.length > 0) {
    const before = pendingLogs.length;
    await flushLogs();
    if (pendingLogs.length >= before) break;
  }
}
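// Illustrative wiring in the service entry point (an assumption about where
// shutdown() is called from):
//
//   process.on('SIGTERM', async () => {
//     await shutdown(); // drain buffered usage logs before exit
//     process.exit(0);
//   });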