feat(dingtalk): async callback pattern for LLM tasks (no 55s timeout)

Bridge:
- Add /task-async endpoint: returns immediately, POSTs result to callbackUrl
- Supports arbitrarily long LLM tasks (2 min default timeout)

Agent-service:
- Add POST /api/v1/agent/channels/dingtalk/bridge-callback endpoint
- DingTalkRouterService: pendingCallbacks map + resolveCallbackReply()
- routeToAgent: fire /task-async, register callback Promise, await result
- Serial queue preserved: next message starts only after callback resolves
- CALLBACK_TIMEOUT_MS = 3 min (was effectively 55s before)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-09 00:36:00 -07:00
parent be477c73c6
commit 865b246345
3 changed files with 166 additions and 29 deletions

View File

@ -99,6 +99,56 @@ app.post('/task', async (req, res) => {
} }
}); });
// Submit a task asynchronously — returns immediately, POSTs result to callbackUrl when done.
//
// Request body (same as /task, plus):
// callbackUrl — URL to POST the result to when the agent replies
// callbackData — opaque object forwarded unchanged in the callback body
//
// Response: { ok: true, pending: true }
// Callback body (POST to callbackUrl):
// { ok: true, result: string, callbackData } — on success
// { ok: false, error: string, callbackData } — on LLM/timeout error
// { ok: false, error: "callback_url_missing", ... } — config error (logged only)
app.post('/task-async', async (req, res) => {
const callbackUrl: string | undefined = req.body.callbackUrl;
if (!callbackUrl) {
res.status(400).json({ error: 'callbackUrl is required' });
return;
}
if (!ocClient.isConnected()) {
res.status(503).json({ error: 'Gateway not connected' });
return;
}
const sessionKey = req.body.sessionKey ?? `agent:main:${req.body.sessionId ?? 'main'}`;
const timeoutSeconds: number = req.body.timeoutSeconds ?? 120; // 2 min default for async tasks
const idempotencyKey: string = req.body.idempotencyKey ?? crypto.randomUUID();
const callbackData = req.body.callbackData ?? {};
// Return immediately — LLM runs in background
res.json({ ok: true, pending: true });
const postCallback = (body: object) => {
fetch(callbackUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(body),
}).catch((e: Error) => console.error('[bridge] callback POST failed:', e.message));
};
ocClient.chatSendAndWait({
sessionKey,
message: req.body.prompt,
idempotencyKey,
timeoutMs: timeoutSeconds * 1000,
}).then((reply: string) => {
postCallback({ ok: true, result: reply, callbackData });
}).catch((err: Error) => {
postCallback({ ok: false, error: err.message, callbackData });
});
});
// List sessions // List sessions
app.get('/sessions', async (_req, res) => { app.get('/sessions', async (_req, res) => {
if (!ocClient.isConnected()) { if (!ocClient.isConnected()) {

View File

@ -80,7 +80,8 @@ const OAUTH_STATE_TTL_MS = 10 * 60 * 1000; // 10 min
const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to proactively refresh const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to proactively refresh
const WS_RECONNECT_BASE_MS = 2_000; const WS_RECONNECT_BASE_MS = 2_000;
const WS_RECONNECT_MAX_MS = 60_000; const WS_RECONNECT_MAX_MS = 60_000;
const TASK_TIMEOUT_S = 55; // seconds — bridge default is 25s; must pass explicitly const TASK_TIMEOUT_S = 120; // seconds — async bridge timeout (LLM may run >1 min)
const CALLBACK_TIMEOUT_MS = 180_000; // 3 min — max wait for async bridge callback
const DEDUP_TTL_MS = 10 * 60 * 1000; const DEDUP_TTL_MS = 10 * 60 * 1000;
const RATE_LIMIT_PER_MIN = 10; const RATE_LIMIT_PER_MIN = 10;
const QUEUE_MAX_DEPTH = 5; const QUEUE_MAX_DEPTH = 5;
@ -95,6 +96,8 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
private readonly clientId: string; private readonly clientId: string;
private readonly clientSecret: string; private readonly clientSecret: string;
private readonly enabled: boolean; private readonly enabled: boolean;
/** Base URL for bridge callbacks — same value as AGENT_SERVICE_PUBLIC_URL */
private readonly agentCallbackBaseUrl: string;
// Token // Token
private token = ''; private token = '';
@ -117,6 +120,12 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
private readonly rateWindows = new Map<string, number[]>(); // userId → timestamps private readonly rateWindows = new Map<string, number[]>(); // userId → timestamps
private readonly queueTails = new Map<string, Promise<void>>(); // userId → tail private readonly queueTails = new Map<string, Promise<void>>(); // userId → tail
private readonly queueDepths = new Map<string, number>(); // userId → depth private readonly queueDepths = new Map<string, number>(); // userId → depth
/** Pending async bridge callbacks: msgId → { resolve, reject, timer } */
private readonly pendingCallbacks = new Map<string, {
resolve: (reply: string) => void;
reject: (e: Error) => void;
timer: NodeJS.Timeout;
}>();
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
@ -125,6 +134,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
this.clientId = this.configService.get<string>('IT0_DINGTALK_CLIENT_ID', ''); this.clientId = this.configService.get<string>('IT0_DINGTALK_CLIENT_ID', '');
this.clientSecret = this.configService.get<string>('IT0_DINGTALK_CLIENT_SECRET', ''); this.clientSecret = this.configService.get<string>('IT0_DINGTALK_CLIENT_SECRET', '');
this.enabled = !!(this.clientId && this.clientSecret); this.enabled = !!(this.clientId && this.clientSecret);
this.agentCallbackBaseUrl = this.configService.get<string>('AGENT_SERVICE_PUBLIC_URL', '');
} }
onModuleInit(): void { onModuleInit(): void {
@ -146,12 +156,39 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
clearTimeout(this.reconnectTimer); clearTimeout(this.reconnectTimer);
clearTimeout(this.tokenRefreshTimer); clearTimeout(this.tokenRefreshTimer);
this.terminateCurrentWs(); this.terminateCurrentWs();
// Reject all pending callbacks so the queue doesn't hang on shutdown
for (const [, cb] of this.pendingCallbacks) {
clearTimeout(cb.timer);
cb.reject(new Error('Service shutting down'));
}
this.pendingCallbacks.clear();
} }
isEnabled(): boolean { isEnabled(): boolean {
return this.enabled; return this.enabled;
} }
// ── Async bridge callback ──────────────────────────────────────────────────
/**
* Called by AgentChannelController when the OpenClaw bridge POSTs a callback.
* Resolves (or rejects) the Promise that routeToAgent is awaiting.
*/
resolveCallbackReply(msgId: string, ok: boolean, content: string): void {
const cb = this.pendingCallbacks.get(msgId);
if (!cb) {
this.logger.warn(`Received callback for unknown msgId=${msgId} (already resolved or timed out)`);
return;
}
this.pendingCallbacks.delete(msgId);
clearTimeout(cb.timer);
if (ok) {
cb.resolve(content);
} else {
cb.reject(new Error(content));
}
}
// ── Binding code API ─────────────────────────────────────────────────────── // ── Binding code API ───────────────────────────────────────────────────────
generateBindingCode(instanceId: string): { code: string; expiresAt: number } { generateBindingCode(instanceId: string): { code: string; expiresAt: number } {
@ -615,41 +652,60 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
return; return;
} }
const bridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task`; const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`;
this.logger.log(`Routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${bridgeUrl}`); const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/dingtalk/bridge-callback`;
this.logger.log(
`Routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) ` +
`async @ ${asyncBridgeUrl}, callback=${callbackUrl}`,
);
// sessionWebhook TTL is ~90 minutes (per DingTalk docs), but delivering the actual LLM // Async bridge strategy:
// response synchronously makes the user wait with no feedback. Strategy: // 1. Immediately send "处理中..." via sessionWebhook — instant ack to user
// 1. Immediately send "处理中..." via sessionWebhook — user sees instant acknowledgment // 2. POST to /task-async → bridge returns immediately, LLM runs in background
// 2. Await the bridge call (LLM processing) — the serial queue still blocks here, // 3. Bridge POSTs result to callbackUrl when done
// preventing concurrent LLM calls for the same user // 4. resolveCallbackReply() fires → awaited Promise resolves → batchSend
// 3. Always deliver the actual response via batchSend — decoupled from webhook window // Serial queue is maintained: routeToAgent awaits the callback Promise,
// so the next queued message only starts after the current one completes.
this.reply(msg, '🤔 小虾米正在思考,稍等...'); this.reply(msg, '🤔 小虾米正在思考,稍等...');
let reply: string; let reply: string;
try { try {
const result = await this.httpPostJson<{ ok: boolean; result?: unknown; error?: string }>( // Register callback before posting (avoids race if bridge responds instantly)
bridgeUrl, const callbackPromise = new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingCallbacks.delete(msg.msgId);
reject(new Error(`Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`));
}, CALLBACK_TIMEOUT_MS);
this.pendingCallbacks.set(msg.msgId, { resolve, reject, timer });
});
// Fire the async bridge request (short timeout — only waiting for initial ACK)
const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>(
asyncBridgeUrl,
{ {
prompt: text, prompt: text,
sessionKey: `agent:main:dt-${userId}`, sessionKey: `agent:main:dt-${userId}`,
idempotencyKey: msg.msgId, idempotencyKey: msg.msgId,
timeoutSeconds: TASK_TIMEOUT_S, timeoutSeconds: TASK_TIMEOUT_S,
callbackUrl,
callbackData: { staffId, msgId: msg.msgId },
}, },
(TASK_TIMEOUT_S + 10) * 1000, 15_000, // 15s for initial ACK only
); );
if (result.ok && result.result !== undefined) { if (!ack.ok) {
reply = typeof result.result === 'string' // Bridge rejected the task
? result.result this.pendingCallbacks.delete(msg.msgId);
: JSON.stringify(result.result, null, 2); reply = ack.error ?? '智能体拒绝了请求,请稍后重试。';
this.logger.log(`Bridge OK for instance ${instance.id}, reply length=${reply.length}`); this.logger.warn(`Bridge rejected async task for instance ${instance.id}: ${reply}`);
} else { } else {
reply = result.error ?? '智能体没有返回内容。'; // Wait for the callback (may take 13 minutes for complex tasks)
this.logger.warn(`Bridge returned error for instance ${instance.id}: ${reply}`); reply = await callbackPromise;
this.logger.log(`Bridge callback received for instance ${instance.id}, reply length=${reply.length}`);
} }
} catch (e: any) { } catch (e: any) {
this.logger.error(`Bridge call failed for instance ${instance.id}:`, e.message); this.pendingCallbacks.delete(msg.msgId);
this.logger.error(`Async bridge failed for instance ${instance.id}:`, e.message);
reply = '与小龙虾通信时出现错误,请稍后重试。'; reply = '与小龙虾通信时出现错误,请稍后重试。';
} }

View File

@ -4,10 +4,12 @@ import {
Get, Get,
Param, Param,
Query, Query,
Body,
NotFoundException, NotFoundException,
ServiceUnavailableException, ServiceUnavailableException,
BadRequestException, BadRequestException,
Res, Res,
Logger,
} from '@nestjs/common'; } from '@nestjs/common';
import { Response } from 'express'; import { Response } from 'express';
import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service'; import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service';
@ -15,6 +17,8 @@ import { AgentInstanceRepository } from '../../../infrastructure/repositories/ag
@Controller('api/v1/agent/channels') @Controller('api/v1/agent/channels')
export class AgentChannelController { export class AgentChannelController {
private readonly logger = new Logger(AgentChannelController.name);
constructor( constructor(
private readonly dingTalkRouter: DingTalkRouterService, private readonly dingTalkRouter: DingTalkRouterService,
private readonly instanceRepo: AgentInstanceRepository, private readonly instanceRepo: AgentInstanceRepository,
@ -95,6 +99,33 @@ export class AgentChannelController {
} }
} }
/**
* Bridge async-task callback called by the OpenClaw bridge when an async LLM
* task completes. Unauthenticated; the bridge is an internal service on our
* own infrastructure. The callbackData carries the staffId + msgId that
* DingTalkRouterService is waiting on.
*
* Called by: openclaw-bridge /task-async POST <IT0_AGENT_SERVICE_URL>/api/v1/agent/channels/dingtalk/bridge-callback
*/
@Post('dingtalk/bridge-callback')
handleBridgeCallback(
@Body() body: {
ok: boolean;
result?: string;
error?: string;
callbackData: { staffId: string; msgId: string };
},
) {
const { ok, result, error, callbackData } = body;
const { staffId, msgId } = callbackData ?? {};
this.logger.log(
`Bridge callback: ok=${ok} msgId=${msgId} staffId=${staffId} ` +
`${ok ? `replyLen=${result?.length ?? 0}` : `error=${error}`}`,
);
this.dingTalkRouter.resolveCallbackReply(msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。'));
return { received: true };
}
private htmlPage(title: string, message: string, success: boolean): string { private htmlPage(title: string, message: string, success: boolean): string {
const color = success ? '#22C55E' : '#EF4444'; const color = success ? '#22C55E' : '#EF4444';
const icon = success ? '✅' : '❌'; const icon = success ? '✅' : '❌';