import {
  Controller,
  Post,
  Body,
  Param,
  Delete,
  Get,
  NotFoundException,
  BadRequestException,
  ForbiddenException,
  Logger,
  UseInterceptors,
  UploadedFile,
  Req,
} from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { memoryStorage } from 'multer';
import { TenantId, EventPatterns } from '@it0/common';
import { OpenAISttService } from '../../../infrastructure/stt/openai-stt.service';
import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
import { SessionRepository } from '../../../infrastructure/repositories/session.repository';
import { TaskRepository } from '../../../infrastructure/repositories/task.repository';
import { UsageRecordRepository } from '../../../infrastructure/repositories/usage-record.repository';
import { ConversationContextService } from '../../../domain/services/conversation-context.service';
import { EventPublisherService } from '../../../infrastructure/messaging/event-publisher.service';
import { AgentSession } from '../../../domain/entities/agent-session.entity';
import { AgentTask } from '../../../domain/entities/agent-task.entity';
import { UsageRecord } from '../../../domain/entities/usage-record.entity';
import { TaskStatus } from '../../../domain/value-objects/task-status.vo';
import { AgentEngineType } from '../../../domain/value-objects/agent-engine-type.vo';
import { AgentEnginePort, EngineStreamEvent } from '../../../domain/ports/outbound/agent-engine.port';
import { ClaudeAgentSdkEngine } from '../../../infrastructure/engines/claude-agent-sdk/claude-agent-sdk-engine';
import { SystemPromptBuilder } from '../../../infrastructure/engines/claude-code-cli/system-prompt-builder';
import { AgentInstanceRepository } from '../../../infrastructure/repositories/agent-instance.repository';
import * as crypto from 'crypto';

/**
 * HTTP entry points for agent tasks under `api/v1/agent`.
 *
 * Most endpoints follow the same shape: persist a session/task record, start the
 * engine stream in the background, forward stream events to the WebSocket
 * gateway, and return the session/task IDs immediately.
 */
@Controller('api/v1/agent')
export class AgentController {
  private readonly logger = new Logger(AgentController.name);

  /** Tracks running task promises so cancel/inject can await cleanup. */
  private readonly runningTasks = new Map<string, Promise<void>>();

  constructor(
    private readonly engineRegistry: EngineRegistry,
    private readonly gateway: AgentStreamGateway,
    private readonly sessionRepository: SessionRepository,
    private readonly taskRepository: TaskRepository,
    private readonly usageRecordRepository: UsageRecordRepository,
    private readonly contextService: ConversationContextService,
    private readonly eventPublisher: EventPublisherService,
    private readonly sttService: OpenAISttService,
    private readonly systemPromptBuilder: SystemPromptBuilder,
    private readonly instanceRepository: AgentInstanceRepository,
  ) {}

  /**
   * Start a task on a new or existing session.
   *
   * POST /api/v1/agent/tasks
   *
   * @returns `{ sessionId, taskId }` as soon as the task is recorded; results
   *   are delivered through the WebSocket gateway.
   * @throws BadRequestException when `prompt` is not a string.
   */
  @Post('tasks')
  async executeTask(
    @TenantId() tenantId: string,
    @Req() req: any,
    @Body()
    body: {
      prompt: string;
      sessionId?: string;
      systemPrompt?: string;
      maxTurns?: number;
      allowedTools?: string[];
      engineType?: string;
      maxContextMessages?: number;
      voiceMode?: boolean;
      attachments?: Array<{ base64Data: string; mediaType: string; fileName?: string }>;
    },
  ) {
    // Reject malformed bodies up front instead of failing later with a TypeError (HTTP 500).
    if (typeof body?.prompt !== 'string') {
      throw new BadRequestException('prompt is required');
    }

    // Allow callers to override the engine (e.g. voice uses claude_api for streaming).
    // NOTE(review): switchEngine() is also what the /engines/switch endpoint calls, so this
    // per-request override may change the globally active engine — confirm in EngineRegistry.
    const engine = body.engineType
      ? this.engineRegistry.switchEngine(body.engineType as AgentEngineType)
      : this.engineRegistry.getActiveEngine();

    // Reuse existing session or create new one
    const isVoice = body.voiceMode ?? false;
    let session: AgentSession;
    if (body.sessionId) {
      const existing = await this.sessionRepository.findById(body.sessionId);
      if (existing && existing.status === 'active' && existing.tenantId === tenantId) {
        session = existing;
      } else {
        session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt, isVoice);
      }
    } else {
      session = this.createNewSession(tenantId, engine.engineType, body.systemPrompt, isVoice);
    }

    // Set a human-readable title on the FIRST task of this session.
    // This title is stored in metadata so the session list can display it
    // without ever exposing the internal systemPrompt string to the client.
    //   - Text sessions: truncate the first user prompt to 40 chars
    //   - Voice sessions: leave title empty; Flutter renders its own
    //     "voice conversation M/D HH:mm" label
    const metadata = (session.metadata ?? {}) as Record<string, unknown>;
    if (!metadata.title && !metadata.titleSet) {
      session.metadata = {
        ...metadata,
        voiceMode: isVoice,
        title: isVoice ? '' : body.prompt.substring(0, 40).trim(),
        titleSet: true, // prevent overwrite on subsequent turns
      };
    }

    // Keep session active for multi-turn
    session.status = 'active';
    session.updatedAt = new Date();
    await this.sessionRepository.save(session);

