diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts index 2b399d9..f8e6325 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent.controller.ts @@ -1,7 +1,7 @@ import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger, UseInterceptors, UploadedFile, Req } from '@nestjs/common'; import { FileInterceptor } from '@nestjs/platform-express'; import { memoryStorage } from 'multer'; -import { TenantId, EventPatterns } from '@it0/common'; +import { TenantId, EventPatterns, TenantContextService } from '@it0/common'; import { OpenAISttService } from '../../../infrastructure/stt/openai-stt.service'; import { EngineRegistry } from '../../../infrastructure/engines/engine-registry'; import { AgentStreamGateway } from '../../ws/agent-stream.gateway'; @@ -958,7 +958,7 @@ export class AgentController { sessionKey, idempotencyKey: task.id, callbackUrl, - callbackData: { sessionId: session.id, taskId: task.id }, + callbackData: { sessionId: session.id, taskId: task.id, tenantId }, ...(bridgeAttachments && bridgeAttachments.length > 0 && { attachments: bridgeAttachments }), }), }) @@ -1023,11 +1023,11 @@ export class AgentController { result?: string; error?: string; isTimeout?: boolean; - callbackData: { sessionId: string; taskId: string }; + callbackData: { sessionId: string; taskId: string; tenantId?: string }; }, ) { const { ok, result, error, isTimeout, callbackData } = body; - const { sessionId, taskId } = callbackData ?? {}; + const { sessionId, taskId, tenantId } = callbackData ?? {}; this.logger.log( `OpenClaw app callback: ok=${ok} taskId=${taskId} sessionId=${sessionId} ` + @@ -1046,41 +1046,64 @@ export class AgentController { this.pendingCallbackTimers.delete(taskId); } - const task = await this.taskRepository.findById(taskId); - + // Emit WS events immediately (no tenant context needed). if (ok && result) { - // Emit text + completed events so Flutter's WS stream receives the reply this.gateway.emitStreamEvent(sessionId, { type: 'text', content: result }); this.gateway.emitStreamEvent(sessionId, { type: 'completed', summary: result, tokensUsed: 0 }); - - // Persist assistant reply to conversation history - await this.contextService.saveAssistantMessage(sessionId, result); - - if (task) { - task.status = TaskStatus.COMPLETED; - task.result = result; - task.completedAt = new Date(); - await this.taskRepository.save(task); - } - - const session = await this.sessionRepository.findById(sessionId); - if (session) { - session.status = 'active'; - session.updatedAt = new Date(); - await this.sessionRepository.save(session); - } } else { const errorMsg = isTimeout ? '智能体响应超时,请重试' : (error || '智能体发生错误'); this.gateway.emitStreamEvent(sessionId, { type: 'error', message: errorMsg }); - - if (task) { - task.status = TaskStatus.FAILED; - task.result = errorMsg; - task.completedAt = new Date(); - await this.taskRepository.save(task); - } } + // DB updates require tenant context — run inside TenantContextService.run(). + // tenantId is included in callbackData by executeInstanceTask. + if (!tenantId) { + this.logger.warn(`[Task ${taskId}] OpenClaw callback missing tenantId — skipping DB update`); + return { received: true }; + } + + await TenantContextService.run( + { + tenantId, + tenantName: tenantId, + plan: 'enterprise', + schemaName: `it0_t_${tenantId}`, + maxServers: -1, + maxUsers: -1, + maxStandingOrders: -1, + maxAgentTokensPerMonth: -1, + }, + async () => { + const task = await this.taskRepository.findById(taskId); + + if (ok && result) { + await this.contextService.saveAssistantMessage(sessionId, result); + + if (task) { + task.status = TaskStatus.COMPLETED; + task.result = result; + task.completedAt = new Date(); + await this.taskRepository.save(task); + } + + const session = await this.sessionRepository.findById(sessionId); + if (session) { + session.status = 'active'; + session.updatedAt = new Date(); + await this.sessionRepository.save(session); + } + } else { + const errorMsg = isTimeout ? '智能体响应超时,请重试' : (error || '智能体发生错误'); + if (task) { + task.status = TaskStatus.FAILED; + task.result = errorMsg; + task.completedAt = new Date(); + await this.taskRepository.save(task); + } + } + }, + ); + return { received: true }; }