diff --git a/packages/openclaw-bridge/src/index.ts b/packages/openclaw-bridge/src/index.ts index fb8019e..b39f27b 100644 --- a/packages/openclaw-bridge/src/index.ts +++ b/packages/openclaw-bridge/src/index.ts @@ -99,6 +99,56 @@ app.post('/task', async (req, res) => { } }); +// Submit a task asynchronously — returns immediately, POSTs result to callbackUrl when done. +// +// Request body (same as /task, plus): +// callbackUrl — URL to POST the result to when the agent replies +// callbackData — opaque object forwarded unchanged in the callback body +// +// Response: { ok: true, pending: true } +// Callback body (POST to callbackUrl): +// { ok: true, result: string, callbackData } — on success +// { ok: false, error: string, callbackData } — on LLM/timeout error +// { ok: false, error: "callback_url_missing", ... } — config error (logged only) +app.post('/task-async', async (req, res) => { + const callbackUrl: string | undefined = req.body.callbackUrl; + if (!callbackUrl) { + res.status(400).json({ error: 'callbackUrl is required' }); + return; + } + if (!ocClient.isConnected()) { + res.status(503).json({ error: 'Gateway not connected' }); + return; + } + + const sessionKey = req.body.sessionKey ?? `agent:main:${req.body.sessionId ?? 'main'}`; + const timeoutSeconds: number = req.body.timeoutSeconds ?? 120; // 2 min default for async tasks + const idempotencyKey: string = req.body.idempotencyKey ?? crypto.randomUUID(); + const callbackData = req.body.callbackData ?? {}; + + // Return immediately — LLM runs in background + res.json({ ok: true, pending: true }); + + const postCallback = (body: object) => { + fetch(callbackUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(body), + }).catch((e: Error) => console.error('[bridge] callback POST failed:', e.message)); + }; + + ocClient.chatSendAndWait({ + sessionKey, + message: req.body.prompt, + idempotencyKey, + timeoutMs: timeoutSeconds * 1000, + }).then((reply: string) => { + postCallback({ ok: true, result: reply, callbackData }); + }).catch((err: Error) => { + postCallback({ ok: false, error: err.message, callbackData }); + }); +}); + // List sessions app.get('/sessions', async (_req, res) => { if (!ocClient.isConnected()) { diff --git a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts index 06ea1f1..47574d0 100644 --- a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts @@ -80,7 +80,8 @@ const OAUTH_STATE_TTL_MS = 10 * 60 * 1000; // 10 min const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to proactively refresh const WS_RECONNECT_BASE_MS = 2_000; const WS_RECONNECT_MAX_MS = 60_000; -const TASK_TIMEOUT_S = 55; // seconds — bridge default is 25s; must pass explicitly +const TASK_TIMEOUT_S = 120; // seconds — async bridge timeout (LLM may run >1 min) +const CALLBACK_TIMEOUT_MS = 180_000; // 3 min — max wait for async bridge callback const DEDUP_TTL_MS = 10 * 60 * 1000; const RATE_LIMIT_PER_MIN = 10; const QUEUE_MAX_DEPTH = 5; @@ -95,6 +96,8 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { private readonly clientId: string; private readonly clientSecret: string; private readonly enabled: boolean; + /** Base URL for bridge callbacks — same value as AGENT_SERVICE_PUBLIC_URL */ + private readonly agentCallbackBaseUrl: string; // Token private token = ''; @@ -111,20 +114,27 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { private cleanupTimer?: NodeJS.Timeout; // State - private readonly bindingCodes = new Map(); // code → entry - private readonly oauthStates = new Map(); // state → entry - private readonly dedup = new Map(); // msgId → ts - private readonly rateWindows = new Map(); // userId → timestamps - private readonly queueTails = new Map>(); // userId → tail - private readonly queueDepths = new Map(); // userId → depth + private readonly bindingCodes = new Map(); // code → entry + private readonly oauthStates = new Map(); // state → entry + private readonly dedup = new Map(); // msgId → ts + private readonly rateWindows = new Map(); // userId → timestamps + private readonly queueTails = new Map>(); // userId → tail + private readonly queueDepths = new Map(); // userId → depth + /** Pending async bridge callbacks: msgId → { resolve, reject, timer } */ + private readonly pendingCallbacks = new Map void; + reject: (e: Error) => void; + timer: NodeJS.Timeout; + }>(); constructor( private readonly configService: ConfigService, private readonly instanceRepo: AgentInstanceRepository, ) { - this.clientId = this.configService.get('IT0_DINGTALK_CLIENT_ID', ''); - this.clientSecret = this.configService.get('IT0_DINGTALK_CLIENT_SECRET', ''); - this.enabled = !!(this.clientId && this.clientSecret); + this.clientId = this.configService.get('IT0_DINGTALK_CLIENT_ID', ''); + this.clientSecret = this.configService.get('IT0_DINGTALK_CLIENT_SECRET', ''); + this.enabled = !!(this.clientId && this.clientSecret); + this.agentCallbackBaseUrl = this.configService.get('AGENT_SERVICE_PUBLIC_URL', ''); } onModuleInit(): void { @@ -146,12 +156,39 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { clearTimeout(this.reconnectTimer); clearTimeout(this.tokenRefreshTimer); this.terminateCurrentWs(); + // Reject all pending callbacks so the queue doesn't hang on shutdown + for (const [, cb] of this.pendingCallbacks) { + clearTimeout(cb.timer); + cb.reject(new Error('Service shutting down')); + } + this.pendingCallbacks.clear(); } isEnabled(): boolean { return this.enabled; } + // ── Async bridge callback ────────────────────────────────────────────────── + + /** + * Called by AgentChannelController when the OpenClaw bridge POSTs a callback. + * Resolves (or rejects) the Promise that routeToAgent is awaiting. + */ + resolveCallbackReply(msgId: string, ok: boolean, content: string): void { + const cb = this.pendingCallbacks.get(msgId); + if (!cb) { + this.logger.warn(`Received callback for unknown msgId=${msgId} (already resolved or timed out)`); + return; + } + this.pendingCallbacks.delete(msgId); + clearTimeout(cb.timer); + if (ok) { + cb.resolve(content); + } else { + cb.reject(new Error(content)); + } + } + // ── Binding code API ─────────────────────────────────────────────────────── generateBindingCode(instanceId: string): { code: string; expiresAt: number } { @@ -615,41 +652,60 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { return; } - const bridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task`; - this.logger.log(`Routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${bridgeUrl}`); + const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`; + const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/dingtalk/bridge-callback`; + this.logger.log( + `Routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) ` + + `async @ ${asyncBridgeUrl}, callback=${callbackUrl}`, + ); - // sessionWebhook TTL is ~90 minutes (per DingTalk docs), but delivering the actual LLM - // response synchronously makes the user wait with no feedback. Strategy: - // 1. Immediately send "处理中..." via sessionWebhook — user sees instant acknowledgment - // 2. Await the bridge call (LLM processing) — the serial queue still blocks here, - // preventing concurrent LLM calls for the same user - // 3. Always deliver the actual response via batchSend — decoupled from webhook window + // Async bridge strategy: + // 1. Immediately send "处理中..." via sessionWebhook — instant ack to user + // 2. POST to /task-async → bridge returns immediately, LLM runs in background + // 3. Bridge POSTs result to callbackUrl when done + // 4. resolveCallbackReply() fires → awaited Promise resolves → batchSend + // Serial queue is maintained: routeToAgent awaits the callback Promise, + // so the next queued message only starts after the current one completes. this.reply(msg, '🤔 小虾米正在思考,稍等...'); let reply: string; try { - const result = await this.httpPostJson<{ ok: boolean; result?: unknown; error?: string }>( - bridgeUrl, + // Register callback before posting (avoids race if bridge responds instantly) + const callbackPromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pendingCallbacks.delete(msg.msgId); + reject(new Error(`Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`)); + }, CALLBACK_TIMEOUT_MS); + this.pendingCallbacks.set(msg.msgId, { resolve, reject, timer }); + }); + + // Fire the async bridge request (short timeout — only waiting for initial ACK) + const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>( + asyncBridgeUrl, { prompt: text, sessionKey: `agent:main:dt-${userId}`, idempotencyKey: msg.msgId, timeoutSeconds: TASK_TIMEOUT_S, + callbackUrl, + callbackData: { staffId, msgId: msg.msgId }, }, - (TASK_TIMEOUT_S + 10) * 1000, + 15_000, // 15s for initial ACK only ); - if (result.ok && result.result !== undefined) { - reply = typeof result.result === 'string' - ? result.result - : JSON.stringify(result.result, null, 2); - this.logger.log(`Bridge OK for instance ${instance.id}, reply length=${reply.length}`); + if (!ack.ok) { + // Bridge rejected the task + this.pendingCallbacks.delete(msg.msgId); + reply = ack.error ?? '智能体拒绝了请求,请稍后重试。'; + this.logger.warn(`Bridge rejected async task for instance ${instance.id}: ${reply}`); } else { - reply = result.error ?? '智能体没有返回内容。'; - this.logger.warn(`Bridge returned error for instance ${instance.id}: ${reply}`); + // Wait for the callback (may take 1–3 minutes for complex tasks) + reply = await callbackPromise; + this.logger.log(`Bridge callback received for instance ${instance.id}, reply length=${reply.length}`); } } catch (e: any) { - this.logger.error(`Bridge call failed for instance ${instance.id}:`, e.message); + this.pendingCallbacks.delete(msg.msgId); + this.logger.error(`Async bridge failed for instance ${instance.id}:`, e.message); reply = '与小龙虾通信时出现错误,请稍后重试。'; } diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts index e03de6e..b6fc8e0 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts @@ -4,10 +4,12 @@ import { Get, Param, Query, + Body, NotFoundException, ServiceUnavailableException, BadRequestException, Res, + Logger, } from '@nestjs/common'; import { Response } from 'express'; import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service'; @@ -15,6 +17,8 @@ import { AgentInstanceRepository } from '../../../infrastructure/repositories/ag @Controller('api/v1/agent/channels') export class AgentChannelController { + private readonly logger = new Logger(AgentChannelController.name); + constructor( private readonly dingTalkRouter: DingTalkRouterService, private readonly instanceRepo: AgentInstanceRepository, @@ -95,6 +99,33 @@ export class AgentChannelController { } } + /** + * Bridge async-task callback — called by the OpenClaw bridge when an async LLM + * task completes. Unauthenticated; the bridge is an internal service on our + * own infrastructure. The callbackData carries the staffId + msgId that + * DingTalkRouterService is waiting on. + * + * Called by: openclaw-bridge /task-async → POST /api/v1/agent/channels/dingtalk/bridge-callback + */ + @Post('dingtalk/bridge-callback') + handleBridgeCallback( + @Body() body: { + ok: boolean; + result?: string; + error?: string; + callbackData: { staffId: string; msgId: string }; + }, + ) { + const { ok, result, error, callbackData } = body; + const { staffId, msgId } = callbackData ?? {}; + this.logger.log( + `Bridge callback: ok=${ok} msgId=${msgId} staffId=${staffId} ` + + `${ok ? `replyLen=${result?.length ?? 0}` : `error=${error}`}`, + ); + this.dingTalkRouter.resolveCallbackReply(msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。')); + return { received: true }; + } + private htmlPage(title: string, message: string, success: boolean): string { const color = success ? '#22C55E' : '#EF4444'; const icon = success ? '✅' : '❌';