    // Prevent concurrent tasks on the same session: cancel any still-running task
    const existingRunning = await this.taskRepository.findRunningBySessionId(session.id);
    if (existingRunning) {
      this.logger.warn(
        `[Session ${session.id}] Cancelling stale running task ${existingRunning.id} before starting new one`,
      );
      this.captureSdkSessionId(engine, session, existingRunning.id);
      // Best-effort: a failed engine cancel must not block the new task.
      await engine.cancelTask(session.id).catch(() => {});
      existingRunning.status = TaskStatus.CANCELLED;
      existingRunning.completedAt = new Date();
      await this.taskRepository.save(existingRunning);
      await this.awaitTaskCleanup(existingRunning.id);
    }

    const task = new AgentTask();
    task.id = crypto.randomUUID();
    task.tenantId = tenantId;
    task.sessionId = session.id;
    task.prompt = body.prompt;
    task.status = TaskStatus.RUNNING;
    task.startedAt = new Date();
    task.createdAt = new Date();
    await this.taskRepository.save(task);

    // Save user message to conversation history
    await this.contextService.saveUserMessage(session.id, body.prompt, body.attachments);

    // Load conversation history for context
    const maxCtx = body.maxContextMessages ?? 20;
    const conversationHistory = await this.contextService.loadContext(session.id, maxCtx);
    this.logger.log(
      `[Task ${task.id}] Loaded ${conversationHistory.length} history messages for session=${session.id}`,
    );

    // When the current message has attachments, keep it in history (it has image content blocks).
    // Otherwise, strip it so the engine adds a plain-text user message.
    const hasAttachments = body.attachments && body.attachments.length > 0;
    const historyForEngine = hasAttachments
      ? conversationHistory // includes current user message with image blocks
      : conversationHistory.slice(0, -1);

    // For SDK engine: load previous SDK session ID for native resume
    const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
    const resumeSessionId = isSdkEngine
      ? ((session.metadata as any)?.sdkSessionId as string | undefined)
      : undefined;
    if (resumeSessionId) {
      this.logger.log(`[Task ${task.id}] Resuming SDK session: ${resumeSessionId}`);
    }

    // Build system prompt with user context so iAgent knows who it's serving.
    // Kong already verifies the JWT — just decode the payload (no signature check needed).
    const jwtPayload = this.decodeJwt(req.headers?.['authorization'] as string | undefined);
    const userId: string | undefined = jwtPayload?.sub ?? req.user?.sub ?? req.user?.userId;
    const userEmail: string | undefined = jwtPayload?.email ?? req.user?.email;
    const systemPrompt =
      body.systemPrompt ||
      this.systemPromptBuilder.build({
        tenantId,
        userId,
        userEmail,
        // Pass session ID so Claude uses the wget oauth-trigger endpoint (works for ALL engine types).
        // The oauth-trigger endpoint emits an oauth_prompt WS event to this session's stream,
        // which Flutter's chat page handles the same way as voice sessions.
        sessionId: session.id,
      });

    // Fire-and-forget: run the task stream
    this.runTaskStream(engine, session, task, {
      prompt: body.prompt,
      systemPrompt,
      allowedTools: body.allowedTools || [],
      maxTurns: body.maxTurns || 10,
      conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
      resumeSessionId,
      voiceMode: isVoice,
    });

