feat(dingtalk+bridge): event-based agent reply + greeting on binding

openclaw-bridge:
- index.ts: /task endpoint now calls chatSendAndWait() with idempotencyKey
  (removes broken timeoutSeconds param; uses caller-supplied msgId for dedup)
- openclaw-client.ts: added onEvent() subscription + chatSendAndWait() that
  subscribes to 'chat' WS events, waits for state='final' matching runId,
  and extracts text from the message payload

dingtalk-router:
- After OAuth binding completes, sends a proactive greeting to the user via
  DingTalk batchSend API (/v1.0/robot/oToMessages/batchSend) introducing the
  agent by name and explaining what it can do

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-08 13:18:52 -07:00
parent aca4dd0177
commit 5cf72c4780
3 changed files with 164 additions and 13 deletions

View File

@ -7,6 +7,7 @@
*/
import 'dotenv/config';
import express from 'express';
import * as crypto from 'crypto';
import { OpenClawClient } from './openclaw-client';
const PORT = parseInt(process.env.BRIDGE_PORT ?? '3000', 10);
@ -60,9 +61,19 @@ app.get('/status', (_req, res) => {
});
});
// Submit a task to OpenClaw
// Uses chat.send (Protocol v3). sessionKey format: "agent:main:<name>"
// timeoutSeconds: how long to wait for agent reply (0=fire-and-forget, default 25)
// Submit a task to OpenClaw and wait for the agent's final reply.
//
// OpenClaw Protocol v3 flow:
// 1. POST /task → bridge calls chat.send → gets { runId, status:"started" } ack
// 2. Agent processes, pushes WS events (type:"event", event:"chat")
// 3. Final event { state:"final", message:{content:[{type:"text",text:"..."}]} }
// is captured by the bridge's event listener and returned as the HTTP response.
//
// Request body:
// sessionKey — OpenClaw session key, e.g. "agent:main:dt-<userId>"
// prompt — message text to send
// idempotencyKey — caller-supplied unique key (DingTalk msgId etc.) for dedup
// timeoutSeconds — max seconds to wait for agent reply (default 25)
app.post('/task', async (req, res) => {
if (!ocClient.isConnected()) {
res.status(503).json({ error: 'Gateway not connected' });
@ -71,12 +82,18 @@ app.post('/task', async (req, res) => {
try {
const sessionKey = req.body.sessionKey ?? `agent:main:${req.body.sessionId ?? 'main'}`;
const timeoutSeconds: number = req.body.timeoutSeconds ?? 25;
const result = await ocClient.rpc(
'chat.send',
{ sessionKey, message: req.body.prompt, timeoutSeconds },
(timeoutSeconds + 5) * 1000, // rpc timeout = agent timeout + 5s buffer
);
res.json({ ok: true, result });
// idempotencyKey is mandatory in chat.send (Protocol v3). Use caller-supplied value
// (so DingTalk msgId is preserved for dedup), or generate a UUID for ad-hoc calls.
const idempotencyKey: string = req.body.idempotencyKey ?? crypto.randomUUID();
const reply = await ocClient.chatSendAndWait({
sessionKey,
message: req.body.prompt,
idempotencyKey,
timeoutMs: timeoutSeconds * 1000,
});
res.json({ ok: true, result: reply });
} catch (err: any) {
res.status(500).json({ error: err.message });
}

View File

@ -72,8 +72,10 @@ export class OpenClawClient {
private ws: WebSocket | null = null;
private pending = new Map<
string,
{ resolve: (v: unknown) => void; reject: (e: Error) => void }
{ resolve: (v: unknown) => void; reject: (e: Error) => void; expectFinal: boolean }
>();
// Event subscriptions: event name → list of handlers
private eventHandlers = new Map<string, Array<(payload: unknown) => void>>();
private connected = false;
private msgCounter = 0;
@ -160,6 +162,18 @@ export class OpenClawClient {
if (frame.type === 'res') {
const p = this.pending.get(frame.id);
if (!p) return;
// When expectFinal is set, skip intermediate responses and wait for the final result.
// chat.send sends intermediate frames with status="accepted"|"started"|"running"
// and stream frames with state="delta". The final frame has state="final"|"aborted"|"error".
const payload = frame.payload as any;
const status = payload?.status;
const state = payload?.state;
if (p.expectFinal) {
const isIntermediate =
status === 'accepted' || status === 'started' || status === 'running' ||
state === 'delta';
if (isIntermediate) return;
}
this.pending.delete(frame.id);
if (frame.ok) {
p.resolve(frame.payload ?? null);
@ -167,7 +181,11 @@ export class OpenClawClient {
p.reject(new Error(frame.error?.message ?? frame.error?.code ?? 'RPC error'));
}
}
// Events are ignored by default (not needed for the bridge use case)
// Dispatch events to subscribed handlers
if (frame.type === 'event') {
const handlers = this.eventHandlers.get(frame.event);
if (handlers) handlers.forEach(h => h(frame.payload));
}
}
private async sendHandshake(nonce: string): Promise<void> {
@ -230,7 +248,7 @@ export class OpenClawClient {
return this.connected && this.ws?.readyState === WebSocket.OPEN;
}
rpc(method: string, params?: unknown, timeoutMs = 30_000): Promise<unknown> {
rpc(method: string, params?: unknown, timeoutMs = 30_000, expectFinal = false): Promise<unknown> {
return new Promise((resolve, reject) => {
if (!this.isConnected()) {
reject(new Error('Not connected to OpenClaw gateway'));
@ -238,7 +256,7 @@ export class OpenClawClient {
}
const id = String(++this.msgCounter);
const frame: ReqFrame = { type: 'req', id, method, params };
this.pending.set(id, { resolve, reject });
this.pending.set(id, { resolve, reject, expectFinal });
this.ws!.send(JSON.stringify(frame));
const timer = setTimeout(() => {
@ -252,6 +270,92 @@ export class OpenClawClient {
});
}
/**
* Subscribe to a WS event. Returns an unsubscribe function.
*/
onEvent(event: string, handler: (payload: unknown) => void): () => void {
const handlers = this.eventHandlers.get(event) ?? [];
handlers.push(handler);
this.eventHandlers.set(event, handlers);
return () => {
const h = this.eventHandlers.get(event);
if (h) this.eventHandlers.set(event, h.filter(fn => fn !== handler));
};
}
/**
* Send a chat message and wait for the agent's final reply.
*
* Protocol (OpenClaw v3):
* 1. chat.send RPC immediate ack { runId, status: "started" }
* 2. Agent processes, streams delta events (event: "chat", state: "delta")
* 3. Final event (event: "chat", state: "final") contains the full reply
*
* Returns the reply text, or throws on timeout / agent error.
*/
async chatSendAndWait(params: {
sessionKey: string;
message: string;
idempotencyKey: string;
timeoutMs?: number;
}): Promise<string> {
const timeoutMs = params.timeoutMs ?? 30_000;
// Send chat.send — resolves immediately with { runId, status: "started" }
const ack = await this.rpc(
'chat.send',
{
sessionKey: params.sessionKey,
message: params.message,
idempotencyKey: params.idempotencyKey,
},
10_000, // 10s for the initial ack
) as { runId: string; status: string };
const runId = ack?.runId ?? params.idempotencyKey;
// Wait for the final "chat" event matching our runId
return new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
unsubscribe();
reject(new Error(`Agent reply timeout after ${timeoutMs}ms`));
}, timeoutMs);
const unsubscribe = this.onEvent('chat', (payload: any) => {
if (payload?.runId !== runId) return; // different conversation
const state = payload?.state;
if (state === 'delta') return; // streaming chunk, keep waiting
clearTimeout(timer);
unsubscribe();
if (state === 'final') {
const msg = payload?.message;
// message can be string or { content: [{type:"text", text:"..."}] }
if (typeof msg === 'string') {
resolve(msg);
} else if (msg?.content) {
const texts: string[] = Array.isArray(msg.content)
? msg.content.filter((c: any) => c.type === 'text').map((c: any) => c.text ?? '')
: [typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content)];
resolve(texts.join(''));
} else if (msg?.text) {
resolve(msg.text);
} else {
resolve(JSON.stringify(payload));
}
} else if (state === 'aborted') {
reject(new Error('Agent run was aborted'));
} else if (state === 'error') {
reject(new Error(payload?.errorMessage ?? 'Agent run failed'));
} else {
// Unknown final state — return raw payload
resolve(JSON.stringify(payload));
}
});
});
}
close(): void {
this.ws?.close();
this.connected = false;

View File

@ -224,9 +224,39 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
await this.instanceRepo.save(instance);
this.logger.log(`OAuth binding: instance ${entry.instanceId} → DingTalk openId ${dingTalkUserId}`);
// Send a proactive greeting so the user knows the binding succeeded
this.sendGreeting(dingTalkUserId, instance.name).catch((e: Error) =>
this.logger.warn(`Greeting send failed (non-fatal): ${e.message}`),
);
return { instanceId: entry.instanceId, instanceName: instance.name };
}
/**
* Send a proactive bot message to a DingTalk user via the batchSend API.
* Called after OAuth binding completes to greet the user.
*/
private async sendGreeting(openId: string, agentName: string): Promise<void> {
const token = await this.getToken();
const greeting =
`👋 你好!我是你的 AI 智能体助手「${agentName}」。\n\n` +
`从现在起,你可以直接在这里向我发送指令,我会自主地帮你完成工作任务。\n\n` +
`例如:\n• 查询服务器状态\n• 执行运维脚本\n• 管理文件和进程\n\n` +
`有什么需要帮忙的,直接说吧!`;
await this.httpsPost<unknown>(
'api.dingtalk.com',
'/v1.0/robot/oToMessages/batchSend',
{
robotCode: this.clientId,
userIds: [openId],
msgKey: 'sampleText',
msgParam: JSON.stringify({ content: greeting }),
},
{ 'x-acs-dingtalk-access-token': token },
);
}
// ── Token management ───────────────────────────────────────────────────────
private async getToken(): Promise<string> {