// it0/packages/services/agent-service/src/infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine.ts
// (file-viewer metadata: 387 lines, 14 KiB, TypeScript)

/**
* Claude Agent SDK Engine
*
* Third engine implementation using @anthropic-ai/claude-agent-sdk.
* Integrates with the existing EngineRegistry via the AgentEnginePort interface.
*
* Key features:
* - Default subscription auth (operator's Claude Code login, no API key needed)
* - Optional per-tenant API key billing (AES-256-GCM encrypted)
* - L2 approval flow via canUseTool callback with configurable auto-approve timeout
* - SDK session ID persistence for resume support
* - RBAC-based tool whitelist resolution per tenant/role
*
* Billing modes:
* - 'subscription': Uses inherited CLI auth from `claude login` (default)
* - 'api_key': Tenant provides own Anthropic API key, passed via env.ANTHROPIC_API_KEY
*/
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { AgentEnginePort, EngineTaskParams, EngineStreamEvent } from '../../../domain/ports/outbound/agent-engine.port';
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
import { CommandRiskLevel } from '../../../domain/value-objects/command-risk-level.vo';
import { CommandRiskClassifier } from '../../../domain/services/command-risk-classifier';
import { TenantAgentConfigService } from '../../services/tenant-agent-config.service';
import { AllowedToolsResolverService } from '../../../domain/services/allowed-tools-resolver.service';
import { TenantContextService } from '@it0/common';
import { ApprovalGate } from './approval-gate';
/**
* Tracks an active SDK session for cancellation and approval resolution.
* Entries live in the engine's `activeSessions` map, keyed by the engine-level session ID.
*/
interface ActiveSession {
/** Controller handed to the SDK query as `abortController`; `cancelTask` calls `abort()` on it. */
abort: AbortController;
/** Approval gate the `canUseTool` callback waits on; resolved via `resolveApproval` and released via `dispose`. */
gate: ApprovalGate;
/** Session ID reported by the SDK in a message's `session_id` field; passed as `resume` when continuing. */
sdkSessionId?: string;
}
/** Tenant configuration as resolved by `TenantAgentConfigService.findByTenantId`. */
type ResolvedTenantConfig = Awaited<ReturnType<TenantAgentConfigService['findByTenantId']>>;

/** Enqueues an event on a run's merged stream; `null` marks the end of the stream. */
type EventSink = (event: EngineStreamEvent | null) => void;

/** Structural result handed back to the SDK's `canUseTool` hook. */
type ToolPermissionDecision<TInput> =
  | { behavior: 'allow'; updatedInput: TInput }
  | { behavior: 'deny'; message: string; interrupt: boolean };

/**
 * Agent engine backed by `@anthropic-ai/claude-agent-sdk`.
 *
 * Each run (initial task or resumed session) registers an {@link ActiveSession},
 * starts an SDK query, and yields a single merged stream made of mapped SDK
 * messages plus `approval_required` events raised by the `canUseTool` hook.
 *
 * SDK session IDs are retained (bounded, in memory) after a run finishes so that
 * `continueSession` can resume a completed conversation.
 *
 * NOTE(review): `allowedToolsResolver` is injected but never referenced in this
 * class; the tool whitelist passed to the SDK comes from `params.allowedTools`.
 */
@Injectable()
export class ClaudeAgentSdkEngine implements AgentEnginePort {
  /** Upper bound on retained SDK session IDs; the oldest entry is evicted first. */
  private static readonly MAX_RETAINED_SDK_SESSIONS = 1000;

  readonly engineType = AgentEngineType.CLAUDE_AGENT_SDK;
  private readonly logger = new Logger(ClaudeAgentSdkEngine.name);
  /** Runs currently in flight, keyed by engine-level session ID. */
  private readonly activeSessions = new Map<string, ActiveSession>();
  /** Sessions already torn down, so cancel + generator cleanup never release twice. */
  private readonly releasedSessions = new WeakSet<ActiveSession>();
  /** Last SDK session ID seen per engine session; survives the end of a run. */
  private readonly retainedSdkSessionIds = new Map<string, string>();
  private readonly classifier = new CommandRiskClassifier();

