fix: comprehensive hardening of agent task cancel/inject/approve flows
6 rounds of systematic audit identified and fixed 14 bugs across
backend controller and Flutter client:
## Backend (agent.controller.ts)
Security & Tenant Isolation:
- Add @TenantId + ForbiddenException check to cancelTask, injectMessage,
approveCommand — all 4 write endpoints now enforce tenant isolation
- Add tenantId check on session reuse in executeTask to prevent
cross-tenant session hijacking
Architecture & Correctness:
- Extract shared runTaskStream() from inline fire-and-forget block,
used by both executeTask and injectMessage to reduce duplication
- Use session.engineType (not getActiveEngine()) in cancelTask,
injectMessage, approveCommand — fixes wrong-engine-cancel when
global engine config is switched after task creation
- Add concurrent task prevention: executeTask checks for existing
RUNNING task on same session and cancels it before starting new one
- Add runningTasks Map to track task promises, awaitTaskCleanup()
helper with 3s timeout for inject to wait for partial text save
- captureSdkSessionId() captures SDK session ID into metadata
without DB save (callers persist), preventing fire-and-forget race
Cancel/Reject Improvements:
- cancelTask: idempotent (returns early if already CANCELLED/COMPLETED),
session stays 'active' (was 'cancelled'), emits cancelled WS event
- approveCommand reject: session stays 'active' (was 'cancelled'),
now emits cancelled WS event so Flutter stream listeners clean up
- approveCommand approved: collect text events and save assistant
response to conversation history on completion (was missing)
Minor:
- task.result! non-null assertion → task.result ?? 'Unknown error'
- Add findRunningBySessionId() to TaskRepository
## Flutter
API Contract Fix:
- approveCommand: route changed from /api/v1/ops/approvals/:id/approve
to /api/v1/agent/tasks/:id/approve with {approved: true} body
- rejectCommand: route changed from /api/v1/ops/approvals/:id/reject
to /api/v1/agent/tasks/:id/approve with {approved: false} body
Resource Management:
- ChatNotifier.dispose() now disconnects WebSocket to prevent
connection leak when navigating away from chat
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d5f663f7af
commit
50dbb641a3
|
|
@ -105,16 +105,17 @@ class ChatRemoteDatasource {
|
||||||
|
|
||||||
/// Approves a pending command for a given task.
|
/// Approves a pending command for a given task.
|
||||||
Future<void> approveCommand(String taskId) async {
|
Future<void> approveCommand(String taskId) async {
|
||||||
await _dio.post('${ApiEndpoints.approvals}/$taskId/approve');
|
await _dio.post(
|
||||||
|
'${ApiEndpoints.tasks}/$taskId/approve',
|
||||||
|
data: {'approved': true},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Rejects a pending command for a given task with an optional reason.
|
/// Rejects a pending command for a given task with an optional reason.
|
||||||
Future<void> rejectCommand(String taskId, {String? reason}) async {
|
Future<void> rejectCommand(String taskId, {String? reason}) async {
|
||||||
await _dio.post(
|
await _dio.post(
|
||||||
'${ApiEndpoints.approvals}/$taskId/reject',
|
'${ApiEndpoints.tasks}/$taskId/approve',
|
||||||
data: {
|
data: {'approved': false},
|
||||||
if (reason != null) 'reason': reason,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -569,6 +569,7 @@ class ChatNotifier extends StateNotifier<ChatState> {
|
||||||
@override
|
@override
|
||||||
void dispose() {
|
void dispose() {
|
||||||
_eventSubscription?.cancel();
|
_eventSubscription?.cancel();
|
||||||
|
_ref.read(webSocketClientProvider).disconnect();
|
||||||
super.dispose();
|
super.dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';
|
||||||
import { DataSource } from 'typeorm';
|
import { DataSource } from 'typeorm';
|
||||||
import { TenantAwareRepository } from '@it0/database';
|
import { TenantAwareRepository } from '@it0/database';
|
||||||
import { AgentTask } from '../../domain/entities/agent-task.entity';
|
import { AgentTask } from '../../domain/entities/agent-task.entity';
|
||||||
|
import { TaskStatus } from '../../domain/value-objects/task-status.vo';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class TaskRepository extends TenantAwareRepository<AgentTask> {
|
export class TaskRepository extends TenantAwareRepository<AgentTask> {
|
||||||
|
|
@ -14,4 +15,13 @@ export class TaskRepository extends TenantAwareRepository<AgentTask> {
|
||||||
repo.find({ where: { sessionId } as any }),
|
repo.find({ where: { sessionId } as any }),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async findRunningBySessionId(sessionId: string): Promise<AgentTask | null> {
|
||||||
|
return this.withRepository((repo) =>
|
||||||
|
repo.findOne({
|
||||||
|
where: { sessionId, status: TaskStatus.RUNNING } as any,
|
||||||
|
order: { createdAt: 'DESC' } as any,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, Logger } from '@nestjs/common';
|
import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger } from '@nestjs/common';
|
||||||
import { TenantId } from '@it0/common';
|
import { TenantId } from '@it0/common';
|
||||||
import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
|
import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
|
||||||
import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
|
import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
|
||||||
|
|
@ -9,12 +9,15 @@ import { AgentSession } from '../../../domain/entities/agent-session.entity';
|
||||||
import { AgentTask } from '../../../domain/entities/agent-task.entity';
|
import { AgentTask } from '../../../domain/entities/agent-task.entity';
|
||||||
import { TaskStatus } from '../../../domain/value-objects/task-status.vo';
|
import { TaskStatus } from '../../../domain/value-objects/task-status.vo';
|
||||||
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
|
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
|
||||||
|
import { AgentEnginePort } from '../../../domain/ports/outbound/agent-engine.port';
|
||||||
import { ClaudeAgentSdkEngine } from '../../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine';
|
import { ClaudeAgentSdkEngine } from '../../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine';
|
||||||
import * as crypto from 'crypto';
|
import * as crypto from 'crypto';
|
||||||
|
|
||||||
@Controller('api/v1/agent')
|
@Controller('api/v1/agent')
|
||||||
export class AgentController {
|
export class AgentController {
|
||||||
private readonly logger = new Logger(AgentController.name);
|
private readonly logger = new Logger(AgentController.name);
|
||||||
|
/** Tracks running task promises so cancel/inject can await cleanup. */
|
||||||
|
private readonly runningTasks = new Map<string, Promise<void>>();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly engineRegistry: EngineRegistry,
|
private readonly engineRegistry: EngineRegistry,
|
||||||
|
|
@ -46,7 +49,7 @@ export class AgentController {
|
||||||
let session: AgentSession;
|
let session: AgentSession;
|
||||||
if (body.sessionId) {
|
if (body.sessionId) {
|
||||||
const existing = await this.sessionRepository.findById(body.sessionId);
|
const existing = await this.sessionRepository.findById(body.sessionId);
|
||||||
if (existing && existing.status === 'active') {
|
if (existing && existing.status === 'active' && existing.tenantId === tenantId) {
|
||||||
session = existing;
|
session = existing;
|
||||||
} else {
|
} else {
|
||||||
session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt);
|
session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt);
|
||||||
|
|
@ -59,6 +62,18 @@ export class AgentController {
|
||||||
session.updatedAt = new Date();
|
session.updatedAt = new Date();
|
||||||
await this.sessionRepository.save(session);
|
await this.sessionRepository.save(session);
|
||||||
|
|
||||||
|
// Prevent concurrent tasks on the same session: cancel any still-running task
|
||||||
|
const existingRunning = await this.taskRepository.findRunningBySessionId(session.id);
|
||||||
|
if (existingRunning) {
|
||||||
|
this.logger.warn(`[Session ${session.id}] Cancelling stale running task ${existingRunning.id} before starting new one`);
|
||||||
|
this.captureSdkSessionId(engine, session, existingRunning.id);
|
||||||
|
await engine.cancelTask(session.id).catch(() => {});
|
||||||
|
existingRunning.status = TaskStatus.CANCELLED;
|
||||||
|
existingRunning.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(existingRunning);
|
||||||
|
await this.awaitTaskCleanup(existingRunning.id);
|
||||||
|
}
|
||||||
|
|
||||||
const task = new AgentTask();
|
const task = new AgentTask();
|
||||||
task.id = crypto.randomUUID();
|
task.id = crypto.randomUUID();
|
||||||
task.tenantId = tenantId;
|
task.tenantId = tenantId;
|
||||||
|
|
@ -77,14 +92,7 @@ export class AgentController {
|
||||||
const conversationHistory = await this.contextService.loadContext(session.id, maxCtx);
|
const conversationHistory = await this.contextService.loadContext(session.id, maxCtx);
|
||||||
this.logger.log(`[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`);
|
this.logger.log(`[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`);
|
||||||
|
|
||||||
// Fire-and-forget: iterate engine stream and emit events via gateway
|
|
||||||
(async () => {
|
|
||||||
try {
|
|
||||||
this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${body.prompt.slice(0, 80)}"`);
|
|
||||||
|
|
||||||
// Pass conversation history (excluding the current user message, which is the last one)
|
// Pass conversation history (excluding the current user message, which is the last one)
|
||||||
// loadContext returns all messages including the one we just saved,
|
|
||||||
// so we pass the history minus the last user message (it will be added by the engine as params.prompt)
|
|
||||||
const historyForEngine = conversationHistory.slice(0, -1);
|
const historyForEngine = conversationHistory.slice(0, -1);
|
||||||
|
|
||||||
// For SDK engine: load previous SDK session ID for native resume
|
// For SDK engine: load previous SDK session ID for native resume
|
||||||
|
|
@ -97,8 +105,8 @@ export class AgentController {
|
||||||
this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`);
|
this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const stream = engine.executeTask({
|
// Fire-and-forget: run the task stream
|
||||||
sessionId: session.id,
|
this.runTaskStream(engine, session, task, {
|
||||||
prompt: body.prompt,
|
prompt: body.prompt,
|
||||||
systemPrompt: body.systemPrompt || '',
|
systemPrompt: body.systemPrompt || '',
|
||||||
allowedTools: body.allowedTools || [],
|
allowedTools: body.allowedTools || [],
|
||||||
|
|
@ -107,22 +115,193 @@ export class AgentController {
|
||||||
resumeSessionId,
|
resumeSessionId,
|
||||||
});
|
});
|
||||||
|
|
||||||
let eventCount = 0;
|
return { sessionId: session.id, taskId: task.id };
|
||||||
let finished = false;
|
}
|
||||||
const textParts: string[] = [];
|
|
||||||
|
|
||||||
|
@Delete('tasks/:taskId')
|
||||||
|
async cancelTask(@TenantId() tenantId: string, @Param('taskId') taskId: string) {
|
||||||
|
const task = await this.taskRepository.findById(taskId);
|
||||||
|
if (!task) {
|
||||||
|
throw new NotFoundException(`Task ${taskId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tenant isolation
|
||||||
|
if (task.tenantId !== tenantId) {
|
||||||
|
throw new ForbiddenException('Task does not belong to this tenant');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Idempotent: already finished → return immediately
|
||||||
|
if (task.status === TaskStatus.CANCELLED || task.status === TaskStatus.COMPLETED) {
|
||||||
|
return { message: 'Task already finished', taskId };
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await this.sessionRepository.findById(task.sessionId);
|
||||||
|
if (!session) {
|
||||||
|
throw new NotFoundException(`Session ${task.sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the engine that created this session, not the globally active one
|
||||||
|
const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);
|
||||||
|
|
||||||
|
// Capture SDK session ID BEFORE cancelling (cancelTask deletes from activeSessions)
|
||||||
|
this.captureSdkSessionId(engine, session, task.id);
|
||||||
|
|
||||||
|
await engine.cancelTask(session.id);
|
||||||
|
|
||||||
|
task.status = TaskStatus.CANCELLED;
|
||||||
|
task.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
|
||||||
|
// Keep session ACTIVE for reuse (not 'cancelled')
|
||||||
|
session.status = 'active';
|
||||||
|
session.updatedAt = new Date();
|
||||||
|
await this.sessionRepository.save(session);
|
||||||
|
|
||||||
|
// Emit cancelled event via WebSocket so client knows stream is done
|
||||||
|
this.gateway.emitStreamEvent(session.id, {
|
||||||
|
type: 'cancelled',
|
||||||
|
message: 'Task cancelled by user',
|
||||||
|
code: 'USER_CANCEL',
|
||||||
|
});
|
||||||
|
|
||||||
|
return { message: 'Task cancelled', taskId };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post('tasks/:taskId/inject')
|
||||||
|
async injectMessage(
|
||||||
|
@TenantId() tenantId: string,
|
||||||
|
@Param('taskId') taskId: string,
|
||||||
|
@Body() body: { message: string },
|
||||||
|
) {
|
||||||
|
if (!body.message?.trim()) {
|
||||||
|
throw new BadRequestException('Message is required');
|
||||||
|
}
|
||||||
|
|
||||||
|
const currentTask = await this.taskRepository.findById(taskId);
|
||||||
|
if (!currentTask) {
|
||||||
|
throw new NotFoundException(`Task ${taskId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tenant isolation
|
||||||
|
if (currentTask.tenantId !== tenantId) {
|
||||||
|
throw new ForbiddenException('Task does not belong to this tenant');
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await this.sessionRepository.findById(currentTask.sessionId);
|
||||||
|
if (!session) {
|
||||||
|
throw new NotFoundException(`Session ${currentTask.sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the engine that created this session, not the globally active one
|
||||||
|
const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);
|
||||||
|
|
||||||
|
// 1. Capture SDK session ID before cancel
|
||||||
|
this.captureSdkSessionId(engine, session, taskId);
|
||||||
|
|
||||||
|
// 2. Cancel current task
|
||||||
|
await engine.cancelTask(session.id);
|
||||||
|
currentTask.status = TaskStatus.CANCELLED;
|
||||||
|
currentTask.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(currentTask);
|
||||||
|
|
||||||
|
// Emit cancelled event so client knows old stream is done
|
||||||
|
this.gateway.emitStreamEvent(session.id, {
|
||||||
|
type: 'cancelled',
|
||||||
|
message: 'Interrupted by user injection',
|
||||||
|
code: 'USER_INJECT',
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. Keep session active
|
||||||
|
session.status = 'active';
|
||||||
|
session.updatedAt = new Date();
|
||||||
|
await this.sessionRepository.save(session);
|
||||||
|
|
||||||
|
// Wait for old task's cleanup (partial text save) before loading context
|
||||||
|
await this.awaitTaskCleanup(taskId);
|
||||||
|
|
||||||
|
// 4. Save injected user message to conversation history
|
||||||
|
await this.contextService.saveUserMessage(session.id, body.message);
|
||||||
|
|
||||||
|
// 5. Create new task
|
||||||
|
const newTask = new AgentTask();
|
||||||
|
newTask.id = crypto.randomUUID();
|
||||||
|
newTask.tenantId = tenantId;
|
||||||
|
newTask.sessionId = session.id;
|
||||||
|
newTask.prompt = body.message;
|
||||||
|
newTask.status = TaskStatus.RUNNING;
|
||||||
|
newTask.startedAt = new Date();
|
||||||
|
newTask.createdAt = new Date();
|
||||||
|
await this.taskRepository.save(newTask);
|
||||||
|
|
||||||
|
// 6. Load conversation history (includes partial text from cancelled task + new user message)
|
||||||
|
const conversationHistory = await this.contextService.loadContext(session.id, 20);
|
||||||
|
const historyForEngine = conversationHistory.slice(0, -1);
|
||||||
|
|
||||||
|
const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
|
||||||
|
const resumeSessionId = isSdkEngine
|
||||||
|
? (session.metadata as any)?.sdkSessionId as string | undefined
|
||||||
|
: undefined;
|
||||||
|
|
||||||
|
// 7. Emit task_info so client picks up the new taskId
|
||||||
|
this.gateway.emitStreamEvent(session.id, {
|
||||||
|
type: 'task_info',
|
||||||
|
taskId: newTask.id,
|
||||||
|
});
|
||||||
|
|
||||||
|
// 8. Fire-and-forget: run the new task stream
|
||||||
|
this.runTaskStream(engine, session, newTask, {
|
||||||
|
prompt: body.message,
|
||||||
|
systemPrompt: session.systemPrompt || '',
|
||||||
|
allowedTools: [],
|
||||||
|
maxTurns: 10,
|
||||||
|
conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
|
||||||
|
resumeSessionId,
|
||||||
|
});
|
||||||
|
|
||||||
|
return { sessionId: session.id, taskId: newTask.id, injected: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post('tasks/:taskId/approve')
|
||||||
|
async approveCommand(@TenantId() tenantId: string, @Param('taskId') taskId: string, @Body() body: { approved: boolean }) {
|
||||||
|
const task = await this.taskRepository.findById(taskId);
|
||||||
|
if (!task) {
|
||||||
|
throw new NotFoundException(`Task ${taskId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tenant isolation
|
||||||
|
if (task.tenantId !== tenantId) {
|
||||||
|
throw new ForbiddenException('Task does not belong to this tenant');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task.status !== TaskStatus.AWAITING_APPROVAL) {
|
||||||
|
throw new BadRequestException(`Task ${taskId} is not awaiting approval (current status: ${task.status})`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const session = await this.sessionRepository.findById(task.sessionId);
|
||||||
|
if (!session) {
|
||||||
|
throw new NotFoundException(`Session ${task.sessionId} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the engine that created this session, not the globally active one
|
||||||
|
const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);
|
||||||
|
|
||||||
|
if (body.approved) {
|
||||||
|
task.status = TaskStatus.RUNNING;
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
|
||||||
|
// Fire-and-forget: continue the session and stream events
|
||||||
|
(async () => {
|
||||||
|
const textParts: string[] = [];
|
||||||
|
try {
|
||||||
|
const stream = engine.continueSession(session.id, 'approved');
|
||||||
for await (const event of stream) {
|
for await (const event of stream) {
|
||||||
eventCount++;
|
|
||||||
this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`);
|
|
||||||
this.gateway.emitStreamEvent(session.id, event);
|
this.gateway.emitStreamEvent(session.id, event);
|
||||||
|
|
||||||
// Collect text for assistant message
|
|
||||||
if (event.type === 'text') {
|
if (event.type === 'text') {
|
||||||
textParts.push(event.content);
|
textParts.push(event.content);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event.type === 'completed' && !finished) {
|
if (event.type === 'completed') {
|
||||||
finished = true;
|
|
||||||
task.status = TaskStatus.COMPLETED;
|
task.status = TaskStatus.COMPLETED;
|
||||||
task.result = event.summary;
|
task.result = event.summary;
|
||||||
task.tokensUsed = event.tokensUsed;
|
task.tokensUsed = event.tokensUsed;
|
||||||
|
|
@ -135,122 +314,7 @@ export class AgentController {
|
||||||
await this.contextService.saveAssistantMessage(session.id, assistantText);
|
await this.contextService.saveAssistantMessage(session.id, assistantText);
|
||||||
}
|
}
|
||||||
|
|
||||||
// For SDK engine: persist the SDK session ID for future resume
|
this.captureSdkSessionId(engine, session, task.id);
|
||||||
if (isSdkEngine && engine instanceof ClaudeAgentSdkEngine) {
|
|
||||||
const newSdkSessionId = engine.getSdkSessionId(session.id);
|
|
||||||
if (newSdkSessionId) {
|
|
||||||
session.metadata = { ...session.metadata, sdkSessionId: newSdkSessionId };
|
|
||||||
this.logger.log(`[Task ${task.id}] Persisted SDK session ID: ${newSdkSessionId}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Keep session active (don't mark completed) so it can be reused
|
|
||||||
session.updatedAt = new Date();
|
|
||||||
await this.sessionRepository.save(session);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.type === 'error' && !finished) {
|
|
||||||
finished = true;
|
|
||||||
task.status = TaskStatus.FAILED;
|
|
||||||
task.result = event.message;
|
|
||||||
task.completedAt = new Date();
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
|
|
||||||
session.status = 'error';
|
|
||||||
session.updatedAt = new Date();
|
|
||||||
await this.sessionRepository.save(session);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.type === 'approval_required') {
|
|
||||||
task.status = TaskStatus.AWAITING_APPROVAL;
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.logger.log(`[Task ${task.id}] Stream ended after ${eventCount} events`);
|
|
||||||
} catch (error) {
|
|
||||||
this.logger.error(`[Task ${task.id}] Stream error: ${error instanceof Error ? error.message : error}`);
|
|
||||||
task.status = TaskStatus.FAILED;
|
|
||||||
task.result = error instanceof Error ? error.message : 'Unknown error';
|
|
||||||
task.completedAt = new Date();
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
|
|
||||||
session.status = 'error';
|
|
||||||
session.updatedAt = new Date();
|
|
||||||
await this.sessionRepository.save(session);
|
|
||||||
|
|
||||||
this.gateway.emitStreamEvent(session.id, {
|
|
||||||
type: 'error',
|
|
||||||
message: task.result,
|
|
||||||
code: 'EXECUTION_ERROR',
|
|
||||||
});
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
|
|
||||||
return { sessionId: session.id, taskId: task.id };
|
|
||||||
}
|
|
||||||
|
|
||||||
@Delete('tasks/:taskId')
|
|
||||||
async cancelTask(@Param('taskId') taskId: string) {
|
|
||||||
const task = await this.taskRepository.findById(taskId);
|
|
||||||
if (!task) {
|
|
||||||
throw new NotFoundException(`Task ${taskId} not found`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const session = await this.sessionRepository.findById(task.sessionId);
|
|
||||||
if (!session) {
|
|
||||||
throw new NotFoundException(`Session ${task.sessionId} not found`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const engine = this.engineRegistry.getActiveEngine();
|
|
||||||
await engine.cancelTask(session.id);
|
|
||||||
|
|
||||||
task.status = TaskStatus.CANCELLED;
|
|
||||||
task.completedAt = new Date();
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
|
|
||||||
session.status = 'cancelled';
|
|
||||||
session.updatedAt = new Date();
|
|
||||||
await this.sessionRepository.save(session);
|
|
||||||
|
|
||||||
return { message: 'Task cancelled', taskId };
|
|
||||||
}
|
|
||||||
|
|
||||||
@Post('tasks/:taskId/approve')
|
|
||||||
async approveCommand(@Param('taskId') taskId: string, @Body() body: { approved: boolean }) {
|
|
||||||
const task = await this.taskRepository.findById(taskId);
|
|
||||||
if (!task) {
|
|
||||||
throw new NotFoundException(`Task ${taskId} not found`);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (task.status !== TaskStatus.AWAITING_APPROVAL) {
|
|
||||||
throw new BadRequestException(`Task ${taskId} is not awaiting approval (current status: ${task.status})`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const session = await this.sessionRepository.findById(task.sessionId);
|
|
||||||
if (!session) {
|
|
||||||
throw new NotFoundException(`Session ${task.sessionId} not found`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const engine = this.engineRegistry.getActiveEngine();
|
|
||||||
|
|
||||||
if (body.approved) {
|
|
||||||
task.status = TaskStatus.RUNNING;
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
|
|
||||||
// Fire-and-forget: continue the session and stream events
|
|
||||||
(async () => {
|
|
||||||
try {
|
|
||||||
const stream = engine.continueSession(session.id, 'approved');
|
|
||||||
for await (const event of stream) {
|
|
||||||
this.gateway.emitStreamEvent(session.id, event);
|
|
||||||
|
|
||||||
if (event.type === 'completed') {
|
|
||||||
task.status = TaskStatus.COMPLETED;
|
|
||||||
task.result = event.summary;
|
|
||||||
task.tokensUsed = event.tokensUsed;
|
|
||||||
task.completedAt = new Date();
|
|
||||||
await this.taskRepository.save(task);
|
|
||||||
|
|
||||||
session.updatedAt = new Date();
|
session.updatedAt = new Date();
|
||||||
await this.sessionRepository.save(session);
|
await this.sessionRepository.save(session);
|
||||||
}
|
}
|
||||||
|
|
@ -291,10 +355,18 @@ export class AgentController {
|
||||||
|
|
||||||
await engine.cancelTask(session.id);
|
await engine.cancelTask(session.id);
|
||||||
|
|
||||||
session.status = 'cancelled';
|
// Keep session active for reuse (not 'cancelled')
|
||||||
|
session.status = 'active';
|
||||||
session.updatedAt = new Date();
|
session.updatedAt = new Date();
|
||||||
await this.sessionRepository.save(session);
|
await this.sessionRepository.save(session);
|
||||||
|
|
||||||
|
// Notify client via WebSocket so stream listeners can clean up
|
||||||
|
this.gateway.emitStreamEvent(session.id, {
|
||||||
|
type: 'cancelled',
|
||||||
|
message: 'Command rejected by user',
|
||||||
|
code: 'USER_REJECT',
|
||||||
|
});
|
||||||
|
|
||||||
return { message: 'Command rejected', taskId };
|
return { message: 'Command rejected', taskId };
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -311,6 +383,172 @@ export class AgentController {
|
||||||
return { message: 'Engine switched', engineType: engine.engineType };
|
return { message: 'Engine switched', engineType: engine.engineType };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Private helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shared fire-and-forget stream processing for executeTask and injectMessage.
|
||||||
|
*/
|
||||||
|
private runTaskStream(
|
||||||
|
engine: AgentEnginePort,
|
||||||
|
session: AgentSession,
|
||||||
|
task: AgentTask,
|
||||||
|
params: {
|
||||||
|
prompt: string;
|
||||||
|
systemPrompt: string;
|
||||||
|
allowedTools: string[];
|
||||||
|
maxTurns: number;
|
||||||
|
conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>;
|
||||||
|
resumeSessionId?: string;
|
||||||
|
},
|
||||||
|
) {
|
||||||
|
const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
|
||||||
|
|
||||||
|
const taskPromise = (async () => {
|
||||||
|
let finished = false;
|
||||||
|
const textParts: string[] = [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${params.prompt.slice(0, 80)}"`);
|
||||||
|
|
||||||
|
const stream = engine.executeTask({
|
||||||
|
sessionId: session.id,
|
||||||
|
prompt: params.prompt,
|
||||||
|
systemPrompt: params.systemPrompt,
|
||||||
|
allowedTools: params.allowedTools,
|
||||||
|
maxTurns: params.maxTurns,
|
||||||
|
conversationHistory: params.conversationHistory,
|
||||||
|
resumeSessionId: params.resumeSessionId,
|
||||||
|
});
|
||||||
|
|
||||||
|
let eventCount = 0;
|
||||||
|
|
||||||
|
for await (const event of stream) {
|
||||||
|
eventCount++;
|
||||||
|
this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`);
|
||||||
|
this.gateway.emitStreamEvent(session.id, event);
|
||||||
|
|
||||||
|
// Collect text for assistant message
|
||||||
|
if (event.type === 'text') {
|
||||||
|
textParts.push(event.content);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === 'completed' && !finished) {
|
||||||
|
finished = true;
|
||||||
|
task.status = TaskStatus.COMPLETED;
|
||||||
|
task.result = event.summary;
|
||||||
|
task.tokensUsed = event.tokensUsed;
|
||||||
|
task.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
|
||||||
|
// Save assistant response to conversation history
|
||||||
|
const assistantText = textParts.join('') || event.summary;
|
||||||
|
if (assistantText) {
|
||||||
|
await this.contextService.saveAssistantMessage(session.id, assistantText);
|
||||||
|
}
|
||||||
|
|
||||||
|
// For SDK engine: capture the SDK session ID for future resume
|
||||||
|
this.captureSdkSessionId(engine, session, task.id);
|
||||||
|
|
||||||
|
// Keep session active so it can be reused (also persists captured SDK session ID)
|
||||||
|
session.updatedAt = new Date();
|
||||||
|
await this.sessionRepository.save(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === 'error' && !finished) {
|
||||||
|
finished = true;
|
||||||
|
task.status = TaskStatus.FAILED;
|
||||||
|
task.result = event.message;
|
||||||
|
task.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
|
||||||
|
session.status = 'error';
|
||||||
|
session.updatedAt = new Date();
|
||||||
|
await this.sessionRepository.save(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type === 'approval_required') {
|
||||||
|
task.status = TaskStatus.AWAITING_APPROVAL;
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.logger.log(`[Task ${task.id}] Stream ended after ${eventCount} events`);
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`[Task ${task.id}] Stream error: ${error instanceof Error ? error.message : error}`);
|
||||||
|
if (!finished) {
|
||||||
|
task.status = TaskStatus.FAILED;
|
||||||
|
task.result = error instanceof Error ? error.message : 'Unknown error';
|
||||||
|
task.completedAt = new Date();
|
||||||
|
await this.taskRepository.save(task);
|
||||||
|
|
||||||
|
session.status = 'error';
|
||||||
|
session.updatedAt = new Date();
|
||||||
|
await this.sessionRepository.save(session);
|
||||||
|
|
||||||
|
this.gateway.emitStreamEvent(session.id, {
|
||||||
|
type: 'error',
|
||||||
|
message: task.result ?? 'Unknown error',
|
||||||
|
code: 'EXECUTION_ERROR',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// Save partial assistant text if stream was interrupted (cancelled/aborted)
|
||||||
|
if (!finished && textParts.length > 0) {
|
||||||
|
const partialText = textParts.join('');
|
||||||
|
if (partialText) {
|
||||||
|
try {
|
||||||
|
await this.contextService.saveAssistantMessage(
|
||||||
|
session.id,
|
||||||
|
partialText + '\n[中断]',
|
||||||
|
);
|
||||||
|
this.logger.log(`[Task ${task.id}] Saved partial assistant text (${partialText.length} chars)`);
|
||||||
|
} catch (saveErr) {
|
||||||
|
this.logger.error(`[Task ${task.id}] Failed to save partial text: ${saveErr}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Note: SDK session ID is already captured by cancelTask/injectMessage
|
||||||
|
// BEFORE engine.cancelTask() deletes it from activeSessions.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Track the promise so cancel/inject can await cleanup
|
||||||
|
this.runningTasks.set(task.id, taskPromise);
|
||||||
|
taskPromise.finally(() => this.runningTasks.delete(task.id));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Await the running task's cleanup (partial text save).
|
||||||
|
* Returns immediately if no task is running for this ID.
|
||||||
|
*/
|
||||||
|
private async awaitTaskCleanup(taskId: string, timeoutMs = 3000): Promise<void> {
|
||||||
|
const p = this.runningTasks.get(taskId);
|
||||||
|
if (!p) return;
|
||||||
|
try {
|
||||||
|
await Promise.race([
|
||||||
|
p,
|
||||||
|
new Promise<void>(resolve => setTimeout(resolve, timeoutMs)),
|
||||||
|
]);
|
||||||
|
} catch (_) {
|
||||||
|
// Swallow — errors are already handled inside runTaskStream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Capture SDK session ID into session.metadata (does NOT save to DB).
|
||||||
|
* Callers are responsible for persisting the session afterwards.
|
||||||
|
*/
|
||||||
|
private captureSdkSessionId(engine: AgentEnginePort, session: AgentSession, taskId: string) {
|
||||||
|
if (engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK && engine instanceof ClaudeAgentSdkEngine) {
|
||||||
|
const sdkSessionId = engine.getSdkSessionId(session.id);
|
||||||
|
if (sdkSessionId) {
|
||||||
|
session.metadata = { ...session.metadata, sdkSessionId };
|
||||||
|
this.logger.log(`[Task ${taskId}] Captured SDK session ID: ${sdkSessionId}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private createNewSession(tenantId: string, engineType: string, systemPrompt?: string): AgentSession {
|
private createNewSession(tenantId: string, engineType: string, systemPrompt?: string): AgentSession {
|
||||||
const session = new AgentSession();
|
const session = new AgentSession();
|
||||||
session.id = crypto.randomUUID();
|
session.id = crypto.randomUUID();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue