import { FastifyRequest, FastifyReply } from 'fastify';
import { GatewayConfig } from '../config';
import { ApiKeyRecord, AnthropicRequestBody, FilterResult } from '../types';
import { isModelAllowed } from '../middleware/auth';
import { checkContent } from '../middleware/content-filter';
import { injectSystemPrompt } from '../injection/system-prompt-injector';
import { recordFromAnthropicResponse } from '../logging/usage-tracker';
import { recordAudit } from '../logging/audit-logger';
import { pipeSSEStream, createStreamUsageTracker } from './stream-pipe';
import { sanitizeAnthropicResponse, buildAnthropicStreamTransform } from './response-sanitizer';

/**
 * Normalizes the raw request payload into a mutable JSON object.
 *
 * Accepts a JSON string, a Buffer containing JSON text, or an object that an
 * earlier body parser already decoded. Anything that does not decode to a
 * plain (non-array) object is rejected, because the handler reads and assigns
 * properties on the result.
 *
 * @param raw - Value of `request.body` as delivered by the framework.
 * @returns A shallow copy of the decoded body, or `null` when it is unusable.
 */
function parseRequestBody(raw: unknown): AnthropicRequestBody | null {
  let parsed: unknown = raw;
  if (typeof raw === 'string' || Buffer.isBuffer(raw)) {
    try {
      parsed = JSON.parse(raw.toString());
    } catch {
      return null;
    }
  }
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    return null;
  }
  // Copy so that later writes to `model` / `system` never touch request.body.
  return { ...(parsed as AnthropicRequestBody) };
}

/**
 * Builds the `POST /v1/messages` handler that proxies requests to the
 * configured upstream.
 *
 * The returned handler, in order: parses the body, resolves the effective
 * model (a per-key override wins over the requested model), enforces model
 * and streaming permissions, runs the content filter, injects the system
 * prompt, forwards the request upstream, and relays the response (SSE stream
 * or buffered body) while recording usage and audit entries.
 *
 * @param config - Gateway configuration; `anthropicApiKey` and
 *   `anthropicUpstreamUrl` are read from it.
 * @returns An async Fastify handler. It always answers through `reply`
 *   and resolves with no value.
 */
export function createAnthropicProxy(config: GatewayConfig) {
  return async function handleMessages(request: FastifyRequest, reply: FastifyReply): Promise<void> {
    const startTime = Date.now();
    const apiKeyRecord: ApiKeyRecord = (request as any).apiKeyRecord;
    const clientIp = request.ip || (request.headers['x-forwarded-for'] as string) || 'unknown';

    // 1. Parse request body
    const body = parseRequestBody(request.body);
    if (body === null) {
      reply.status(400).send({
        error: { type: 'invalid_request_error', message: 'Invalid JSON body.' },
      });
      return;
    }

    const requestedModel = body.model || 'unknown';

    // 2. Resolve effective model (override takes priority)
    const effectiveModel = apiKeyRecord.modelOverride || requestedModel;
    const aliasModel = apiKeyRecord.modelOverride
      ? (apiKeyRecord.modelAlias || apiKeyRecord.modelOverride)
      : requestedModel;

    // 3. Check model permission (skip when override is set — admin controls the model)
    if (!apiKeyRecord.modelOverride && !isModelAllowed(apiKeyRecord, requestedModel)) {
      reply.status(403).send({
        error: {
          type: 'permission_error',
          message: `Model "${requestedModel}" is not allowed for this API key.`,
        },
      });
      return;
    }

    // Replace model in body with effective model for upstream
    body.model = effectiveModel;

    // 4. Check streaming permission
    if (body.stream && apiKeyRecord.permissions?.allowStreaming === false) {
      reply.status(403).send({
        error: {
          type: 'permission_error',
          message: 'Streaming is not allowed for this API key.',
        },
      });
      return;
    }

    let filterResult: FilterResult = { blocked: false };

    // Single place that shapes an audit entry for this request. By default
    // `contentFiltered` reflects a non-blocking filter hit ('warn' / 'log');
    // the blocked path passes `true` explicitly.
    const writeAudit = (
      responseStatus: number,
      elapsedMs: number,
      injectionApplied: boolean,
      contentFiltered: boolean = filterResult.action === 'warn' || filterResult.action === 'log',
    ): void => {
      recordAudit({
        apiKeyId: apiKeyRecord.id,
        requestMethod: 'POST',
        requestPath: '/v1/messages',
        requestModel: effectiveModel,
        requestIp: clientIp,
        contentFiltered,
        filterRuleId: filterResult.ruleId || null,
        injectionApplied,
        responseStatus,
        durationMs: elapsedMs,
      });
    };

    // 5. Content filtering
    if (body.messages) {
      filterResult = await checkContent(body.messages);
      if (filterResult.blocked) {
        writeAudit(403, Date.now() - startTime, false, true);
        reply.status(403).send({
          error: {
            type: 'content_policy_violation',
            message: filterResult.reason || 'Content blocked by policy.',
          },
        });
        return;
      }
    }

    // 6. Inject regulatory content into system prompt
    const injection = await injectSystemPrompt(body.system, effectiveModel, apiKeyRecord.id);
    body.system = injection.system;

    // 7. Build upstream request headers
    const upstreamHeaders: Record<string, string> = {
      'Content-Type': 'application/json',
      'anthropic-version': (request.headers['anthropic-version'] as string) || '2023-06-01',
      'X-Api-Key': config.anthropicApiKey,
    };

    // Forward optional headers
    const beta = request.headers['anthropic-beta'];
    if (beta) upstreamHeaders['anthropic-beta'] = beta as string;

    // 8. Forward to upstream
    let upstreamResponse: Response;
    try {
      upstreamResponse = await fetch(`${config.anthropicUpstreamUrl}/v1/messages`, {
        method: 'POST',
        headers: upstreamHeaders,
        body: JSON.stringify(body),
      });
    } catch (err: unknown) {
      writeAudit(502, Date.now() - startTime, injection.applied);
      const detail = err instanceof Error ? err.message : String(err);
      reply.status(502).send({
        error: {
          type: 'upstream_error',
          message: `Failed to connect to upstream API: ${detail}`,
        },
      });
      return;
    }

    // Time until upstream response headers arrived; used by the buffered path.
    const durationMs = Date.now() - startTime;

    // 9. Handle response
    if (body.stream && upstreamResponse.ok && upstreamResponse.body) {
      // Streaming response — pipe SSE directly
      reply.raw.writeHead(upstreamResponse.status, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no',
      });

      const usageTracker = createStreamUsageTracker();

      // Build transform to sanitize provider identity in SSE chunks
      const streamTransform = apiKeyRecord.modelOverride
        ? buildAnthropicStreamTransform(effectiveModel, aliasModel)
        : undefined;

      await pipeSSEStream(upstreamResponse.body, reply.raw, usageTracker.onDataLine, streamTransform);

      // Record usage from stream — log real model for billing
      const streamUsage = usageTracker.getUsage();
      recordFromAnthropicResponse(
        apiKeyRecord.id,
        effectiveModel,
        { input_tokens: streamUsage.inputTokens, output_tokens: streamUsage.outputTokens },
        upstreamResponse.status,
        Date.now() - startTime,
      );

      writeAudit(upstreamResponse.status, Date.now() - startTime, injection.applied);
      return;
    }

    // Non-streaming response (or failed stream request) — buffer and forward
    const responseText = await upstreamResponse.text();
    let outgoingText = responseText;
    let outgoingContentType = upstreamResponse.headers.get('content-type') || 'application/json';

    try {
      const responseJson = JSON.parse(responseText);

      // Extract usage for logging (use real model for billing)
      if (responseJson.usage) {
        recordFromAnthropicResponse(
          apiKeyRecord.id,
          effectiveModel,
          responseJson.usage,
          upstreamResponse.status,
          durationMs,
        );
      }

      // Deep sanitize response — mask model, id, provider-specific fields
      if (apiKeyRecord.modelOverride && responseJson.model) {
        sanitizeAnthropicResponse(responseJson, aliasModel);
        outgoingText = JSON.stringify(responseJson);
        outgoingContentType = 'application/json';
      }
    } catch (err: unknown) {
      // Not JSON — still forward the upstream text untouched. Anything other
      // than a parse failure is unexpected, so surface it in the logs.
      if (!(err instanceof SyntaxError)) {
        request.log.warn({ err }, 'Failed to post-process upstream response; forwarding as-is.');
      }
    }

    writeAudit(upstreamResponse.status, durationMs, injection.applied);

    reply.raw.writeHead(upstreamResponse.status, {
      'Content-Type': outgoingContentType,
    });
    reply.raw.end(outgoingText);
  };
}