  constructor(
    private readonly configService: ConfigService,
    private readonly tenantConfigService: TenantAgentConfigService,
    private readonly allowedToolsResolver: AllowedToolsResolverService,
  ) {}

  /**
   * Starts a new SDK query for `params.prompt` and streams its events.
   *
   * Yields a single `API_KEY_ERROR` event and stops when the tenant is in
   * `api_key` billing mode but the key cannot be decrypted. Failures while
   * loading or starting the SDK surface as `SDK_INIT_ERROR`; failures while
   * streaming surface as `SDK_ERROR`.
   */
  async *executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent> {
    const tenantId = TenantContextService.getTenantId();
    const tenantConfig = await this.tenantConfigService.findByTenantId(tenantId);

    const env = this.buildSdkEnv(tenantConfig);
    if (!env) {
      yield { type: 'error', message: 'Tenant API key not configured or invalid', code: 'API_KEY_ERROR' };
      return;
    }

    // Approval gate uses the tenant-configurable timeout (120 s when unset).
    const session: ActiveSession = {
      abort: new AbortController(),
      gate: new ApprovalGate(tenantConfig?.approvalTimeoutSeconds ?? 120),
    };
    this.activeSessions.set(params.sessionId, session);
    const channel = this.createEventChannel();

    try {
      const { query } = await import('@anthropic-ai/claude-agent-sdk');
      const sdkQuery = query({
        prompt: params.prompt,
        options: {
          systemPrompt: params.systemPrompt || undefined,
          allowedTools: params.allowedTools?.length ? params.allowedTools : undefined,
          maxTurns: params.maxTurns,
          maxBudgetUsd: params.maxBudgetUsd,
          env,
          abortController: session.abort,
          permissionMode: 'default',
          canUseTool: (toolName, toolInput) =>
            this.decideToolPermission(params.sessionId, session.gate, channel.push, toolName, toolInput),
        },
      });
      this.pumpSdkMessages(params.sessionId, session, sdkQuery, channel.push, 'SDK execution error');
      yield* channel.events();
    } catch (err: unknown) {
      yield {
        type: 'error',
        message: `Failed to initialize Agent SDK: ${ClaudeAgentSdkEngine.describeError(err) ?? String(err)}`,
        code: 'SDK_INIT_ERROR',
      };
    } finally {
      // Also runs when the consumer stops iterating early: aborts the SDK query
      // so the background pump does not keep running unobserved.
      this.releaseSession(params.sessionId, session);
    }
  }

  /** Cancels the in-flight run for `sessionId`, if any: releases its gate and aborts the SDK query. */
  async cancelTask(sessionId: string): Promise<void> {
    const session = this.activeSessions.get(sessionId);
    if (session) {
      this.releaseSession(sessionId, session);
    }
  }

