fix(agent): fix openclaw-app-callback Tenant context not initialized

The callback endpoint is public (no JWT), so TenantAwareRepository
calls failed with 'Tenant context not initialized'.

Fix:
1. Include tenantId in callbackData sent to the bridge
2. Wrap all DB operations in TenantContextService.run() using the
   tenantId from callbackData
3. Emit WS events immediately (no tenant context needed) so Flutter
   receives the reply even if DB update fails

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-10 09:30:58 -07:00
parent a8c72aca76
commit 265730acb2
1 changed files with 54 additions and 31 deletions

View File

@ -1,7 +1,7 @@
import { Controller, Post, Body, Param, Delete, Get, NotFoundException, BadRequestException, ForbiddenException, Logger, UseInterceptors, UploadedFile, Req } from '@nestjs/common';
import { FileInterceptor } from '@nestjs/platform-express';
import { memoryStorage } from 'multer';
import { TenantId, EventPatterns } from '@it0/common';
import { TenantId, EventPatterns, TenantContextService } from '@it0/common';
import { OpenAISttService } from '../../../infrastructure/stt/openai-stt.service';
import { EngineRegistry } from '../../../infrastructure/engines/engine-registry';
import { AgentStreamGateway } from '../../ws/agent-stream.gateway';
@ -958,7 +958,7 @@ export class AgentController {
sessionKey,
idempotencyKey: task.id,
callbackUrl,
callbackData: { sessionId: session.id, taskId: task.id },
callbackData: { sessionId: session.id, taskId: task.id, tenantId },
...(bridgeAttachments && bridgeAttachments.length > 0 && { attachments: bridgeAttachments }),
}),
})
@ -1023,11 +1023,11 @@ export class AgentController {
result?: string;
error?: string;
isTimeout?: boolean;
callbackData: { sessionId: string; taskId: string };
callbackData: { sessionId: string; taskId: string; tenantId?: string };
},
) {
const { ok, result, error, isTimeout, callbackData } = body;
const { sessionId, taskId } = callbackData ?? {};
const { sessionId, taskId, tenantId } = callbackData ?? {};
this.logger.log(
`OpenClaw app callback: ok=${ok} taskId=${taskId} sessionId=${sessionId} ` +
@ -1046,41 +1046,64 @@ export class AgentController {
this.pendingCallbackTimers.delete(taskId);
}
const task = await this.taskRepository.findById(taskId);
// Emit WS events immediately (no tenant context needed).
if (ok && result) {
// Emit text + completed events so Flutter's WS stream receives the reply
this.gateway.emitStreamEvent(sessionId, { type: 'text', content: result });
this.gateway.emitStreamEvent(sessionId, { type: 'completed', summary: result, tokensUsed: 0 });
// Persist assistant reply to conversation history
await this.contextService.saveAssistantMessage(sessionId, result);
if (task) {
task.status = TaskStatus.COMPLETED;
task.result = result;
task.completedAt = new Date();
await this.taskRepository.save(task);
}
const session = await this.sessionRepository.findById(sessionId);
if (session) {
session.status = 'active';
session.updatedAt = new Date();
await this.sessionRepository.save(session);
}
} else {
const errorMsg = isTimeout ? '智能体响应超时,请重试' : (error || '智能体发生错误');
this.gateway.emitStreamEvent(sessionId, { type: 'error', message: errorMsg });
if (task) {
task.status = TaskStatus.FAILED;
task.result = errorMsg;
task.completedAt = new Date();
await this.taskRepository.save(task);
}
}
// DB updates require tenant context — run inside TenantContextService.run().
// tenantId is included in callbackData by executeInstanceTask.
if (!tenantId) {
this.logger.warn(`[Task ${taskId}] OpenClaw callback missing tenantId — skipping DB update`);
return { received: true };
}
await TenantContextService.run(
{
tenantId,
tenantName: tenantId,
plan: 'enterprise',
schemaName: `it0_t_${tenantId}`,
maxServers: -1,
maxUsers: -1,
maxStandingOrders: -1,
maxAgentTokensPerMonth: -1,
},
async () => {
const task = await this.taskRepository.findById(taskId);
if (ok && result) {
await this.contextService.saveAssistantMessage(sessionId, result);
if (task) {
task.status = TaskStatus.COMPLETED;
task.result = result;
task.completedAt = new Date();
await this.taskRepository.save(task);
}
const session = await this.sessionRepository.findById(sessionId);
if (session) {
session.status = 'active';
session.updatedAt = new Date();
await this.sessionRepository.save(session);
}
} else {
const errorMsg = isTimeout ? '智能体响应超时,请重试' : (error || '智能体发生错误');
if (task) {
task.status = TaskStatus.FAILED;
task.result = errorMsg;
task.completedAt = new Date();
await this.taskRepository.save(task);
}
}
},
);
return { received: true };
}