From 50dbb641a3bb889691344d4ca9a755fac1c53eb1 Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 27 Feb 2026 22:20:46 -0800 Subject: [PATCH] fix: comprehensive hardening of agent task cancel/inject/approve flows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 6 rounds of systematic audit identified and fixed 14 bugs across backend controller and Flutter client: ## Backend (agent.controller.ts) Security & Tenant Isolation: - Add @TenantId + ForbiddenException check to cancelTask, injectMessage, approveCommand — all 4 write endpoints now enforce tenant isolation - Add tenantId check on session reuse in executeTask to prevent cross-tenant session hijacking Architecture & Correctness: - Extract shared runTaskStream() from inline fire-and-forget block, used by both executeTask and injectMessage to reduce duplication - Use session.engineType (not getActiveEngine()) in cancelTask, injectMessage, approveCommand — fixes wrong-engine-cancel when global engine config is switched after task creation - Add concurrent task prevention: executeTask checks for existing RUNNING task on same session and cancels it before starting new one - Add runningTasks Map to track task promises, awaitTaskCleanup() helper with 3s timeout for inject to wait for partial text save - captureSdkSessionId() captures SDK session ID into metadata without DB save (callers persist), preventing fire-and-forget race Cancel/Reject Improvements: - cancelTask: idempotent (returns early if already CANCELLED/COMPLETED), session stays 'active' (was 'cancelled'), emits cancelled WS event - approveCommand reject: session stays 'active' (was 'cancelled'), now emits cancelled WS event so Flutter stream listeners clean up - approveCommand approved: collect text events and save assistant response to conversation history on completion (was missing) Minor: - task.result! non-null assertion → task.result ?? 'Unknown error' - Add findRunningBySessionId() to TaskRepository ## Flutter API Contract Fix: - approveCommand: route changed from /api/v1/ops/approvals/:id/approve to /api/v1/agent/tasks/:id/approve with {approved: true} body - rejectCommand: route changed from /api/v1/ops/approvals/:id/reject to /api/v1/agent/tasks/:id/approve with {approved: false} body Resource Management: - ChatNotifier.dispose() now disconnects WebSocket to prevent connection leak when navigating away from chat Co-Authored-By: Claude Opus 4.6 --- .../datasources/chat_remote_datasource.dart | 11 +- .../providers/chat_providers.dart | 1 + .../repositories/task.repository.ts | 10 + .../rest/controllers/agent.controller.ts | 464 +++++++++++++----- 4 files changed, 368 insertions(+), 118 deletions(-) diff --git a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart index c879aed..6e81963 100644 --- a/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart +++ b/it0_app/lib/features/chat/data/datasources/chat_remote_datasource.dart @@ -105,16 +105,17 @@ class ChatRemoteDatasource { /// Approves a pending command for a given task. Future approveCommand(String taskId) async { - await _dio.post('${ApiEndpoints.approvals}/$taskId/approve'); + await _dio.post( + '${ApiEndpoints.tasks}/$taskId/approve', + data: {'approved': true}, + ); } /// Rejects a pending command for a given task with an optional reason. Future rejectCommand(String taskId, {String? reason}) async { await _dio.post( - '${ApiEndpoints.approvals}/$taskId/reject', - data: { - if (reason != null) 'reason': reason, - }, + '${ApiEndpoints.tasks}/$taskId/approve', + data: {'approved': false}, ); } diff --git a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart index de5d3d3..9b701d7 100644 --- a/it0_app/lib/features/chat/presentation/providers/chat_providers.dart +++ b/it0_app/lib/features/chat/presentation/providers/chat_providers.dart @@ -569,6 +569,7 @@ class ChatNotifier extends StateNotifier { @override void dispose() { _eventSubscription?.cancel(); + _ref.read(webSocketClientProvider).disconnect(); super.dispose(); } } diff --git a/packages/services/agent-service/src/infrastructure/repositories/task.repository.ts b/packages/services/agent-service/src/infrastructure/repositories/task.repository.ts index c1bd9f4..00bf10c 100644 --- a/packages/services/agent-service/src/infrastructure/repositories/task.repository.ts +++ b/packages/services/agent-service/src/infrastructure/repositories/task.repository.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import { DataSource } from 'typeorm'; import { TenantAwareRepository } from '@it0/database'; import { AgentTask } from '../../domain/entities/agent-task.entity'; +import { TaskStatus } from '../../domain/value-objects/task-status.vo'; @Injectable() export class TaskRepository extends TenantAwareRepository { @@ -14,4 +15,13 @@ export class TaskRepository extends TenantAwareRepository { repo.find({ where: { sessionId } as any }), ); } + + async findRunningBySessionId(sessionId: string): Promise { + return this.withRepository((repo) => + repo.findOne({ + where: { sessionId, status: TaskStatus.RUNNING } as any, + order: { createdAt: 'DESC' } as any, + }), + ); + } } diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts index 2765bd7..75e7ace 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, Logger } from '@nestjs/common'; +import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger } from '@nestjs/common'; import { TenantId } from '@it0/common'; import { EngineRegistry } from '../../../infrastructure/engines/engine-registry'; import { AgentStreamGateway } from '../../ws/agent-stream.gateway'; @@ -9,12 +9,15 @@ import { AgentSession } from '../../../domain/entities/agent-session.entity'; import { AgentTask } from '../../../domain/entities/agent-task.entity'; import { TaskStatus } from '../../../domain/value-objects/task-status.vo'; import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo'; +import { AgentEnginePort } from '../../../domain/ports/outbound/agent-engine.port'; import { ClaudeAgentSdkEngine } from '../../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine'; import * as crypto from 'crypto'; @Controller('api/v1/agent') export class AgentController { private readonly logger = new Logger(AgentController.name); + /** Tracks running task promises so cancel/inject can await cleanup. */ + private readonly runningTasks = new Map>(); constructor( private readonly engineRegistry: EngineRegistry, @@ -46,7 +49,7 @@ export class AgentController { let session: AgentSession; if (body.sessionId) { const existing = await this.sessionRepository.findById(body.sessionId); - if (existing && existing.status === 'active') { + if (existing && existing.status === 'active' && existing.tenantId === tenantId) { session = existing; } else { session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt); @@ -59,6 +62,18 @@ export class AgentController { session.updatedAt = new Date(); await this.sessionRepository.save(session); + // Prevent concurrent tasks on the same session: cancel any still-running task + const existingRunning = await this.taskRepository.findRunningBySessionId(session.id); + if (existingRunning) { + this.logger.warn(`[Session ${session.id}] Cancelling stale running task ${existingRunning.id} before starting new one`); + this.captureSdkSessionId(engine, session, existingRunning.id); + await engine.cancelTask(session.id).catch(() => {}); + existingRunning.status = TaskStatus.CANCELLED; + existingRunning.completedAt = new Date(); + await this.taskRepository.save(existingRunning); + await this.awaitTaskCleanup(existingRunning.id); + } + const task = new AgentTask(); task.id = crypto.randomUUID(); task.tenantId = tenantId; @@ -77,151 +92,187 @@ export class AgentController { const conversationHistory = await this.contextService.loadContext(session.id, maxCtx); this.logger.log(`[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`); - // Fire-and-forget: iterate engine stream and emit events via gateway - (async () => { - try { - this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${body.prompt.slice(0, 80)}"`); + // Pass conversation history (excluding the current user message, which is the last one) + const historyForEngine = conversationHistory.slice(0, -1); - // Pass conversation history (excluding the current user message, which is the last one) - // loadContext returns all messages including the one we just saved, - // so we pass the history minus the last user message (it will be added by the engine as params.prompt) - const historyForEngine = conversationHistory.slice(0, -1); + // For SDK engine: load previous SDK session ID for native resume + const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK; + const resumeSessionId = isSdkEngine + ? (session.metadata as any)?.sdkSessionId as string | undefined + : undefined; - // For SDK engine: load previous SDK session ID for native resume - const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK; - const resumeSessionId = isSdkEngine - ? (session.metadata as any)?.sdkSessionId as string | undefined - : undefined; + if (resumeSessionId) { + this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`); + } - if (resumeSessionId) { - this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`); - } - - const stream = engine.executeTask({ - sessionId: session.id, - prompt: body.prompt, - systemPrompt: body.systemPrompt || '', - allowedTools: body.allowedTools || [], - maxTurns: body.maxTurns || 10, - conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, - resumeSessionId, - }); - - let eventCount = 0; - let finished = false; - const textParts: string[] = []; - - for await (const event of stream) { - eventCount++; - this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`); - this.gateway.emitStreamEvent(session.id, event); - - // Collect text for assistant message - if (event.type === 'text') { - textParts.push(event.content); - } - - if (event.type === 'completed' && !finished) { - finished = true; - task.status = TaskStatus.COMPLETED; - task.result = event.summary; - task.tokensUsed = event.tokensUsed; - task.completedAt = new Date(); - await this.taskRepository.save(task); - - // Save assistant response to conversation history - const assistantText = textParts.join('') || event.summary; - if (assistantText) { - await this.contextService.saveAssistantMessage(session.id, assistantText); - } - - // For SDK engine: persist the SDK session ID for future resume - if (isSdkEngine && engine instanceof ClaudeAgentSdkEngine) { - const newSdkSessionId = engine.getSdkSessionId(session.id); - if (newSdkSessionId) { - session.metadata = { ...session.metadata, sdkSessionId: newSdkSessionId }; - this.logger.log(`[Task ${task.id}] Persisted SDK session ID: ${newSdkSessionId}`); - } - } - - // Keep session active (don't mark completed) so it can be reused - session.updatedAt = new Date(); - await this.sessionRepository.save(session); - } - - if (event.type === 'error' && !finished) { - finished = true; - task.status = TaskStatus.FAILED; - task.result = event.message; - task.completedAt = new Date(); - await this.taskRepository.save(task); - - session.status = 'error'; - session.updatedAt = new Date(); - await this.sessionRepository.save(session); - } - - if (event.type === 'approval_required') { - task.status = TaskStatus.AWAITING_APPROVAL; - await this.taskRepository.save(task); - } - } - this.logger.log(`[Task ${task.id}] Stream ended after ${eventCount} events`); - } catch (error) { - this.logger.error(`[Task ${task.id}] Stream error: ${error instanceof Error ? error.message : error}`); - task.status = TaskStatus.FAILED; - task.result = error instanceof Error ? error.message : 'Unknown error'; - task.completedAt = new Date(); - await this.taskRepository.save(task); - - session.status = 'error'; - session.updatedAt = new Date(); - await this.sessionRepository.save(session); - - this.gateway.emitStreamEvent(session.id, { - type: 'error', - message: task.result, - code: 'EXECUTION_ERROR', - }); - } - })(); + // Fire-and-forget: run the task stream + this.runTaskStream(engine, session, task, { + prompt: body.prompt, + systemPrompt: body.systemPrompt || '', + allowedTools: body.allowedTools || [], + maxTurns: body.maxTurns || 10, + conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, + resumeSessionId, + }); return { sessionId: session.id, taskId: task.id }; } @Delete('tasks/:taskId') - async cancelTask(@Param('taskId') taskId: string) { + async cancelTask(@TenantId() tenantId: string, @Param('taskId') taskId: string) { const task = await this.taskRepository.findById(taskId); if (!task) { throw new NotFoundException(`Task ${taskId} not found`); } + // Tenant isolation + if (task.tenantId !== tenantId) { + throw new ForbiddenException('Task does not belong to this tenant'); + } + + // Idempotent: already finished → return immediately + if (task.status === TaskStatus.CANCELLED || task.status === TaskStatus.COMPLETED) { + return { message: 'Task already finished', taskId }; + } + const session = await this.sessionRepository.findById(task.sessionId); if (!session) { throw new NotFoundException(`Session ${task.sessionId} not found`); } - const engine = this.engineRegistry.getActiveEngine(); + // Use the engine that created this session, not the globally active one + const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType); + + // Capture SDK session ID BEFORE cancelling (cancelTask deletes from activeSessions) + this.captureSdkSessionId(engine, session, task.id); + await engine.cancelTask(session.id); task.status = TaskStatus.CANCELLED; task.completedAt = new Date(); await this.taskRepository.save(task); - session.status = 'cancelled'; + // Keep session ACTIVE for reuse (not 'cancelled') + session.status = 'active'; session.updatedAt = new Date(); await this.sessionRepository.save(session); + // Emit cancelled event via WebSocket so client knows stream is done + this.gateway.emitStreamEvent(session.id, { + type: 'cancelled', + message: 'Task cancelled by user', + code: 'USER_CANCEL', + }); + return { message: 'Task cancelled', taskId }; } + @Post('tasks/:taskId/inject') + async injectMessage( + @TenantId() tenantId: string, + @Param('taskId') taskId: string, + @Body() body: { message: string }, + ) { + if (!body.message?.trim()) { + throw new BadRequestException('Message is required'); + } + + const currentTask = await this.taskRepository.findById(taskId); + if (!currentTask) { + throw new NotFoundException(`Task ${taskId} not found`); + } + + // Tenant isolation + if (currentTask.tenantId !== tenantId) { + throw new ForbiddenException('Task does not belong to this tenant'); + } + + const session = await this.sessionRepository.findById(currentTask.sessionId); + if (!session) { + throw new NotFoundException(`Session ${currentTask.sessionId} not found`); + } + + // Use the engine that created this session, not the globally active one + const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType); + + // 1. Capture SDK session ID before cancel + this.captureSdkSessionId(engine, session, taskId); + + // 2. Cancel current task + await engine.cancelTask(session.id); + currentTask.status = TaskStatus.CANCELLED; + currentTask.completedAt = new Date(); + await this.taskRepository.save(currentTask); + + // Emit cancelled event so client knows old stream is done + this.gateway.emitStreamEvent(session.id, { + type: 'cancelled', + message: 'Interrupted by user injection', + code: 'USER_INJECT', + }); + + // 3. Keep session active + session.status = 'active'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + + // Wait for old task's cleanup (partial text save) before loading context + await this.awaitTaskCleanup(taskId); + + // 4. Save injected user message to conversation history + await this.contextService.saveUserMessage(session.id, body.message); + + // 5. Create new task + const newTask = new AgentTask(); + newTask.id = crypto.randomUUID(); + newTask.tenantId = tenantId; + newTask.sessionId = session.id; + newTask.prompt = body.message; + newTask.status = TaskStatus.RUNNING; + newTask.startedAt = new Date(); + newTask.createdAt = new Date(); + await this.taskRepository.save(newTask); + + // 6. Load conversation history (includes partial text from cancelled task + new user message) + const conversationHistory = await this.contextService.loadContext(session.id, 20); + const historyForEngine = conversationHistory.slice(0, -1); + + const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK; + const resumeSessionId = isSdkEngine + ? (session.metadata as any)?.sdkSessionId as string | undefined + : undefined; + + // 7. Emit task_info so client picks up the new taskId + this.gateway.emitStreamEvent(session.id, { + type: 'task_info', + taskId: newTask.id, + }); + + // 8. Fire-and-forget: run the new task stream + this.runTaskStream(engine, session, newTask, { + prompt: body.message, + systemPrompt: session.systemPrompt || '', + allowedTools: [], + maxTurns: 10, + conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined, + resumeSessionId, + }); + + return { sessionId: session.id, taskId: newTask.id, injected: true }; + } + @Post('tasks/:taskId/approve') - async approveCommand(@Param('taskId') taskId: string, @Body() body: { approved: boolean }) { + async approveCommand(@TenantId() tenantId: string, @Param('taskId') taskId: string, @Body() body: { approved: boolean }) { const task = await this.taskRepository.findById(taskId); if (!task) { throw new NotFoundException(`Task ${taskId} not found`); } + // Tenant isolation + if (task.tenantId !== tenantId) { + throw new ForbiddenException('Task does not belong to this tenant'); + } + if (task.status !== TaskStatus.AWAITING_APPROVAL) { throw new BadRequestException(`Task ${taskId} is not awaiting approval (current status: ${task.status})`); } @@ -231,7 +282,8 @@ export class AgentController { throw new NotFoundException(`Session ${task.sessionId} not found`); } - const engine = this.engineRegistry.getActiveEngine(); + // Use the engine that created this session, not the globally active one + const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType); if (body.approved) { task.status = TaskStatus.RUNNING; @@ -239,11 +291,16 @@ export class AgentController { // Fire-and-forget: continue the session and stream events (async () => { + const textParts: string[] = []; try { const stream = engine.continueSession(session.id, 'approved'); for await (const event of stream) { this.gateway.emitStreamEvent(session.id, event); + if (event.type === 'text') { + textParts.push(event.content); + } + if (event.type === 'completed') { task.status = TaskStatus.COMPLETED; task.result = event.summary; @@ -251,6 +308,13 @@ export class AgentController { task.completedAt = new Date(); await this.taskRepository.save(task); + // Save assistant response to conversation history + const assistantText = textParts.join('') || event.summary; + if (assistantText) { + await this.contextService.saveAssistantMessage(session.id, assistantText); + } + + this.captureSdkSessionId(engine, session, task.id); session.updatedAt = new Date(); await this.sessionRepository.save(session); } @@ -291,10 +355,18 @@ export class AgentController { await engine.cancelTask(session.id); - session.status = 'cancelled'; + // Keep session active for reuse (not 'cancelled') + session.status = 'active'; session.updatedAt = new Date(); await this.sessionRepository.save(session); + // Notify client via WebSocket so stream listeners can clean up + this.gateway.emitStreamEvent(session.id, { + type: 'cancelled', + message: 'Command rejected by user', + code: 'USER_REJECT', + }); + return { message: 'Command rejected', taskId }; } } @@ -311,6 +383,172 @@ export class AgentController { return { message: 'Engine switched', engineType: engine.engineType }; } + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + /** + * Shared fire-and-forget stream processing for executeTask and injectMessage. + */ + private runTaskStream( + engine: AgentEnginePort, + session: AgentSession, + task: AgentTask, + params: { + prompt: string; + systemPrompt: string; + allowedTools: string[]; + maxTurns: number; + conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>; + resumeSessionId?: string; + }, + ) { + const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK; + + const taskPromise = (async () => { + let finished = false; + const textParts: string[] = []; + + try { + this.logger.log(`[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${params.prompt.slice(0, 80)}"`); + + const stream = engine.executeTask({ + sessionId: session.id, + prompt: params.prompt, + systemPrompt: params.systemPrompt, + allowedTools: params.allowedTools, + maxTurns: params.maxTurns, + conversationHistory: params.conversationHistory, + resumeSessionId: params.resumeSessionId, + }); + + let eventCount = 0; + + for await (const event of stream) { + eventCount++; + this.logger.log(`[Task ${task.id}] Event #${eventCount}: type=${event.type}${event.type === 'text' ? ` len=${(event as any).content?.length}` : ''}${event.type === 'error' ? ` msg=${(event as any).message}` : ''}`); + this.gateway.emitStreamEvent(session.id, event); + + // Collect text for assistant message + if (event.type === 'text') { + textParts.push(event.content); + } + + if (event.type === 'completed' && !finished) { + finished = true; + task.status = TaskStatus.COMPLETED; + task.result = event.summary; + task.tokensUsed = event.tokensUsed; + task.completedAt = new Date(); + await this.taskRepository.save(task); + + // Save assistant response to conversation history + const assistantText = textParts.join('') || event.summary; + if (assistantText) { + await this.contextService.saveAssistantMessage(session.id, assistantText); + } + + // For SDK engine: capture the SDK session ID for future resume + this.captureSdkSessionId(engine, session, task.id); + + // Keep session active so it can be reused (also persists captured SDK session ID) + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + } + + if (event.type === 'error' && !finished) { + finished = true; + task.status = TaskStatus.FAILED; + task.result = event.message; + task.completedAt = new Date(); + await this.taskRepository.save(task); + + session.status = 'error'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + } + + if (event.type === 'approval_required') { + task.status = TaskStatus.AWAITING_APPROVAL; + await this.taskRepository.save(task); + } + } + this.logger.log(`[Task ${task.id}] Stream ended after ${eventCount} events`); + } catch (error) { + this.logger.error(`[Task ${task.id}] Stream error: ${error instanceof Error ? error.message : error}`); + if (!finished) { + task.status = TaskStatus.FAILED; + task.result = error instanceof Error ? error.message : 'Unknown error'; + task.completedAt = new Date(); + await this.taskRepository.save(task); + + session.status = 'error'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + + this.gateway.emitStreamEvent(session.id, { + type: 'error', + message: task.result ?? 'Unknown error', + code: 'EXECUTION_ERROR', + }); + } + } finally { + // Save partial assistant text if stream was interrupted (cancelled/aborted) + if (!finished && textParts.length > 0) { + const partialText = textParts.join(''); + if (partialText) { + try { + await this.contextService.saveAssistantMessage( + session.id, + partialText + '\n[中断]', + ); + this.logger.log(`[Task ${task.id}] Saved partial assistant text (${partialText.length} chars)`); + } catch (saveErr) { + this.logger.error(`[Task ${task.id}] Failed to save partial text: ${saveErr}`); + } + } + // Note: SDK session ID is already captured by cancelTask/injectMessage + // BEFORE engine.cancelTask() deletes it from activeSessions. + } + } + })(); + + // Track the promise so cancel/inject can await cleanup + this.runningTasks.set(task.id, taskPromise); + taskPromise.finally(() => this.runningTasks.delete(task.id)); + } + + /** + * Await the running task's cleanup (partial text save). + * Returns immediately if no task is running for this ID. + */ + private async awaitTaskCleanup(taskId: string, timeoutMs = 3000): Promise { + const p = this.runningTasks.get(taskId); + if (!p) return; + try { + await Promise.race([ + p, + new Promise(resolve => setTimeout(resolve, timeoutMs)), + ]); + } catch (_) { + // Swallow — errors are already handled inside runTaskStream + } + } + + /** + * Capture SDK session ID into session.metadata (does NOT save to DB). + * Callers are responsible for persisting the session afterwards. + */ + private captureSdkSessionId(engine: AgentEnginePort, session: AgentSession, taskId: string) { + if (engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK && engine instanceof ClaudeAgentSdkEngine) { + const sdkSessionId = engine.getSdkSessionId(session.id); + if (sdkSessionId) { + session.metadata = { ...session.metadata, sdkSessionId }; + this.logger.log(`[Task ${taskId}] Captured SDK session ID: ${sdkSessionId}`); + } + } + } + private createNewSession(tenantId: string, engineType: string, systemPrompt?: string): AgentSession { const session = new AgentSession(); session.id = crypto.randomUUID();