feat: SDK engine native resume with per-tenant HOME isolation
Replace prompt-prefix workaround with SDK's native resume mechanism.
Each tenant gets isolated HOME directory (/data/claude-tenants/{tenantId})
to prevent cross-tenant session file mixing. SDK session IDs are persisted
in session.metadata for cross-request resume support.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
2403ce5636
commit
cc0f06e2be
|
|
@ -121,6 +121,7 @@ services:
|
|||
volumes:
|
||||
- ${HOME}/.claude:/home/appuser/.claude
|
||||
- ${HOME}/.claude.json:/home/appuser/.claude.json
|
||||
- claude_tenants:/data/claude-tenants
|
||||
environment:
|
||||
- DB_HOST=postgres
|
||||
- DB_PORT=5432
|
||||
|
|
@ -371,6 +372,7 @@ services:
|
|||
|
||||
volumes:
|
||||
postgres_data:
|
||||
claude_tenants:
|
||||
|
||||
networks:
|
||||
it0-network:
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ export interface EngineTaskParams {
|
|||
role: 'user' | 'assistant';
|
||||
content: string | any[];
|
||||
}>;
|
||||
/** SDK session ID for native resume (Agent SDK engine only). */
|
||||
resumeSessionId?: string;
|
||||
}
|
||||
|
||||
export type EngineStreamEvent =
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import { TenantAgentConfigService } from '../../services/tenant-agent-config.ser
|
|||
import { AllowedToolsResolverService } from '../../../domain/services/allowed-tools-resolver.service';
|
||||
import { TenantContextService } from '@it0/common';
|
||||
import { ApprovalGate } from './approval-gate';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
|
||||
// Dynamic import helper that survives tsc commonjs compilation
|
||||
// (tsc converts `await import()` → require() which breaks ESM-only packages)
|
||||
|
|
@ -100,65 +102,71 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
try {
|
||||
const { query } = await dynamicImport('@anthropic-ai/claude-agent-sdk');
|
||||
|
||||
// Build prompt with conversation history prefix if available
|
||||
const promptWithContext = this.buildPromptWithHistory(
|
||||
params.prompt,
|
||||
params.conversationHistory,
|
||||
);
|
||||
// Set tenant-isolated HOME directory so SDK session files are separated per tenant
|
||||
this.ensureTenantHome(tenantId, env);
|
||||
|
||||
// Build SDK query options
|
||||
const sdkOptions: Record<string, any> = {
|
||||
systemPrompt: params.systemPrompt || undefined,
|
||||
allowedTools: params.allowedTools?.length ? params.allowedTools : undefined,
|
||||
maxTurns: params.maxTurns,
|
||||
maxBudgetUsd: params.maxBudgetUsd,
|
||||
env,
|
||||
abortController,
|
||||
includePartialMessages: true,
|
||||
allowDangerouslySkipPermissions: true,
|
||||
permissionMode: 'bypassPermissions',
|
||||
stderr: (data: string) => {
|
||||
this.logger.debug(`SDK stderr (${params.sessionId}): ${data.trim()}`);
|
||||
},
|
||||
canUseTool: async (toolName: string, toolInput: any, { signal }: { signal: AbortSignal }) => {
|
||||
const riskLevel = this.classifyToolRisk(toolName, toolInput);
|
||||
|
||||
// L0-L1: auto-approve
|
||||
if (riskLevel <= CommandRiskLevel.LOW_RISK_WRITE) {
|
||||
return { behavior: 'allow' as const, updatedInput: toolInput };
|
||||
}
|
||||
|
||||
// L3: always block
|
||||
if (riskLevel >= CommandRiskLevel.FORBIDDEN) {
|
||||
return {
|
||||
behavior: 'deny' as const,
|
||||
message: `Command blocked: risk level FORBIDDEN`,
|
||||
interrupt: false,
|
||||
};
|
||||
}
|
||||
|
||||
// L2: emit approval_required and wait for gate
|
||||
pushEvent({
|
||||
type: 'approval_required',
|
||||
command: `${toolName}: ${JSON.stringify(toolInput).slice(0, 200)}`,
|
||||
riskLevel: riskLevel,
|
||||
taskId: params.sessionId,
|
||||
});
|
||||
|
||||
const approved = await gate.waitForApproval(toolName, toolInput as Record<string, unknown>);
|
||||
|
||||
if (approved) {
|
||||
return { behavior: 'allow' as const, updatedInput: toolInput };
|
||||
} else {
|
||||
return {
|
||||
behavior: 'deny' as const,
|
||||
message: 'User rejected the command',
|
||||
interrupt: false,
|
||||
};
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
// Native SDK resume: if we have a previous SDK session ID, resume it
|
||||
if (params.resumeSessionId) {
|
||||
sdkOptions.resume = params.resumeSessionId;
|
||||
this.logger.log(`Resuming SDK session: ${params.resumeSessionId} for session ${params.sessionId}`);
|
||||
}
|
||||
|
||||
const sdkQuery = query({
|
||||
prompt: promptWithContext,
|
||||
options: {
|
||||
systemPrompt: params.systemPrompt || undefined,
|
||||
allowedTools: params.allowedTools?.length ? params.allowedTools : undefined,
|
||||
maxTurns: params.maxTurns,
|
||||
maxBudgetUsd: params.maxBudgetUsd,
|
||||
env,
|
||||
abortController,
|
||||
includePartialMessages: true,
|
||||
allowDangerouslySkipPermissions: true,
|
||||
permissionMode: 'bypassPermissions',
|
||||
stderr: (data: string) => {
|
||||
this.logger.debug(`SDK stderr (${params.sessionId}): ${data.trim()}`);
|
||||
},
|
||||
canUseTool: async (toolName, toolInput, { signal }) => {
|
||||
const riskLevel = this.classifyToolRisk(toolName, toolInput);
|
||||
|
||||
// L0-L1: auto-approve
|
||||
if (riskLevel <= CommandRiskLevel.LOW_RISK_WRITE) {
|
||||
return { behavior: 'allow' as const, updatedInput: toolInput };
|
||||
}
|
||||
|
||||
// L3: always block
|
||||
if (riskLevel >= CommandRiskLevel.FORBIDDEN) {
|
||||
return {
|
||||
behavior: 'deny' as const,
|
||||
message: `Command blocked: risk level FORBIDDEN`,
|
||||
interrupt: false,
|
||||
};
|
||||
}
|
||||
|
||||
// L2: emit approval_required and wait for gate
|
||||
pushEvent({
|
||||
type: 'approval_required',
|
||||
command: `${toolName}: ${JSON.stringify(toolInput).slice(0, 200)}`,
|
||||
riskLevel: riskLevel,
|
||||
taskId: params.sessionId,
|
||||
});
|
||||
|
||||
const approved = await gate.waitForApproval(toolName, toolInput as Record<string, unknown>);
|
||||
|
||||
if (approved) {
|
||||
return { behavior: 'allow' as const, updatedInput: toolInput };
|
||||
} else {
|
||||
return {
|
||||
behavior: 'deny' as const,
|
||||
message: 'User rejected the command',
|
||||
interrupt: false,
|
||||
};
|
||||
}
|
||||
},
|
||||
},
|
||||
prompt: params.prompt,
|
||||
options: sdkOptions,
|
||||
});
|
||||
|
||||
// Consume SDK messages in the background and push to event queue
|
||||
|
|
@ -273,6 +281,9 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
delete env.ANTHROPIC_API_KEY;
|
||||
}
|
||||
|
||||
// Set tenant-isolated HOME directory
|
||||
this.ensureTenantHome(tenantId, env);
|
||||
|
||||
const timeoutSec = tenantConfig?.approvalTimeoutSeconds ?? 120;
|
||||
const gate = new ApprovalGate(timeoutSec);
|
||||
const abortController = new AbortController();
|
||||
|
|
@ -291,7 +302,7 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
abortController,
|
||||
allowDangerouslySkipPermissions: true,
|
||||
permissionMode: 'bypassPermissions',
|
||||
canUseTool: async (toolName, toolInput) => {
|
||||
canUseTool: async (toolName: string, toolInput: any) => {
|
||||
const riskLevel = this.classifyToolRisk(toolName, toolInput);
|
||||
if (riskLevel <= CommandRiskLevel.LOW_RISK_WRITE) {
|
||||
return { behavior: 'allow' as const, updatedInput: toolInput };
|
||||
|
|
@ -351,30 +362,54 @@ export class ClaudeAgentSdkEngine implements AgentEnginePort {
|
|||
}
|
||||
|
||||
/**
|
||||
* Build a prompt that includes conversation history as a prefix.
|
||||
* Used when SDK session resume is not available (e.g., after process restart).
|
||||
* Set up a tenant-isolated HOME directory so SDK session files
|
||||
* are stored separately per tenant. Creates the directory structure
|
||||
* and symlinks shared credentials if needed.
|
||||
*/
|
||||
private buildPromptWithHistory(
|
||||
prompt: string,
|
||||
history?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>,
|
||||
): string {
|
||||
if (!history || history.length === 0) return prompt;
|
||||
private ensureTenantHome(tenantId: string, env: Record<string, string>): void {
|
||||
const basePath = '/data/claude-tenants';
|
||||
const tenantHome = path.join(basePath, tenantId);
|
||||
const tenantClaudeDir = path.join(tenantHome, '.claude');
|
||||
|
||||
const lines: string[] = ['[Previous conversation]'];
|
||||
for (const msg of history) {
|
||||
const role = msg.role === 'user' ? 'User' : 'Assistant';
|
||||
const content = typeof msg.content === 'string'
|
||||
? msg.content
|
||||
: JSON.stringify(msg.content);
|
||||
// Truncate very long messages in the prefix
|
||||
const truncated = content.length > 500
|
||||
? content.slice(0, 500) + '...'
|
||||
: content;
|
||||
lines.push(`${role}: ${truncated}`);
|
||||
// Create tenant directory structure if it doesn't exist
|
||||
if (!fs.existsSync(tenantClaudeDir)) {
|
||||
fs.mkdirSync(tenantClaudeDir, { recursive: true });
|
||||
this.logger.log(`Created tenant HOME directory: ${tenantHome}`);
|
||||
}
|
||||
lines.push('', '[Current request]', prompt);
|
||||
|
||||
return lines.join('\n');
|
||||
// Symlink shared credentials for subscription mode
|
||||
const credentialsTarget = path.join(tenantClaudeDir, '.credentials.json');
|
||||
const sharedCredentials = '/home/appuser/.claude/.credentials.json';
|
||||
if (!fs.existsSync(credentialsTarget) && fs.existsSync(sharedCredentials)) {
|
||||
try {
|
||||
fs.symlinkSync(sharedCredentials, credentialsTarget);
|
||||
this.logger.log(`Symlinked credentials for tenant ${tenantId}`);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Failed to symlink credentials for tenant ${tenantId}: ${err.message}`);
|
||||
// Fallback: copy the file instead
|
||||
try {
|
||||
fs.copyFileSync(sharedCredentials, credentialsTarget);
|
||||
this.logger.log(`Copied credentials for tenant ${tenantId} (symlink failed)`);
|
||||
} catch (copyErr: any) {
|
||||
this.logger.error(`Failed to copy credentials for tenant ${tenantId}: ${copyErr.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Also symlink .claude.json (settings file) if it exists
|
||||
const settingsTarget = path.join(tenantHome, '.claude.json');
|
||||
const sharedSettings = '/home/appuser/.claude.json';
|
||||
if (!fs.existsSync(settingsTarget) && fs.existsSync(sharedSettings)) {
|
||||
try {
|
||||
fs.symlinkSync(sharedSettings, settingsTarget);
|
||||
} catch {
|
||||
try { fs.copyFileSync(sharedSettings, settingsTarget); } catch { /* ignore */ }
|
||||
}
|
||||
}
|
||||
|
||||
// Set HOME to tenant-specific directory
|
||||
env.HOME = tenantHome;
|
||||
this.logger.debug(`Set HOME=${tenantHome} for tenant ${tenantId}`);
|
||||
}
|
||||
|
||||
private classifyToolRisk(toolName: string, toolInput: any): CommandRiskLevel {
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import { AgentSession } from '../../../domain/entities/agent-session.entity';
|
|||
import { AgentTask } from '../../../domain/entities/agent-task.entity';
|
||||
import { TaskStatus } from '../../../domain/value-objects/task-status.vo';
|
||||
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
|
||||
import { ClaudeAgentSdkEngine } from '../../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine';
|
||||
import * as crypto from 'crypto';
|
||||
|
||||
@Controller('api/v1/agent')
|
||||
|
|
@ -86,6 +87,16 @@ export class AgentController {
|
|||
// so we pass the history minus the last user message (it will be added by the engine as params.prompt)
|
||||
const historyForEngine = conversationHistory.slice(0, -1);
|
||||
|
||||
// For SDK engine: load previous SDK session ID for native resume
|
||||
const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
|
||||
const resumeSessionId = isSdkEngine
|
||||
? (session.metadata as any)?.sdkSessionId as string | undefined
|
||||
: undefined;
|
||||
|
||||
if (resumeSessionId) {
|
||||
this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`);
|
||||
}
|
||||
|
||||
const stream = engine.executeTask({
|
||||
sessionId: session.id,
|
||||
prompt: body.prompt,
|
||||
|
|
@ -93,6 +104,7 @@ export class AgentController {
|
|||
allowedTools: body.allowedTools || [],
|
||||
maxTurns: body.maxTurns || 10,
|
||||
conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
|
||||
resumeSessionId,
|
||||
});
|
||||
|
||||
let eventCount = 0;
|
||||
|
|
@ -123,6 +135,15 @@ export class AgentController {
|
|||
await this.contextService.saveAssistantMessage(session.id, assistantText);
|
||||
}
|
||||
|
||||
// For SDK engine: persist the SDK session ID for future resume
|
||||
if (isSdkEngine && engine instanceof ClaudeAgentSdkEngine) {
|
||||
const newSdkSessionId = engine.getSdkSessionId(session.id);
|
||||
if (newSdkSessionId) {
|
||||
session.metadata = { ...session.metadata, sdkSessionId: newSdkSessionId };
|
||||
this.logger.log(`[Task ${task.id}] Persisted SDK session ID: ${newSdkSessionId}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Keep session active (don't mark completed) so it can be reused
|
||||
session.updatedAt = new Date();
|
||||
await this.sessionRepository.save(session);
|
||||
|
|
|
|||
Loading…
Reference in New Issue