    return { sessionId: session.id, taskId: task.id };
  }

  /**
   * Cancel a task. Returns without side effects when the task is already
   * CANCELLED or COMPLETED.
   *
   * DELETE /api/v1/agent/tasks/:taskId
   *
   * @throws NotFoundException when the task or its session does not exist.
   * @throws ForbiddenException when the task belongs to another tenant.
   */
  @Delete('tasks/:taskId')
  async cancelTask(@TenantId() tenantId: string, @Param('taskId') taskId: string) {
    const task = await this.taskRepository.findById(taskId);
    if (!task) {
      throw new NotFoundException(`Task ${taskId} not found`);
    }

    // Tenant isolation
    if (task.tenantId !== tenantId) {
      throw new ForbiddenException('Task does not belong to this tenant');
    }

    // Idempotent: already finished -> return immediately
    if (task.status === TaskStatus.CANCELLED || task.status === TaskStatus.COMPLETED) {
      return { message: 'Task already finished', taskId };
    }

    const session = await this.sessionRepository.findById(task.sessionId);
    if (!session) {
      throw new NotFoundException(`Session ${task.sessionId} not found`);
    }

    // Use the engine that created this session, not the globally active one
    const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);

    // Capture SDK session ID BEFORE cancelling (cancelTask deletes from activeSessions)
    this.captureSdkSessionId(engine, session, task.id);

    await engine.cancelTask(session.id);

    task.status = TaskStatus.CANCELLED;
    task.completedAt = new Date();
    await this.taskRepository.save(task);

    // Keep session ACTIVE for reuse (not 'cancelled')
    session.status = 'active';
    session.updatedAt = new Date();
    await this.sessionRepository.save(session);

    // Emit cancelled event via WebSocket so client knows stream is done
    this.gateway.emitStreamEvent(session.id, {
      type: 'cancelled',
      message: 'Task cancelled by user',
      code: 'USER_CANCEL',
    });

    return { message: 'Task cancelled', taskId };
  }

  /**
   * Interrupt a task and start a new one on the same session with `message`
   * as the prompt.
   *
   * POST /api/v1/agent/tasks/:taskId/inject
   *
   * @returns `{ sessionId, taskId, injected: true }` where `taskId` is the NEW task.
   * @throws BadRequestException when `message` is empty or whitespace.
   * @throws NotFoundException when the task or its session does not exist.
   * @throws ForbiddenException when the task belongs to another tenant.
   */
  @Post('tasks/:taskId/inject')
  async injectMessage(
    @TenantId() tenantId: string,
    @Param('taskId') taskId: string,
    @Body() body: { message: string },
  ) {
    if (!body.message?.trim()) {
      throw new BadRequestException('Message is required');
    }

    const currentTask = await this.taskRepository.findById(taskId);
    if (!currentTask) {
      throw new NotFoundException(`Task ${taskId} not found`);
    }

    // Tenant isolation
    if (currentTask.tenantId !== tenantId) {
      throw new ForbiddenException('Task does not belong to this tenant');
    }

    const session = await this.sessionRepository.findById(currentTask.sessionId);
    if (!session) {
      throw new NotFoundException(`Session ${currentTask.sessionId} not found`);
    }

    // Use the engine that created this session, not the globally active one
    const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);

    // 1. Capture SDK session ID before cancel
    this.captureSdkSessionId(engine, session, taskId);

    // 2. Cancel current task
    await engine.cancelTask(session.id);
    currentTask.status = TaskStatus.CANCELLED;
    currentTask.completedAt = new Date();
    await this.taskRepository.save(currentTask);

    // Emit cancelled event so client knows old stream is done
    this.gateway.emitStreamEvent(session.id, {
      type: 'cancelled',
      message: 'Interrupted by user injection',
      code: 'USER_INJECT',
    });

    // 3. Keep session active
    session.status = 'active';
    session.updatedAt = new Date();
    await this.sessionRepository.save(session);

    // Wait for old task's cleanup (partial text save) before loading context
    await this.awaitTaskCleanup(taskId);

    // 4. Save injected user message to conversation history
    await this.contextService.saveUserMessage(session.id, body.message);

    // 5. Create new task
    const newTask = new AgentTask();
    newTask.id = crypto.randomUUID();
    newTask.tenantId = tenantId;
    newTask.sessionId = session.id;
    newTask.prompt = body.message;
    newTask.status = TaskStatus.RUNNING;
    newTask.startedAt = new Date();
    newTask.createdAt = new Date();
    await this.taskRepository.save(newTask);

    // 6. Load conversation history (includes partial text from cancelled task + new user message)
    const conversationHistory = await this.contextService.loadContext(session.id, 20);
    const historyForEngine = conversationHistory.slice(0, -1);

    const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
    const resumeSessionId = isSdkEngine
      ? ((session.metadata as any)?.sdkSessionId as string | undefined)
      : undefined;

    // 7. Emit task_info so client picks up the new taskId
    this.gateway.emitStreamEvent(session.id, {
      type: 'task_info',
      taskId: newTask.id,
    });

    // 8. Fire-and-forget: run the new task stream
    this.runTaskStream(engine, session, newTask, {
      prompt: body.message,
      systemPrompt: session.systemPrompt || '',
      allowedTools: [],
      maxTurns: 10,
      conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
      resumeSessionId,
    });

    return { sessionId: session.id, taskId: newTask.id, injected: true };
  }

  /**
   * Approve or reject a command for a task that is AWAITING_APPROVAL.
   *
   * POST /api/v1/agent/tasks/:taskId/approve
   *
   * On approval the engine session is continued in the background and its
   * events are forwarded to the WebSocket gateway; on rejection the task is
   * cancelled and a `cancelled` event is emitted.
   *
   * @throws NotFoundException when the task or its session does not exist.
   * @throws ForbiddenException when the task belongs to another tenant.
   * @throws BadRequestException when the task is not awaiting approval.
   */
  @Post('tasks/:taskId/approve')
  async approveCommand(
    @TenantId() tenantId: string,
    @Param('taskId') taskId: string,
    @Body() body: { approved: boolean },
  ) {
    const task = await this.taskRepository.findById(taskId);
    if (!task) {
      throw new NotFoundException(`Task ${taskId} not found`);
    }

    // Tenant isolation
    if (task.tenantId !== tenantId) {
      throw new ForbiddenException('Task does not belong to this tenant');
    }

    if (task.status !== TaskStatus.AWAITING_APPROVAL) {
      throw new BadRequestException(
        `Task ${taskId} is not awaiting approval (current status: ${task.status})`,
      );
    }

    const session = await this.sessionRepository.findById(task.sessionId);
    if (!session) {
      throw new NotFoundException(`Session ${task.sessionId} not found`);
    }

    // Use the engine that created this session, not the globally active one
    const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);

    if (body.approved) {
      task.status = TaskStatus.RUNNING;
      await this.taskRepository.save(task);

      // Fire-and-forget: continue the session and stream events
      void (async () => {
        const textParts: string[] = [];
        try {
          const stream = engine.continueSession(session.id, 'approved');
          for await (const event of stream) {
            this.gateway.emitStreamEvent(session.id, event);

            if (event.type === 'text') {
              textParts.push(event.content);
            }

            if (event.type === 'completed') {
              task.status = TaskStatus.COMPLETED;
              task.result = event.summary;
              task.tokensUsed = event.tokensUsed;
              task.completedAt = new Date();
              await this.taskRepository.save(task);

              // Save assistant response to conversation history
              const assistantText = textParts.join('') || event.summary;
              if (assistantText) {
                await this.contextService.saveAssistantMessage(session.id, assistantText);
              }

              this.captureSdkSessionId(engine, session, task.id);
              session.updatedAt = new Date();
              await this.sessionRepository.save(session);
            }

            if (event.type === 'error') {
              task.status = TaskStatus.FAILED;
              task.result = event.message;
              task.completedAt = new Date();
              await this.taskRepository.save(task);

              session.status = 'error';
              session.updatedAt = new Date();
              await this.sessionRepository.save(session);
            }

            if (event.type === 'approval_required') {
              task.status = TaskStatus.AWAITING_APPROVAL;
              await this.taskRepository.save(task);
            }
          }
        } catch (error) {
          this.logger.error(
            `[Task ${task.id}] Approval stream error: ${error instanceof Error ? error.message : error}`,
          );
          task.status = TaskStatus.FAILED;
          task.result = error instanceof Error ? error.message : 'Unknown error';
          task.completedAt = new Date();
          await this.taskRepository.save(task);

          session.status = 'error';
          session.updatedAt = new Date();
          await this.sessionRepository.save(session);

          // Tell the client the stream is over, the same way runTaskStream does.
          this.gateway.emitStreamEvent(session.id, {
            type: 'error',
            message: task.result ?? 'Unknown error',
            code: 'EXECUTION_ERROR',
          });
        }
      })().catch((err: unknown) => {
        // The catch block above awaits repository saves; if those fail there is nobody
        // awaiting this promise, so log instead of surfacing an unhandled rejection.
        this.logger.error(
          `[Task ${task.id}] Failed to persist approval stream failure: ${err instanceof Error ? err.message : String(err)}`,
        );
      });

      return { message: 'Command approved', taskId };
    } else {
      task.status = TaskStatus.CANCELLED;
      task.completedAt = new Date();
      await this.taskRepository.save(task);

      await engine.cancelTask(session.id);

      // Keep session active for reuse (not 'cancelled')
      session.status = 'active';
      session.updatedAt = new Date();
      await this.sessionRepository.save(session);

      // Notify client via WebSocket so stream listeners can clean up
      this.gateway.emitStreamEvent(session.id, {
        type: 'cancelled',
        message: 'Command rejected by user',
        code: 'USER_REJECT',
      });

      return { message: 'Command rejected', taskId };
    }
  }

  /**
   * Transcribe audio to text (STT only — does NOT trigger the agent).
   *
   * POST /api/v1/agent/transcribe
   * Content-Type: multipart/form-data
   * Fields: audio (file), language? (string; when omitted or empty no language
   *         is passed to the STT service)
   *
   * Response: { text: string }
   *
   * @throws BadRequestException when no audio file (or an empty one) is uploaded.
   */
  @Post('transcribe')
  @UseInterceptors(FileInterceptor('audio', { storage: memoryStorage() }))
  async transcribeAudio(
    @UploadedFile() file: { buffer: Buffer; originalname: string; mimetype: string } | undefined,
    @Body('language') language?: string,
  ) {
    if (!file?.buffer?.length) {
      throw new BadRequestException('audio file is required');
    }

    // language=undefined -> Whisper auto-detects (best for mixed-language input)
    const text = await this.sttService.transcribe(
      file.buffer,
      file.originalname || 'audio.m4a',
      language || undefined,
    );

    return { text: text?.trim() ?? '' };
  }

  /**
   * Voice message endpoint — WhatsApp-style push-to-talk.
   *
   * Accepts a recorded audio file, transcribes it via Whisper, then:
   *   - If the session has a running task: hard-interrupt it and inject the transcript
   *   - Otherwise: start a fresh task with the transcript
   *
   * This naturally solves the speaker-diarization problem: whoever presses
   * record owns the message. It also doubles as an async voice interrupt:
   * the user can send audio commands into any active agent workflow.
   *
   * POST /api/v1/agent/sessions/:sessionId/voice-message
   * Content-Type: multipart/form-data
   * Fields: audio (file), language? (string, default 'zh')
   *
   * Response: { sessionId, taskId, transcript }
   *
   * @throws BadRequestException when no audio is uploaded or the transcript is empty.
   */
  @Post('sessions/:sessionId/voice-message')
  @UseInterceptors(FileInterceptor('audio', { storage: memoryStorage() }))
  async sendVoiceMessage(
    @TenantId() tenantId: string,
    @Param('sessionId') sessionId: string,
    @UploadedFile() file: { buffer: Buffer; originalname: string; mimetype: string } | undefined,
    @Body('language') language?: string,
  ) {
    if (!file?.buffer?.length) {
      throw new BadRequestException('audio file is required');
    }

    let session = await this.sessionRepository.findById(sessionId);
    if (!session || session.tenantId !== tenantId) {
      // No existing session (e.g. first voice message, sessionId = 'new') — auto-create one
      session = this.createNewSession(tenantId, this.engineRegistry.getActiveEngine().engineType);
    }

    // STT: transcribe audio -> text
    const transcript = await this.sttService.transcribe(
      file.buffer,
      file.originalname || 'audio.m4a',
      language ?? 'zh',
    );

    if (!transcript?.trim()) {
      throw new BadRequestException('Could not transcribe audio — empty result');
    }

    const engine = this.engineRegistry.switchEngine(session.engineType as AgentEngineType);

    // Hard-interrupt any running task so the voice command takes effect immediately.
    // Look up by the RESOLVED session's ID, not the raw route param: when the param pointed
    // at a missing or foreign-tenant session we are working on a freshly created session and
    // must never touch tasks that belong to the session named in the URL.
    const runningTask = await this.taskRepository.findRunningBySessionId(session.id);
    if (runningTask) {
      this.logger.log(`[VoiceMsg ${session.id}] Interrupting running task ${runningTask.id}`);
      this.captureSdkSessionId(engine, session, runningTask.id);
      // Best-effort: a failed engine cancel must not block the voice command.
      await engine.cancelTask(session.id).catch(() => {});
      runningTask.status = TaskStatus.CANCELLED;
      runningTask.completedAt = new Date();
      await this.taskRepository.save(runningTask);
      await this.awaitTaskCleanup(runningTask.id);
      this.gateway.emitStreamEvent(session.id, {
        type: 'cancelled',
        message: 'Interrupted by voice message',
        code: 'VOICE_INJECT',
      });
    }

    // Persist the session before writing the message/task that reference its ID
    // (same order as executeTask); this also stores any SDK session ID captured above.
    session.status = 'active';
    session.updatedAt = new Date();
    await this.sessionRepository.save(session);

    // Save user message and build context
    await this.contextService.saveUserMessage(session.id, transcript);
    const conversationHistory = await this.contextService.loadContext(session.id, 20);
    const historyForEngine = conversationHistory.slice(0, -1);

    const isSdkEngine = engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK;
    const resumeSessionId = isSdkEngine
      ? ((session.metadata as any)?.sdkSessionId as string | undefined)
      : undefined;

    // Create task record
    const task = new AgentTask();
    task.id = crypto.randomUUID();
    task.tenantId = tenantId;
    task.sessionId = session.id;
    task.prompt = transcript;
    task.status = TaskStatus.RUNNING;
    task.startedAt = new Date();
    task.createdAt = new Date();
    await this.taskRepository.save(task);

    // Notify WS subscribers of the new task
    this.gateway.emitStreamEvent(session.id, { type: 'task_info', taskId: task.id });

    // Fire-and-forget stream
    this.runTaskStream(engine, session, task, {
      prompt: transcript,
      systemPrompt: session.systemPrompt || '',
      allowedTools: [],
      maxTurns: 10,
      conversationHistory: historyForEngine.length > 0 ? historyForEngine : undefined,
      resumeSessionId,
    });

    return { sessionId: session.id, taskId: task.id, transcript };
  }

  /** GET /api/v1/agent/engines — returns whatever the registry lists as available. */
  @Get('engines')
  async listEngines() {
    const engines = this.engineRegistry.listAvailable();
    return { engines };
  }

  /** POST /api/v1/agent/engines/switch — asks the registry to switch to `engineType`. */
  @Post('engines/switch')
  async switchEngine(@Body() body: { engineType: string }) {
    const engine = this.engineRegistry.switchEngine(body.engineType as AgentEngineType);
    return { message: 'Engine switched', engineType: engine.engineType };
  }

  // ---------------------------------------------------------------------------
  // Private helpers
  // ---------------------------------------------------------------------------

  /**
   * Shared fire-and-forget stream processing for executeTask, injectMessage and
   * sendVoiceMessage.
   *
   * Starts `engine.executeTask`, forwards events to the gateway (minus
   * thinking/tool events in voice mode), updates the task/session records on
   * lifecycle events, and saves partial assistant text if the stream ends
   * without a `completed`/`error` event. The resulting promise is registered in
   * `runningTasks` until it settles.
   */
  private runTaskStream(
    engine: AgentEnginePort,
    session: AgentSession,
    task: AgentTask,
    params: {
      prompt: string;
      systemPrompt: string;
      allowedTools: string[];
      maxTurns: number;
      conversationHistory?: Array<{ role: 'user' | 'assistant'; content: string | any[] }>;
      resumeSessionId?: string;
      voiceMode?: boolean;
    },
  ) {
    const voiceMode = params.voiceMode ?? false;

    /** Event types to suppress in voice mode (only forward text/completed/error) */
    const voiceFilteredTypes = new Set(['thinking', 'tool_use', 'tool_result']);

    if (voiceMode) {
      this.logger.log(
        `[Task ${task.id}] Voice mode ON — filtering ${[...voiceFilteredTypes].join(', ')} events`,
      );
    }

    const taskPromise = (async () => {
      // Set once a terminal event (completed/error) has been handled.
      let finished = false;
      const textParts: string[] = [];

      try {
        this.logger.log(
          `[Task ${task.id}] Starting engine stream for session=${session.id}, prompt="${params.prompt.slice(0, 80)}"`,
        );

        const stream = engine.executeTask({
          sessionId: session.id,
          prompt: params.prompt,
          systemPrompt: params.systemPrompt,
          allowedTools: params.allowedTools,
          maxTurns: params.maxTurns,
          conversationHistory: params.conversationHistory,
          resumeSessionId: params.resumeSessionId,
        });

        let eventCount = 0;
        for await (const event of stream) {
          eventCount++;
          const isFiltered = voiceMode && voiceFilteredTypes.has(event.type);
          const textInfo = event.type === 'text' ? ` len=${(event as any).content?.length}` : '';
          const errorInfo = event.type === 'error' ? ` msg=${(event as any).message}` : '';
          const filterInfo = isFiltered ? ' [FILTERED-voice]' : '';
          this.logger.log(
            `[Task ${task.id}] Event #${eventCount}: type=${event.type}${textInfo}${errorInfo}${filterInfo}`,
          );

          // In voice mode, skip intermediate events (tool_use, tool_result, thinking)
          // but still process lifecycle events below (completed/error/approval)
          if (!isFiltered) {
            this.gateway.emitStreamEvent(session.id, event);
          }

          // Collect text for assistant message
          if (event.type === 'text') {
            textParts.push(event.content);
          }

          if (event.type === 'completed' && !finished) {
            finished = true;
            task.status = TaskStatus.COMPLETED;
            task.result = event.summary;
            task.tokensUsed = event.tokensUsed;
            task.completedAt = new Date();
            await this.taskRepository.save(task);

            // Save assistant response to conversation history
            const assistantText = textParts.join('') || event.summary;
            if (assistantText) {
              await this.contextService.saveAssistantMessage(session.id, assistantText);
            }

            // For SDK engine: capture the SDK session ID for future resume
            this.captureSdkSessionId(engine, session, task.id);

            // Keep session active so it can be reused (also persists captured SDK session ID)
            session.updatedAt = new Date();
            await this.sessionRepository.save(session);

            // Record usage for billing (non-blocking)
            this.recordUsage(session.tenantId, task, session, event, engine.engineType).catch(
              (err) => this.logger.error(`[Task ${task.id}] Usage recording failed: ${err.message}`),
            );
          }

          if (event.type === 'error' && !finished) {
            finished = true;
            task.status = TaskStatus.FAILED;
            task.result = event.message;
            task.completedAt = new Date();
            await this.taskRepository.save(task);

            session.status = 'error';
            session.updatedAt = new Date();
            await this.sessionRepository.save(session);
          }

          if (event.type === 'approval_required') {
            task.status = TaskStatus.AWAITING_APPROVAL;
            await this.taskRepository.save(task);
          }
        }

        this.logger.log(`[Task ${task.id}] Stream ended after ${eventCount} events`);
      } catch (error) {
        this.logger.error(
          `[Task ${task.id}] Stream error: ${error instanceof Error ? error.message : error}`,
        );
        if (!finished) {
          task.status = TaskStatus.FAILED;
          task.result = error instanceof Error ? error.message : 'Unknown error';
          task.completedAt = new Date();
          await this.taskRepository.save(task);

          session.status = 'error';
          session.updatedAt = new Date();
          await this.sessionRepository.save(session);

          this.gateway.emitStreamEvent(session.id, {
            type: 'error',
            message: task.result ?? 'Unknown error',
            code: 'EXECUTION_ERROR',
          });
        }
      } finally {
        // Save partial assistant text if stream was interrupted (cancelled/aborted)
        if (!finished && textParts.length > 0) {
          const partialText = textParts.join('');
          if (partialText) {
            try {
              await this.contextService.saveAssistantMessage(session.id, partialText + '\n[中断]');
              this.logger.log(
                `[Task ${task.id}] Saved partial assistant text (${partialText.length} chars)`,
              );
            } catch (saveErr) {
              this.logger.error(`[Task ${task.id}] Failed to save partial text: ${saveErr}`);
            }
          }
          // Note: SDK session ID is already captured by cancelTask/injectMessage
          // BEFORE engine.cancelTask() deletes it from activeSessions.
        }
      }
    })();

    // Track the promise so cancel/inject can await cleanup
    this.runningTasks.set(task.id, taskPromise);

    // taskPromise can still reject (the catch block above awaits repository saves).
    // Nobody awaits this chain, so handle the rejection here rather than letting it
    // surface as an unhandled rejection, then drop the tracking entry either way.
    void taskPromise
      .catch((err: unknown) => {
        this.logger.error(
          `[Task ${task.id}] Failed to persist stream failure: ${err instanceof Error ? err.message : String(err)}`,
        );
      })
      .finally(() => this.runningTasks.delete(task.id));
  }

  /**
   * Await the running task's cleanup (partial text save).
   * Returns immediately if no task is running for this ID, and gives up
   * waiting after `timeoutMs` milliseconds.
   */
  private async awaitTaskCleanup(taskId: string, timeoutMs = 3000): Promise<void> {
    const running = this.runningTasks.get(taskId);
    if (!running) return;

    let timer: ReturnType<typeof setTimeout> | undefined;
    try {
      await Promise.race([
        running,
        new Promise<void>((resolve) => {
          timer = setTimeout(resolve, timeoutMs);
        }),
      ]);
    } catch {
      // Swallow — errors are already handled inside runTaskStream
    } finally {
      // Do not leave the timer pending once the task has settled first.
      if (timer !== undefined) clearTimeout(timer);
    }
  }

  /**
   * Capture SDK session ID into session.metadata (does NOT save to DB).
   * Callers are responsible for persisting the session afterwards.
   * No-op for engines other than ClaudeAgentSdkEngine or when the engine has
   * no SDK session ID for this session.
   */
  private captureSdkSessionId(engine: AgentEnginePort, session: AgentSession, taskId: string) {
    if (
      engine.engineType === AgentEngineType.CLAUDE_AGENT_SDK &&
      engine instanceof ClaudeAgentSdkEngine
    ) {
      const sdkSessionId = engine.getSdkSessionId(session.id);
      if (sdkSessionId) {
        session.metadata = { ...session.metadata, sdkSessionId };
        this.logger.log(`[Task ${taskId}] Captured SDK session ID: ${sdkSessionId}`);
      }
    }
  }

  /**
   * Persist a usage record for a completed task and publish USAGE_RECORDED.
   * Does nothing when the total token count is zero.
   */
  private async recordUsage(
    tenantId: string,
    task: AgentTask,
    session: AgentSession,
    event: Extract<EngineStreamEvent, { type: 'completed' }>,
    engineType: string,
  ): Promise<void> {
    const inputTokens = event.inputTokens ?? 0;
    const outputTokens = event.outputTokens ?? 0;
    const totalTokens = event.tokensUsed ?? inputTokens + outputTokens;

    if (totalTokens === 0) return;

    const record = new UsageRecord();
    record.id = crypto.randomUUID();
    record.tenantId = tenantId;
    record.taskId = task.id;
    record.sessionId = session.id;
    record.engineType = engineType;
    record.inputTokens = inputTokens;
    record.outputTokens = outputTokens;
    record.totalTokens = totalTokens;
    record.costUsd = event.costUsd ?? 0;
    record.model = event.model ?? '';

    await this.usageRecordRepository.save(record);

    await this.eventPublisher.publish(EventPatterns.USAGE_RECORDED, {
      tenantId,
      taskId: task.id,
      sessionId: session.id,
      engineType,
      inputTokens,
      outputTokens,
      totalTokens,
      costUsd: event.costUsd ?? 0,
      model: event.model ?? '',
    });
  }

  /**
   * Decode JWT payload without verifying signature (Kong already verified it).
   *
   * @returns the payload object, or `null` when the header is missing, is not a
   *   Bearer token, or the payload segment is not a base64url-encoded JSON object.
   */
  private decodeJwt(authHeader: string | undefined): Record<string, any> | null {
    if (!authHeader?.startsWith('Bearer ')) return null;
    try {
      const payload = authHeader.slice(7).split('.')[1];
      if (!payload) return null;
      const parsed: unknown = JSON.parse(Buffer.from(payload, 'base64url').toString('utf8'));
      // Valid JSON is not necessarily an object (e.g. `123`, `"x"`, `[]`, `null`).
      if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) return null;
      return parsed as Record<string, any>;
    } catch {
      return null;
    }
  }

  // ---------------------------------------------------------------------------
  // Instance chat endpoints — user chatting directly with their OpenClaw agent
  // ---------------------------------------------------------------------------

  /**
   * Start or continue a conversation with a specific OpenClaw agent instance.
   *
   * POST /api/v1/agent/instances/:instanceId/tasks
   * Body: { prompt, sessionId? }
   *
   * Routes the message to the OpenClaw bridge via /task-async and returns
   * immediately. The bridge POSTs the result to openclaw-app-callback when done.
   * Flutter subscribes to the WS session to receive the reply.
   *
   * @throws BadRequestException when `prompt` is not a string, the instance is
   *   not running, or it has no server host.
   * @throws NotFoundException when the instance does not exist.
   * @throws ForbiddenException when the JWT subject does not own the instance.
   */
  @Post('instances/:instanceId/tasks')
  async executeInstanceTask(
    @TenantId() tenantId: string,
    @Req() req: any,
    @Param('instanceId') instanceId: string,
    @Body() body: { prompt: string; sessionId?: string },
  ) {
    // Reject malformed bodies up front instead of failing later with a TypeError (HTTP 500).
    if (typeof body?.prompt !== 'string') {
      throw new BadRequestException('prompt is required');
    }

    const instance = await this.instanceRepository.findById(instanceId);
    if (!instance) throw new NotFoundException(`Instance ${instanceId} not found`);

    // Validate that this instance belongs to the requesting user.
    // NOTE(review): when no JWT subject can be decoded this ownership check is skipped
    // entirely — confirm the gateway guarantees an Authorization header on this route.
    const jwtPayload = this.decodeJwt(req.headers?.['authorization'] as string | undefined);
    const userId: string | undefined = jwtPayload?.sub;
    if (userId && instance.userId !== userId) {
      throw new ForbiddenException('Instance does not belong to you');
    }

    if (instance.status !== 'running') {
      throw new BadRequestException(
        `Instance is ${instance.status} — it must be running to accept messages`,
      );
    }

    if (!instance.serverHost) {
      throw new BadRequestException('Instance has no server host configured');
    }

    // Reuse existing instance session or create a new one
    let session: AgentSession;
    if (body.sessionId) {
      const existing = await this.sessionRepository.findById(body.sessionId);
      if (
        existing &&
        existing.status === 'active' &&
        existing.tenantId === tenantId &&
        existing.agentInstanceId === instanceId
      ) {
        session = existing;
      } else {
        session = this.createInstanceSession(tenantId, instanceId, body.prompt);
      }
    } else {
      session = this.createInstanceSession(tenantId, instanceId, body.prompt);
    }

    session.status = 'active';
    session.updatedAt = new Date();
    await this.sessionRepository.save(session);

    // Create task record
    const task = new AgentTask();
    task.id = crypto.randomUUID();
    task.tenantId = tenantId;
    task.sessionId = session.id;
    task.prompt = body.prompt;
    task.status = TaskStatus.RUNNING;
    task.startedAt = new Date();
    task.createdAt = new Date();
    await this.taskRepository.save(task);

    // Persist user message for display in conversation history
    await this.contextService.saveUserMessage(session.id, body.prompt);

    // The OpenClaw bridge tracks conversation context internally via sessionKey.
    // We use our DB session ID as the key so each session has isolated context.
    const sessionKey = `it0:${session.id}`;
    const callbackUrl = `${process.env.AGENT_SERVICE_PUBLIC_URL}/api/v1/agent/instances/openclaw-app-callback`;
    const bridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`;

    this.logger.log(
      `[Task ${task.id}] Routing to OpenClaw instance ${instanceId} @ ${bridgeUrl}, session=${session.id}`,
    );

    // Emit session/task info events immediately so Flutter can subscribe
    this.gateway.emitStreamEvent(session.id, { type: 'session_info', sessionId: session.id });
    this.gateway.emitStreamEvent(session.id, { type: 'task_info', taskId: task.id });

    // Fire-and-forget POST to OpenClaw bridge
    void fetch(bridgeUrl, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        prompt: body.prompt,
        sessionKey,
        idempotencyKey: task.id,
        callbackUrl,
        callbackData: { sessionId: session.id, taskId: task.id },
      }),
    })
      .then((res) => {
        // fetch() only rejects on network failure; an HTTP error status from the bridge
        // means the task was not accepted, so no callback will ever arrive for it.
        if (!res.ok) {
          throw new Error(`Bridge responded with HTTP ${res.status}`);
        }
      })
      .catch((err: unknown) => {
        const reason = err instanceof Error ? err.message : String(err);
        this.logger.error(`[Task ${task.id}] Bridge request failed: ${reason}`);
        this.gateway.emitStreamEvent(session.id, {
          type: 'error',
          message: `无法连接到智能体:${reason}`,
        });
        task.status = TaskStatus.FAILED;
        task.result = reason;
        task.completedAt = new Date();
        // Best-effort: the client has already been told about the failure.
        this.taskRepository.save(task).catch(() => {});
      });

    return { sessionId: session.id, taskId: task.id };
  }

  /**
   * List conversation sessions for a specific OpenClaw instance.
   *
   * GET /api/v1/agent/instances/:instanceId/sessions
   */
  @Get('instances/:instanceId/sessions')
  async listInstanceSessions(
    @TenantId() tenantId: string,
    @Param('instanceId') instanceId: string,
  ) {
    const sessions = await this.sessionRepository.findByInstanceId(tenantId, instanceId);
    return sessions.map((s) => ({
      id: s.id,
      title: (s.metadata as any)?.title ?? '',
      status: s.status,
      createdAt: s.createdAt,
      updatedAt: s.updatedAt,
    }));
  }

  /**
   * OpenClaw bridge callback for in-app instance chat.
   * Called by the bridge when an async LLM task completes.
   * PUBLIC — no JWT (internal bridge call from training server).
   *
   * POST /api/v1/agent/instances/openclaw-app-callback
   *
   * Always answers `{ received: true }`; malformed or inconsistent callbacks are
   * logged and ignored.
   */
  @Post('instances/openclaw-app-callback')
  async handleOpenClawAppCallback(
    @Body()
    body: {
      ok: boolean;
      result?: string;
      error?: string;
      isTimeout?: boolean;
      callbackData: { sessionId: string; taskId: string };
    },
  ) {
    const { ok, result, error, isTimeout, callbackData } = body;
    const { sessionId, taskId } = callbackData ?? {};

    this.logger.log(
      `OpenClaw app callback: ok=${ok} taskId=${taskId} sessionId=${sessionId} ` +
        `${ok ? `replyLen=${result?.length ?? 0}` : `error=${error} isTimeout=${isTimeout}`}`,
    );

    if (!sessionId || !taskId) {
      this.logger.warn('OpenClaw app callback missing sessionId or taskId');
      return { received: true };
    }

    const task = await this.taskRepository.findById(taskId);

    // This endpoint is unauthenticated, so never let a callback for one task write
    // into a different session's stream or history.
    if (task && task.sessionId !== sessionId) {
      this.logger.warn(
        `OpenClaw app callback ignored: task ${taskId} does not belong to session ${sessionId}`,
      );
      return { received: true };
    }

    if (ok && result) {
      // Emit text + completed events so Flutter's WS stream receives the reply
      this.gateway.emitStreamEvent(sessionId, { type: 'text', content: result });
      this.gateway.emitStreamEvent(sessionId, { type: 'completed', summary: result, tokensUsed: 0 });

      // Persist assistant reply to conversation history
      await this.contextService.saveAssistantMessage(sessionId, result);

      if (task) {
        task.status = TaskStatus.COMPLETED;
        task.result = result;
        task.completedAt = new Date();
        await this.taskRepository.save(task);
      }

      const session = await this.sessionRepository.findById(sessionId);
      if (session) {
        session.status = 'active';
        session.updatedAt = new Date();
        await this.sessionRepository.save(session);
      }
    } else {
      const errorMsg = isTimeout ? '智能体响应超时,请重试' : error || '智能体发生错误';
      this.gateway.emitStreamEvent(sessionId, { type: 'error', message: errorMsg });

      if (task) {
        task.status = TaskStatus.FAILED;
        task.result = errorMsg;
        task.completedAt = new Date();
        await this.taskRepository.save(task);
      }
    }

    return { received: true };
  }

  /** Build (but do not persist) a new active session for the given engine. */
  private createNewSession(
    tenantId: string,
    engineType: string,
    systemPrompt?: string,
    voiceMode?: boolean,
  ): AgentSession {
    const session = new AgentSession();
    session.id = crypto.randomUUID();
    session.tenantId = tenantId;
    session.engineType = engineType;
    session.status = 'active';
    session.systemPrompt = systemPrompt;
    // Pre-populate voiceMode so it's available even before the first task saves it
    session.metadata = { voiceMode: voiceMode ?? false };
    session.createdAt = new Date();
    session.updatedAt = new Date();
    return session;
  }

  /**
   * Build (but do not persist) a new active 'openclaw' session bound to an agent
   * instance, titled with the first 40 characters of the first prompt.
   */
  private createInstanceSession(
    tenantId: string,
    agentInstanceId: string,
    firstPrompt: string,
  ): AgentSession {
    const session = new AgentSession();
    session.id = crypto.randomUUID();
    session.tenantId = tenantId;
    session.engineType = 'openclaw';
    session.agentInstanceId = agentInstanceId;
    session.status = 'active';
    session.metadata = { title: firstPrompt.substring(0, 40).trim(), titleSet: true };
    session.createdAt = new Date();
    session.updatedAt = new Date();
    return session;
  }
}