From 5cf72c4780e53d21f56dc441dbef139c90e3d249 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 8 Mar 2026 13:18:52 -0700 Subject: [PATCH] feat(dingtalk+bridge): event-based agent reply + greeting on binding openclaw-bridge: - index.ts: /task endpoint now calls chatSendAndWait() with idempotencyKey (removes broken timeoutSeconds param; uses caller-supplied msgId for dedup) - openclaw-client.ts: added onEvent() subscription + chatSendAndWait() that subscribes to 'chat' WS events, waits for state='final' matching runId, and extracts text from the message payload dingtalk-router: - After OAuth binding completes, sends a proactive greeting to the user via DingTalk batchSend API (/v1.0/robot/oToMessages/batchSend) introducing the agent by name and explaining what it can do Co-Authored-By: Claude Sonnet 4.6 --- packages/openclaw-bridge/src/index.ts | 35 ++++-- .../openclaw-bridge/src/openclaw-client.ts | 112 +++++++++++++++++- .../dingtalk/dingtalk-router.service.ts | 30 +++++ 3 files changed, 164 insertions(+), 13 deletions(-) diff --git a/packages/openclaw-bridge/src/index.ts b/packages/openclaw-bridge/src/index.ts index e9fccee..fb8019e 100644 --- a/packages/openclaw-bridge/src/index.ts +++ b/packages/openclaw-bridge/src/index.ts @@ -7,6 +7,7 @@ */ import 'dotenv/config'; import express from 'express'; +import * as crypto from 'crypto'; import { OpenClawClient } from './openclaw-client'; const PORT = parseInt(process.env.BRIDGE_PORT ?? '3000', 10); @@ -60,9 +61,19 @@ app.get('/status', (_req, res) => { }); }); -// Submit a task to OpenClaw -// Uses chat.send (Protocol v3). sessionKey format: "agent:main:" -// timeoutSeconds: how long to wait for agent reply (0=fire-and-forget, default 25) +// Submit a task to OpenClaw and wait for the agent's final reply. +// +// OpenClaw Protocol v3 flow: +// 1. POST /task → bridge calls chat.send → gets { runId, status:"started" } ack +// 2. Agent processes, pushes WS events (type:"event", event:"chat") +// 3. Final event { state:"final", message:{content:[{type:"text",text:"..."}]} } +// is captured by the bridge's event listener and returned as the HTTP response. +// +// Request body: +// sessionKey — OpenClaw session key, e.g. "agent:main:dt-" +// prompt — message text to send +// idempotencyKey — caller-supplied unique key (DingTalk msgId etc.) for dedup +// timeoutSeconds — max seconds to wait for agent reply (default 25) app.post('/task', async (req, res) => { if (!ocClient.isConnected()) { res.status(503).json({ error: 'Gateway not connected' }); @@ -71,12 +82,18 @@ app.post('/task', async (req, res) => { try { const sessionKey = req.body.sessionKey ?? `agent:main:${req.body.sessionId ?? 'main'}`; const timeoutSeconds: number = req.body.timeoutSeconds ?? 25; - const result = await ocClient.rpc( - 'chat.send', - { sessionKey, message: req.body.prompt, timeoutSeconds }, - (timeoutSeconds + 5) * 1000, // rpc timeout = agent timeout + 5s buffer - ); - res.json({ ok: true, result }); + // idempotencyKey is mandatory in chat.send (Protocol v3). Use caller-supplied value + // (so DingTalk msgId is preserved for dedup), or generate a UUID for ad-hoc calls. + const idempotencyKey: string = req.body.idempotencyKey ?? crypto.randomUUID(); + + const reply = await ocClient.chatSendAndWait({ + sessionKey, + message: req.body.prompt, + idempotencyKey, + timeoutMs: timeoutSeconds * 1000, + }); + + res.json({ ok: true, result: reply }); } catch (err: any) { res.status(500).json({ error: err.message }); } diff --git a/packages/openclaw-bridge/src/openclaw-client.ts b/packages/openclaw-bridge/src/openclaw-client.ts index 6ce14c4..95c4c8e 100644 --- a/packages/openclaw-bridge/src/openclaw-client.ts +++ b/packages/openclaw-bridge/src/openclaw-client.ts @@ -72,8 +72,10 @@ export class OpenClawClient { private ws: WebSocket | null = null; private pending = new Map< string, - { resolve: (v: unknown) => void; reject: (e: Error) => void } + { resolve: (v: unknown) => void; reject: (e: Error) => void; expectFinal: boolean } >(); + // Event subscriptions: event name → list of handlers + private eventHandlers = new Map void>>(); private connected = false; private msgCounter = 0; @@ -160,6 +162,18 @@ export class OpenClawClient { if (frame.type === 'res') { const p = this.pending.get(frame.id); if (!p) return; + // When expectFinal is set, skip intermediate responses and wait for the final result. + // chat.send sends intermediate frames with status="accepted"|"started"|"running" + // and stream frames with state="delta". The final frame has state="final"|"aborted"|"error". + const payload = frame.payload as any; + const status = payload?.status; + const state = payload?.state; + if (p.expectFinal) { + const isIntermediate = + status === 'accepted' || status === 'started' || status === 'running' || + state === 'delta'; + if (isIntermediate) return; + } this.pending.delete(frame.id); if (frame.ok) { p.resolve(frame.payload ?? null); @@ -167,7 +181,11 @@ export class OpenClawClient { p.reject(new Error(frame.error?.message ?? frame.error?.code ?? 'RPC error')); } } - // Events are ignored by default (not needed for the bridge use case) + // Dispatch events to subscribed handlers + if (frame.type === 'event') { + const handlers = this.eventHandlers.get(frame.event); + if (handlers) handlers.forEach(h => h(frame.payload)); + } } private async sendHandshake(nonce: string): Promise { @@ -230,7 +248,7 @@ export class OpenClawClient { return this.connected && this.ws?.readyState === WebSocket.OPEN; } - rpc(method: string, params?: unknown, timeoutMs = 30_000): Promise { + rpc(method: string, params?: unknown, timeoutMs = 30_000, expectFinal = false): Promise { return new Promise((resolve, reject) => { if (!this.isConnected()) { reject(new Error('Not connected to OpenClaw gateway')); @@ -238,7 +256,7 @@ export class OpenClawClient { } const id = String(++this.msgCounter); const frame: ReqFrame = { type: 'req', id, method, params }; - this.pending.set(id, { resolve, reject }); + this.pending.set(id, { resolve, reject, expectFinal }); this.ws!.send(JSON.stringify(frame)); const timer = setTimeout(() => { @@ -252,6 +270,92 @@ export class OpenClawClient { }); } + /** + * Subscribe to a WS event. Returns an unsubscribe function. + */ + onEvent(event: string, handler: (payload: unknown) => void): () => void { + const handlers = this.eventHandlers.get(event) ?? []; + handlers.push(handler); + this.eventHandlers.set(event, handlers); + return () => { + const h = this.eventHandlers.get(event); + if (h) this.eventHandlers.set(event, h.filter(fn => fn !== handler)); + }; + } + + /** + * Send a chat message and wait for the agent's final reply. + * + * Protocol (OpenClaw v3): + * 1. chat.send RPC → immediate ack { runId, status: "started" } + * 2. Agent processes, streams delta events (event: "chat", state: "delta") + * 3. Final event (event: "chat", state: "final") contains the full reply + * + * Returns the reply text, or throws on timeout / agent error. + */ + async chatSendAndWait(params: { + sessionKey: string; + message: string; + idempotencyKey: string; + timeoutMs?: number; + }): Promise { + const timeoutMs = params.timeoutMs ?? 30_000; + + // Send chat.send — resolves immediately with { runId, status: "started" } + const ack = await this.rpc( + 'chat.send', + { + sessionKey: params.sessionKey, + message: params.message, + idempotencyKey: params.idempotencyKey, + }, + 10_000, // 10s for the initial ack + ) as { runId: string; status: string }; + + const runId = ack?.runId ?? params.idempotencyKey; + + // Wait for the final "chat" event matching our runId + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + unsubscribe(); + reject(new Error(`Agent reply timeout after ${timeoutMs}ms`)); + }, timeoutMs); + + const unsubscribe = this.onEvent('chat', (payload: any) => { + if (payload?.runId !== runId) return; // different conversation + const state = payload?.state; + if (state === 'delta') return; // streaming chunk, keep waiting + + clearTimeout(timer); + unsubscribe(); + + if (state === 'final') { + const msg = payload?.message; + // message can be string or { content: [{type:"text", text:"..."}] } + if (typeof msg === 'string') { + resolve(msg); + } else if (msg?.content) { + const texts: string[] = Array.isArray(msg.content) + ? msg.content.filter((c: any) => c.type === 'text').map((c: any) => c.text ?? '') + : [typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content)]; + resolve(texts.join('')); + } else if (msg?.text) { + resolve(msg.text); + } else { + resolve(JSON.stringify(payload)); + } + } else if (state === 'aborted') { + reject(new Error('Agent run was aborted')); + } else if (state === 'error') { + reject(new Error(payload?.errorMessage ?? 'Agent run failed')); + } else { + // Unknown final state — return raw payload + resolve(JSON.stringify(payload)); + } + }); + }); + } + close(): void { this.ws?.close(); this.connected = false; diff --git a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts index 6377133..df006dd 100644 --- a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts @@ -224,9 +224,39 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { await this.instanceRepo.save(instance); this.logger.log(`OAuth binding: instance ${entry.instanceId} → DingTalk openId ${dingTalkUserId}`); + + // Send a proactive greeting so the user knows the binding succeeded + this.sendGreeting(dingTalkUserId, instance.name).catch((e: Error) => + this.logger.warn(`Greeting send failed (non-fatal): ${e.message}`), + ); + return { instanceId: entry.instanceId, instanceName: instance.name }; } + /** + * Send a proactive bot message to a DingTalk user via the batchSend API. + * Called after OAuth binding completes to greet the user. + */ + private async sendGreeting(openId: string, agentName: string): Promise { + const token = await this.getToken(); + const greeting = + `👋 你好!我是你的 AI 智能体助手「${agentName}」。\n\n` + + `从现在起,你可以直接在这里向我发送指令,我会自主地帮你完成工作任务。\n\n` + + `例如:\n• 查询服务器状态\n• 执行运维脚本\n• 管理文件和进程\n\n` + + `有什么需要帮忙的,直接说吧!`; + await this.httpsPost( + 'api.dingtalk.com', + '/v1.0/robot/oToMessages/batchSend', + { + robotCode: this.clientId, + userIds: [openId], + msgKey: 'sampleText', + msgParam: JSON.stringify({ content: greeting }), + }, + { 'x-acs-dingtalk-access-token': token }, + ); + } + // ── Token management ─────────────────────────────────────────────────────── private async getToken(): Promise {