  /**
   * Either answers a pending approval or resumes the SDK conversation.
   *
   * - If the live run for `sessionId` is waiting on an approval, `message` is
   *   treated as the verdict (`'approved'` approves, anything else rejects) and
   *   nothing is yielded.
   * - Otherwise the last known SDK session ID is resumed with `message` as the
   *   prompt. Yields `SESSION_NOT_FOUND` when no SDK session ID is known.
   */
  async *continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent> {
    const live = this.activeSessions.get(sessionId);
    if (live?.gate.hasPending()) {
      live.gate.resolveApproval(message === 'approved');
      return;
    }

    // Fall back to the retained ID so a run that already finished can be resumed.
    const sdkSessionId = live?.sdkSessionId ?? this.retainedSdkSessionIds.get(sessionId);
    if (!sdkSessionId) {
      yield {
        type: 'error',
        message: `No SDK session found for session ${sessionId}`,
        code: 'SESSION_NOT_FOUND',
      };
      return;
    }

    const tenantId = TenantContextService.getTenantId();
    const tenantConfig = await this.tenantConfigService.findByTenantId(tenantId);

    const env = this.buildSdkEnv(tenantConfig);
    if (!env) {
      yield { type: 'error', message: 'Tenant API key invalid', code: 'API_KEY_ERROR' };
      return;
    }

    const session: ActiveSession = {
      abort: new AbortController(),
      gate: new ApprovalGate(tenantConfig?.approvalTimeoutSeconds ?? 120),
      sdkSessionId,
    };
    // NOTE(review): if a run is still live under this ID it is replaced in the map
    // (not aborted), matching the previous behavior; it can no longer be cancelled
    // via cancelTask. Confirm whether concurrent resumes should be rejected instead.
    this.activeSessions.set(sessionId, session);
    const channel = this.createEventChannel();

    try {
      const { query } = await import('@anthropic-ai/claude-agent-sdk');
      const sdkQuery = query({
        prompt: message,
        options: {
          resume: sdkSessionId,
          env,
          abortController: session.abort,
          permissionMode: 'default',
          canUseTool: (toolName, toolInput) =>
            this.decideToolPermission(sessionId, session.gate, channel.push, toolName, toolInput),
        },
      });
      this.pumpSdkMessages(sessionId, session, sdkQuery, channel.push, 'SDK resume error');
      yield* channel.events();
    } catch (err: unknown) {
      if (!this.isAbort(session, err)) {
        yield {
          type: 'error',
          message: ClaudeAgentSdkEngine.describeError(err) ?? 'SDK resume error',
          code: 'SDK_ERROR',
        };
      }
    } finally {
      this.releaseSession(sessionId, session);
    }
  }

  /** Reports whether the SDK package can be loaded. */
  async healthCheck(): Promise<boolean> {
    try {
      await import('@anthropic-ai/claude-agent-sdk');
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Resolve the approval for a pending session (called from controller).
   *
   * @returns `false` when the session is unknown or has no pending approval.
   */
  resolveApproval(sessionId: string, approved: boolean): boolean {
    const session = this.activeSessions.get(sessionId);
    if (!session?.gate.hasPending()) return false;
    session.gate.resolveApproval(approved);
    return true;
  }

  /**
   * Get the SDK session ID for persistence in AgentSession.metadata.
   * Still answers after the run has finished (until the retained entry is evicted).
   */
  getSdkSessionId(sessionId: string): string | undefined {
    return this.activeSessions.get(sessionId)?.sdkSessionId ?? this.retainedSdkSessionIds.get(sessionId);
  }

  /**
   * Builds the environment passed to the SDK process.
   *
   * @returns the environment, or `undefined` when the tenant is in `api_key`
   *   billing mode and the key could not be decrypted.
   */
  private buildSdkEnv(tenantConfig: ResolvedTenantConfig): Record<string, string> | undefined {
    const env = { ...process.env } as Record<string, string>;

    const baseURL = this.configService.get<string>('ANTHROPIC_BASE_URL');
    if (baseURL) {
      // NOTE(security): this disables TLS certificate verification for the whole SDK
      // process whenever a custom base URL is configured (intended for proxies with
      // self-signed certificates). Prefer trusting the proxy CA via NODE_EXTRA_CA_CERTS.
      env.NODE_TLS_REJECT_UNAUTHORIZED = '0';
      env.ANTHROPIC_BASE_URL = baseURL;
    }

    if (tenantConfig?.billingMode === 'api_key') {
      // Tenant bills against its own key.
      try {
        env.ANTHROPIC_API_KEY = this.tenantConfigService.decryptApiKey(tenantConfig);
      } catch (err: unknown) {
        this.logger.warn(
          `Failed to decrypt tenant API key: ${ClaudeAgentSdkEngine.describeError(err) ?? 'unknown error'}`,
        );
        return undefined;
      }
    } else {
      // Subscription mode: drop any inherited key so the SDK uses its own login credentials.
      delete env.ANTHROPIC_API_KEY;
    }
    return env;
  }

  /**
   * `canUseTool` policy shared by new and resumed runs.
   *
   * - risk <= LOW_RISK_WRITE: allowed immediately
   * - risk >= FORBIDDEN: denied immediately
   * - anything in between: emits `approval_required` and waits on the gate
   */
  private async decideToolPermission<TInput>(
    sessionId: string,
    gate: ApprovalGate,
    push: EventSink,
    toolName: string,
    toolInput: TInput,
  ): Promise<ToolPermissionDecision<TInput>> {
    const riskLevel = this.classifyToolRisk(toolName, toolInput);

    if (riskLevel <= CommandRiskLevel.LOW_RISK_WRITE) {
      return { behavior: 'allow', updatedInput: toolInput };
    }
    if (riskLevel >= CommandRiskLevel.FORBIDDEN) {
      return { behavior: 'deny', message: 'Command blocked: risk level FORBIDDEN', interrupt: false };
    }

    // The client must be told about the request, otherwise the gate can only time out.
    push({
      type: 'approval_required',
      command: `${toolName}: ${JSON.stringify(toolInput).slice(0, 200)}`,
      riskLevel,
      taskId: sessionId,
    });

    const approved = await gate.waitForApproval(toolName, toolInput as Record<string, unknown>);
    return approved
      ? { behavior: 'allow', updatedInput: toolInput }
      : { behavior: 'deny', message: 'User rejected the command', interrupt: false };
  }

  /**
   * Creates the queue that merges SDK events with approval events.
   * `push` may be called from any callback; `events()` yields until a `null` is dequeued.
   */
  private createEventChannel(): { push: EventSink; events: () => AsyncGenerator<EngineStreamEvent> } {
    const queue: (EngineStreamEvent | null)[] = [];
    let wake: (() => void) | null = null;

    const push: EventSink = (event) => {
      queue.push(event);
      if (wake) {
        const resume = wake;
        wake = null;
        resume();
      }
    };

    async function* events(): AsyncGenerator<EngineStreamEvent> {
      while (true) {
        if (queue.length === 0) {
          // The executor runs synchronously, so `wake` is set before any push can interleave.
          await new Promise<void>((resolve) => {
            wake = resolve;
          });
        }
        while (queue.length > 0) {
          const event = queue.shift();
          if (event === null || event === undefined) return;
          yield event;
        }
      }
    }

    return { push, events };
  }

  /**
   * Consumes the SDK message stream in the background, forwarding mapped events
   * to `push` and always finishing with a `null` end-of-stream marker.
   */
  private pumpSdkMessages(
    sessionId: string,
    session: ActiveSession,
    sdkMessages: AsyncIterable<unknown>,
    push: EventSink,
    fallbackErrorMessage: string,
  ): void {
    void (async () => {
      try {
        for await (const message of sdkMessages) {
          this.captureSdkSessionId(sessionId, session, message);
          for (const event of this.mapSdkMessage(message)) {
            push(event);
          }
        }
      } catch (err: unknown) {
        if (!this.isAbort(session, err)) {
          push({
            type: 'error',
            message: ClaudeAgentSdkEngine.describeError(err) ?? fallbackErrorMessage,
            code: 'SDK_ERROR',
          });
        }
      } finally {
        push(null);
      }
    })();
  }

  /** Records the `session_id` carried by an SDK message on the run and in the retained map. */
  private captureSdkSessionId(sessionId: string, session: ActiveSession, message: unknown): void {
    if (typeof message !== 'object' || message === null || !('session_id' in message)) return;
    const reported = (message as { session_id?: unknown }).session_id;
    if (typeof reported !== 'string' || reported.length === 0) return;
    if (session.sdkSessionId === reported && this.retainedSdkSessionIds.get(sessionId) === reported) return;

    session.sdkSessionId = reported;
    // Re-insert so this entry becomes the newest in Map iteration (eviction) order.
    this.retainedSdkSessionIds.delete(sessionId);
    this.retainedSdkSessionIds.set(sessionId, reported);
    if (this.retainedSdkSessionIds.size > ClaudeAgentSdkEngine.MAX_RETAINED_SDK_SESSIONS) {
      const oldest = this.retainedSdkSessionIds.keys().next().value;
      if (oldest !== undefined) {
        this.retainedSdkSessionIds.delete(oldest);
      }
    }
  }

  /**
   * Tears down one run exactly once: disposes its gate, aborts its SDK query, and
   * removes the map entry only if it still belongs to this run.
   */
  private releaseSession(sessionId: string, session: ActiveSession): void {
    if (this.releasedSessions.has(session)) return;
    this.releasedSessions.add(session);

    session.gate.dispose();
    session.abort.abort();
    if (this.activeSessions.get(sessionId) === session) {
      this.activeSessions.delete(sessionId);
    }
  }

  /** True when the error stems from cancellation of this run rather than a real failure. */
  private isAbort(session: ActiveSession, err: unknown): boolean {
    if (session.abort.signal.aborted) return true;
    return (err as { name?: unknown } | null | undefined)?.name === 'AbortError';
  }

  /** Extracts a message string from a thrown value, if it has one. */
  private static describeError(err: unknown): string | undefined {
    if (err instanceof Error) return err.message;
    const message = (err as { message?: unknown } | null | undefined)?.message;
    return typeof message === 'string' ? message : undefined;
  }

  /**
   * Maps a tool invocation to a risk level.
   *
   * NOTE(review): only Bash commands are classified. Any tool that is not Bash,
   * Write, Edit or NotebookEdit (and a Bash call without a string `command`) is
   * treated as READ_ONLY and therefore auto-approved; confirm this is intended
   * for tools with side effects.
   */
  private classifyToolRisk(toolName: string, toolInput: unknown): CommandRiskLevel {
    if (toolName === 'Bash') {
      const command = (toolInput as { command?: unknown } | null | undefined)?.command;
      if (typeof command === 'string') {
        return this.classifier.classify(command);
      }
    }
    // File-editing tools are low-risk writes.
    if (toolName === 'Write' || toolName === 'Edit' || toolName === 'NotebookEdit') {
      return CommandRiskLevel.LOW_RISK_WRITE;
    }
    return CommandRiskLevel.READ_ONLY;
  }

  /**
   * Converts one SDK message into zero or more engine stream events.
   *
   * - `assistant`: thinking / text / tool_use / tool_result content blocks
   * - `user`: tool_result content blocks only (the SDK delivers tool output there)
   * - `result`: a `completed` event with summed input + output tokens
   * - `system` / `init`: logged, no event
   *
   * `message` stays loosely typed because the SDK message union is not imported statically.
   */
  private mapSdkMessage(message: any): EngineStreamEvent[] {
    const events: EngineStreamEvent[] = [];
    const kind = message?.type;

    if (kind === 'assistant' || kind === 'user') {
      const content = message.message?.content;
      if (!Array.isArray(content)) return events;

      for (const block of content) {
        if (block?.type === 'tool_result') {
          events.push({
            type: 'tool_result',
            // The block carries the tool-use ID, not the tool's name.
            toolName: block.tool_use_id ?? 'unknown',
            output: typeof block.content === 'string'
              ? block.content
              : JSON.stringify(block.content ?? ''),
            isError: block.is_error ?? false,
          });
        } else if (kind !== 'assistant') {
          // Do not echo user-authored text back as agent output.
          continue;
        } else if (block?.type === 'thinking') {
          events.push({ type: 'thinking', content: block.thinking ?? '' });
        } else if (block?.type === 'text') {
          events.push({ type: 'text', content: block.text ?? '' });
        } else if (block?.type === 'tool_use') {
          events.push({
            type: 'tool_use',
            toolName: block.name ?? 'unknown',
            input: block.input ?? {},
          });
        }
      }
    } else if (kind === 'result') {
      // NOTE(review): result messages whose subtype is not 'success' are also reported
      // as 'completed' (with the fallback summary); kept so token usage is not lost.
      events.push({
        type: 'completed',
        summary: message.result ?? 'Task completed',
        tokensUsed: message.usage
          ? (message.usage.input_tokens ?? 0) + (message.usage.output_tokens ?? 0)
          : undefined,
      });
    } else if (kind === 'system' && message.subtype === 'init') {
      this.logger.log(`SDK session initialized: ${message.session_id}`);
    }
    return events;
  }
}