feat(bridge): DingTalk channel plugin + OpenClaw Protocol v3 rewrite

Core changes:
- src/channels/dingtalk.ts: DingTalk Stream SDK channel (no public IP needed)
  - TokenManager: auto-refresh with refreshPromise mutex (prevents race condition)
  - UserQueue: per-user serial queue, max depth 5
  - MsgDedup: O(1) Map<string,timestamp> with 10min TTL + 10k cap
  - RateLimiter: sliding window 10 msg/min per user
  - ResilientOcClient: 10s heartbeat poll + atomic reconnect guard
  - DingTalkStream: exponential backoff reconnect (2s→60s), immediate ACK
  - replyToUser: sessionWebhook expiry check + 4800-char chunking

- src/openclaw-client.ts: rewritten for correct Protocol v3 wire format
  - Request frame: { type:"req", id, method, params }
  - Challenge-response Ed25519 handshake (connect.challenge → connect req)
  - Correct rpc() with configurable timeoutMs

- src/index.ts: fixed RPC method names
  - agent.run → chat.send with { sessionKey, message, timeoutSeconds }
  - metrics.get → gateway.status

- Dockerfile: adds start-dingtalk.sh COPY + chmod
- supervisord.conf: dingtalk-channel program block (autorestart=unexpected)
- start-dingtalk.sh: exits 0 if DINGTALK_CLIENT_ID unset (no restart loop)
- CHANNEL_DEV_GUIDE.md: full dev guide for future channel integrations

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
hailin 2026-03-08 05:10:01 -07:00
parent 688219ab74
commit b0801e0983
8 changed files with 2399 additions and 52 deletions


@ -0,0 +1,471 @@
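# OpenClaw Channel Plugin Development Guide
> A summary of the lessons learned while building the DingTalk Channel plugin (`src/channels/dingtalk.ts`).
> Treat this document as your first reference when developing a new Channel.
---
## 1. Overall Architecture
Each OpenClaw instance is a standalone Docker container (`hailin168/openclaw-bridge:latest`).
Inside the container, supervisord manages three processes:
```
Container
├── openclaw gateway     port 18789 (internal)   core AI Agent process
├── it0-bridge           port 3000 (exposed)     IT0 management REST API
└── dingtalk-channel     no exposed port         third-party Channel process (optional)
```
**Responsibilities of a Channel plugin:**
1. Connect to the third-party platform (receive user messages)
2. Forward them to the OpenClaw Gateway (via WebSocket RPC)
3. Send OpenClaw's responses back to the user
A Channel plugin needs no exposed port; it relies entirely on outbound connections.
---
## 2. OpenClaw Gateway Protocol (Protocol v3)
> ⚠️ This is the most important section. The protocol details must be exact; otherwise every call fails at runtime, and the TypeScript compiler cannot catch the mistake.
### 2.1 Wire Frame Format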
```typescript
// Request frame (Client → Gateway)
interface ReqFrame {
  type: 'req';              // must be the literal 'req'
  id: string;               // unique request ID; an incrementing string is enough
  method: string;           // RPC method name
  params?: unknown;         // method parameters
  idempotencyKey?: string;  // optional idempotency key
}
// Response frame (Gateway → Client)
interface ResFrame {
  type: 'res';
  id: string;               // id of the matching request
  ok: boolean;              // success/failure flag
  payload?: unknown;        // response data on success
  error?: {
    code: string;
    message: string;
    retryable?: boolean;
    retryAfterMs?: number;
  };
}
// Event frame (Gateway → Client, pushed by the Gateway)
interface EventFrame {
  type: 'event';
  event: string;            // event name
  payload?: unknown;
  seq?: number;             // monotonically increasing sequence number
}
```
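Because a malformed frame only shows up at runtime, it can help to validate the discriminating fields before dispatching. The following is a minimal sketch under that assumption; `parseFrame` and the trimmed-down `Frame` union are hypothetical names for illustration, not part of the OpenClaw client.

```typescript
// Trimmed-down copies of the three frame shapes described above.
type Frame =
  | { type: 'req'; id: string; method: string; params?: unknown }
  | { type: 'res'; id: string; ok: boolean; payload?: unknown; error?: { code: string; message: string } }
  | { type: 'event'; event: string; payload?: unknown; seq?: number };

// Hypothetical helper: returns a typed frame, or null when the input is not
// valid JSON or lacks the fields that its `type` requires.
function parseFrame(raw: string): Frame | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (typeof value !== 'object' || value === null) return null;
  const f = value as Record<string, unknown>;
  switch (f.type) {
    case 'req':
      return typeof f.id === 'string' && typeof f.method === 'string' ? (value as Frame) : null;
    case 'res':
      return typeof f.id === 'string' && typeof f.ok === 'boolean' ? (value as Frame) : null;
    case 'event':
      return typeof f.event === 'string' ? (value as Frame) : null;
    default:
      return null;
  }
}
```

A caller can then switch on `frame.type` and let the compiler narrow the remaining fields.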
### 2.2 Connection Handshake (must complete, or every RPC is rejected)
OpenClaw authenticates with **Challenge-Response + Ed25519 signatures**. The flow:
```
Client ──────── WebSocket Connect ──────────────────→ Gateway
Gateway ──── event: "connect.challenge" { nonce } ──→ Client
Client ──── req: "connect" { auth.token, device } ──→ Gateway
Gateway ─── res: "connect" { ok: true } ────────────→ Client
↑ RPC requests may be sent only after this point
```
Key implementation details:
```typescript
import * as crypto from 'crypto';
// 1. Generate an ephemeral Ed25519 key pair with Node.js's built-in crypto (a new pair per connection)
const keyPair = crypto.generateKeyPairSync('ed25519');
// 2. Sign the nonce (hex string → Buffer) with the private key; `nonce` comes from the connect.challenge event
const nonceBuffer = Buffer.from(nonce, 'hex');
const signature = crypto.sign(null, nonceBuffer, keyPair.privateKey);
// 3. Export the public key in DER format (sent base64-encoded)
const pubKeyDer = keyPair.publicKey.export({ type: 'spki', format: 'der' });
// 4. Send the connect request
const connectReq = {
  type: 'req',
  id: '__connect__',
  method: 'connect',
  params: {
    minProtocol: 3, maxProtocol: 3,
    client: { id: deviceId, version: '1.0.0', platform: 'node', mode: 'channel' },
    role: 'operator',
    scopes: ['operator.read', 'operator.write'],
    auth: { token: OPENCLAW_GATEWAY_TOKEN }, // token injected through an environment variable
    device: {
      id: deviceId,
      publicKey: pubKeyDer.toString('base64'),
      signature: signature.toString('base64'),
      signedAt: Date.now(),
    }
  }
};
```
Handshake timeout: the Gateway enforces a 10 s timeout; the client should set a 12 s safety timeout.
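To sanity-check the signing step in isolation, the signature can be verified locally with the exported public key. This is a minimal sketch: `signChallenge` and `verifyChallenge` are hypothetical helpers, and the verification shown here is only a local round trip, not a description of how the Gateway itself validates the `device` block.

```typescript
import * as crypto from 'node:crypto';

interface DeviceProof {
  publicKey: string;  // base64 of the SPKI/DER public key
  signature: string;  // base64 of the Ed25519 signature over the nonce bytes
  signedAt: number;
}

// Hypothetical helper: signs a hex-encoded nonce with a fresh Ed25519 key pair.
function signChallenge(nonceHex: string): DeviceProof {
  const { publicKey, privateKey } = crypto.generateKeyPairSync('ed25519');
  // Ed25519 takes no separate digest algorithm, hence `null`.
  const signature = crypto.sign(null, Buffer.from(nonceHex, 'hex'), privateKey);
  return {
    publicKey: publicKey.export({ type: 'spki', format: 'der' }).toString('base64'),
    signature: signature.toString('base64'),
    signedAt: Date.now(),
  };
}

// Hypothetical local check: rebuilds the public key from DER and verifies the signature.
function verifyChallenge(nonceHex: string, proof: DeviceProof): boolean {
  const key = crypto.createPublicKey({
    key: Buffer.from(proof.publicKey, 'base64'),
    format: 'der',
    type: 'spki',
  });
  return crypto.verify(null, Buffer.from(nonceHex, 'hex'), key, Buffer.from(proof.signature, 'base64'));
}
```

If the round trip fails locally, the usual suspects are signing the hex string itself instead of its decoded bytes, or exporting the key in a different format than the one being sent.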
### 2.3 Key RPC Methods (verified against the official source)
| Purpose | Method | Key parameters |
|------|--------|---------|
| Send a message to the Agent | `chat.send` | `{ sessionKey, message, timeoutSeconds }` |
| List sessions | `sessions.list` | `{ limit, kinds?, activeMinutes? }` |
| Get session history | `sessions.history` | `{ sessionKey, messageLimit }` |
| Abort the current task | `chat.abort` | `{ sessionKey }` |
| Gateway status | `gateway.status` | none |
| Health check | `gateway.health` | none |
> ❌ **The following methods do not exist; do not use them:**
> - `agent.run`: does not exist at all
> - `metrics.get`: does not exist; use `gateway.status` instead
### 2.4 Session Key Format
Format: `"type:agentName:sessionName"`
```
agent:main:main              default main session
agent:main:dt-{userId}       dedicated session per DingTalk user (one per user)
agent:main:tg-{chatId}       dedicated session per Telegram chat
agent:main:wechat-{openId}   dedicated session per WeChat user
```
By default, each OpenClaw instance has a single agent, "main".
The session name part is free-form; the Gateway automatically creates sessions that do not exist yet.
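A small helper keeps the key format in one place. The sketch below is illustrative only: `buildSessionKey` is a hypothetical name, and replacing characters outside `[A-Za-z0-9_.-]` is an assumption made here so that a stray `:` in a platform ID cannot change the number of key segments; the Gateway's actual validation rules are not documented in this guide.

```typescript
type PlatformPrefix = 'dt' | 'tg' | 'wechat';

// Hypothetical helper: builds "agent:<agentName>:<prefix>-<id>".
function buildSessionKey(prefix: PlatformPrefix, platformUserId: string, agentName: string = 'main'): string {
  // Assumption: keep the ID to a conservative character set so ':' never leaks in.
  const safeId = platformUserId.replace(/[^A-Za-z0-9_.-]/g, '_');
  return `agent:${agentName}:${prefix}-${safeId}`;
}
```

For example, `buildSessionKey('dt', 'u123')` yields `agent:main:dt-u123`.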
### 2.5 Response Structure of `chat.send`
```typescript
// payload structure
interface ChatSendPayload {
  runId: string;
  status: 'started' | 'in_flight' | 'ok' | 'timeout';
  reply?: string; // present when timeoutSeconds > 0 and status=ok
  error?: string; // present when status=timeout
}
// The correct way to extract the reply text from the payload:
const payload = result as { status: string; reply?: string };
const replyText = payload.reply ?? '(Task submitted, processing)';
```
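The four status values can be mapped to user-facing text in a single function so that no state is silently dropped. This is a sketch based only on the payload shape listed above; `replyTextFor` and the fallback strings are hypothetical.

```typescript
interface ChatSendPayload {
  runId: string;
  status: 'started' | 'in_flight' | 'ok' | 'timeout';
  reply?: string;
  error?: string;
}

// Hypothetical helper: picks the text to send back for each status.
function replyTextFor(payload: ChatSendPayload): string {
  switch (payload.status) {
    case 'ok':
      return payload.reply ?? '(empty reply)';
    case 'timeout':
      return payload.error ?? 'Processing timed out, please try again later.';
    case 'started':
    case 'in_flight':
      // The run was accepted but has not produced a reply yet.
      return '(Task submitted, still processing)';
  }
}
```

Because the `switch` is exhaustive over the union, adding a new status to the type makes the compiler flag this function.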
---
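## 3. DingTalk Channel Implementation Notes
### 3.1 Use Stream Mode (no public IP required)
DingTalk officially offers two integration modes:
- **HTTP callback mode**: requires a public IP plus ICP filing; not suited to container deployment
- **Stream mode**: the client connects out to DingTalk's servers; a perfect fit for in-container deployment ✓
Stream mode connection flow: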
```
1. POST https://api.dingtalk.com/v1.0/oauth2/accessToken
→ { accessToken, expireIn: 7200 }
2. POST https://api.dingtalk.com/v1.0/gateway/connections/open
Headers: x-acs-dingtalk-access-token: {accessToken}
Body: { clientId, clientSecret, subscriptions: [{type:'CALLBACK', topic:'/v1.0/im/bot/messages/get'}] }
→ { endpoint, ticket }
3. WSS connect: {endpoint}?ticket={ticket}
→ keep the long-lived connection open and receive messages
```
### 3.2 Message Frame Format
```typescript
// Message frame pushed by the server
{
  type: 'CALLBACK', // or 'PING', 'EVENT', 'SYSTEM'
  headers: { topic: '/v1.0/im/bot/messages/get', ... },
  data: string // JSON string
}
// Bot message structure after parsing `data`
{
  senderStaffId: string,             // sender's staff ID
  sessionWebhook: string,            // Webhook URL used for replies (it expires)
  sessionWebhookExpiredTime: number, // millisecond timestamp
  text: { content: string },         // message text
  conversationId: string,            // group / one-on-one conversation ID
  msgId: string                      // unique message ID, used for deduplication
}
// ACK within 1.5 seconds of receiving a message, or DingTalk will redeliver it
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
```
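The ACK-first rule can be made hard to get wrong by routing every frame through one function that sends the ACK synchronously and defers the real work. This is a minimal sketch: `buildAck`, `handleIncoming` and the callback parameters are hypothetical names, and only the ACK body itself is taken from the snippet above.

```typescript
interface StreamFrame {
  type: string;
  headers: Record<string, string>;
  data?: string;
}

// Builds the ACK body shown above, echoing the frame's headers back.
function buildAck(frame: StreamFrame): string {
  return JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' });
}

// Hypothetical dispatcher: ACK synchronously, then hand the payload to slow
// processing on a later tick so the ACK never waits on the AI call.
function handleIncoming(
  frame: StreamFrame,
  send: (text: string) => void,
  processLater: (data: string) => void,
): void {
  send(buildAck(frame));
  const data = frame.data;
  if (frame.type === 'CALLBACK' && data) {
    void Promise.resolve().then(() => processLater(data));
  }
}
```

In a real channel, `send` would wrap `ws.send` and `processLater` would parse the bot message and enqueue it.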
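### 3.3 Verified DingTalk API Limits
| Limit | Value |
|------|------|
| Access Token lifetime | 7200 seconds (2 hours) |
| Max characters per message | 5000 characters |
| ACK timeout | 1.5 seconds |
| sessionWebhook lifetime | about 24 hours (shorter for group messages) |
### 3.4 Replying to DingTalk Messages (via sessionWebhook)
```typescript
function sendWebhook(webhook: string, content: string): void {
  const url = new URL(webhook);
  const body = JSON.stringify({ msgtype: 'text', text: { content } });
  // Uses Node.js's built-in https module; no external dependencies
  const req = https.request({ hostname: url.hostname, path: url.pathname + url.search, method: 'POST', ... });
  req.write(body);
  req.end();
}
```
Replies longer than 4800 characters must be sent in chunks, with a 200 ms gap between chunks to preserve order.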
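The chunking rule can be sketched as follows. `chunkText` mirrors the fixed-size slicing described above; `sendChunked` is a hypothetical variation that awaits each send before pausing, which is a stricter ordering guarantee than staggered timers and assumes the sender returns a Promise.

```typescript
// Splits text into pieces of at most maxChars UTF-16 code units.
// Note: slicing by code units can split a surrogate pair at a chunk boundary.
function chunkText(text: string, maxChars: number = 4800): string[] {
  const chunks: string[] = [];
  for (let i = 0; i < text.length; i += maxChars) {
    chunks.push(text.slice(i, i + maxChars));
  }
  return chunks;
}

// Hypothetical sender: one chunk at a time, with a fixed gap between chunks.
async function sendChunked(
  text: string,
  send: (chunk: string) => Promise<void>,
  gapMs: number = 200,
): Promise<void> {
  const chunks = chunkText(text);
  for (let i = 0; i < chunks.length; i++) {
    await send(chunks[i]);
    if (i < chunks.length - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, gapMs));
    }
  }
}
```

An empty input produces no chunks, so a caller that always wants to reply should substitute a placeholder message first.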
---
## 4. Production-Grade Robustness Patterns (learned from mistakes)
### 4.1 Automatic Token Refresh with Race Protection
```typescript
class TokenManager {
  private refreshPromise: Promise<string> | null = null; // acts as a mutex
  async get(): Promise<string> {
    if (Date.now() < this.expiresAt - BUFFER_MS) return this.token;
    // Key point: concurrent callers trigger only one refresh
    if (this.refreshPromise) return this.refreshPromise;
    this.refreshPromise = this.doRefresh().finally(() => { this.refreshPromise = null; });
    return this.refreshPromise;
  }
}
```
### 4.2 Message Deduplication (O(1) + TTL)
```typescript
class MsgDedup {
  private readonly seen = new Map<string, number>(); // msgId → timestamp
  private readonly ttlMs = 10 * 60 * 1000;           // 10 min TTL
  has(msgId: string): boolean {
    const t = this.seen.get(msgId);
    if (t === undefined) return false;
    if (Date.now() - t > this.ttlMs) { this.seen.delete(msgId); return false; }
    return true;
  }
}
// ❌ Do not use Array.includes(): it is O(n), so every lookup gets slower as the list grows (a problem by ~5000 entries)
```
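### 4.3 Per-User Serial Queue with a Depth Cap
```typescript
class UserQueue {
  private readonly tails = new Map<string, Promise<void>>();
  private readonly depths = new Map<string, number>();
  enqueue(userId: string, task: () => Promise<void>): boolean {
    if ((this.depths.get(userId) ?? 0) >= MAX_DEPTH) return false; // reject
    // The Promise chain runs one user's tasks serially; different users run concurrently without affecting each other
    ...
  }
}
// MAX_DEPTH = 5: once a user has 5 messages queued, tell them immediately; this prevents unbounded memory growth
```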
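For reference, the chaining step can be sketched as a standalone class. This is an illustrative version modeled on the `UserQueue` in `src/channels/dingtalk.ts`; the class name `SerialQueue` and the configurable `maxDepth` are assumptions made here, and errors are swallowed rather than logged to keep the sketch short.

```typescript
class SerialQueue {
  private readonly tails = new Map<string, Promise<void>>();
  private readonly depths = new Map<string, number>();
  private readonly maxDepth: number;

  constructor(maxDepth: number = 5) {
    this.maxDepth = maxDepth;
  }

  /** Returns false when the key already has maxDepth tasks pending or running. */
  enqueue(key: string, task: () => Promise<void>): boolean {
    const depth = this.depths.get(key) ?? 0;
    if (depth >= this.maxDepth) return false;
    this.depths.set(key, depth + 1);

    // Chain onto the previous task for this key so tasks run one at a time.
    const tail = this.tails.get(key) ?? Promise.resolve();
    const next = tail
      .then(task)
      .catch(() => { /* a failed task must not block the tasks behind it */ })
      .finally(() => {
        const remaining = (this.depths.get(key) ?? 1) - 1;
        if (remaining <= 0) {
          // Last task for this key: drop both entries so idle keys use no memory.
          this.depths.delete(key);
          this.tails.delete(key);
        } else {
          this.depths.set(key, remaining);
        }
      });
    this.tails.set(key, next);
    return true;
  }
}
```

A caller that gets `false` back should reply to the user right away instead of retrying, which is what keeps the queue bounded.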
### 4.4 Exponential-Backoff Reconnect for WebSocket Disconnects
```typescript
private reconnectDelay = 2_000;
private scheduleReconnect(): void {
  setTimeout(() => this.start(), this.reconnectDelay);
  this.reconnectDelay = Math.min(this.reconnectDelay * 2, 60_000);
  // After a successful connection, reset it in ws.on('open'): this.reconnectDelay = 2_000;
}
```
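The resulting delay schedule is easy to check with a pure function. This is a small sketch; `backoffDelays` is a hypothetical helper that reproduces the doubling-with-cap rule from the snippet above without any timers.

```typescript
// Returns the delays (in ms) used for the first `attempts` reconnects.
function backoffDelays(baseMs: number = 2_000, maxMs: number = 60_000, attempts: number = 7): number[] {
  const delays: number[] = [];
  let delay = baseMs;
  for (let i = 0; i < attempts; i++) {
    delays.push(delay);
    delay = Math.min(delay * 2, maxMs); // double, but never exceed the cap
  }
  return delays;
}

// backoffDelays() → [2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

With the defaults, the cap is reached on the sixth attempt, so a long outage settles at one reconnect per minute.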
### 4.5 Preventing Duplicate OpenClaw Gateway Reconnect Loops
```typescript
private setupAutoReconnect(): void {
  const check = setInterval(() => {
    if (this.client.isConnected() || this.reconnecting) return; // guard flag
    this.reconnecting = true;
    this.client = new OpenClawClient(...); // replace the client first, then reconnect
    let delay = 3_000;
    const attempt = async () => {
      try { await this.client.connect(); this.reconnecting = false; }
      catch { setTimeout(attempt, delay *= 1.5); } // recursive retry; loops never stack
    };
    attempt();
  }, 10_000);
  check.unref(); // does not keep the process alive
}
// ❌ Do not call this.connect() again directly inside catch; that stacks multiple reconnect loops
```
### 4.6 Graceful Shutdown
```typescript
process.once('SIGTERM', () => {
  stream.stop();
  tokenMgr.destroy(); // clear the setTimeout so the timer does not leak
  setTimeout(() => process.exit(0), 3_000); // finish cleanup within 3 seconds
});
```
---
## 5. supervisord Integration Pattern
Standard pattern for an optional Channel inside the container:
```ini
[program:dingtalk-channel]
command=/app/bridge/start-dingtalk.sh ; wrapper script, exits 0 when unconfigured
autostart=true
autorestart=unexpected ; ← key: not true
exitcodes=0 ; ← key: exit 0 = normal exit, no restart
startretries=5
startsecs=10
```
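Wrapper script template (`start-xxx.sh`):
```bash
#!/bin/sh
if [ -z "$XXX_CLIENT_ID" ]; then
  echo "[xxx] Not configured; channel disabled."
  exit 0 # supervisord sees exit 0 → EXITED, no restart
fi
exec node /app/bridge/dist/channels/xxx.js
```
Note: per the supervisord documentation, a process that exits before `startsecs` has elapsed counts as a failed start even when its exit code is listed in `exitcodes`. With `startsecs=10`, an immediate `exit 0` is therefore retried up to `startretries` times and may end in the FATAL state rather than EXITED. Either way, the restarts are bounded.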
---
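## 6. Steps for Developing a New Channel
### Step 1: Confirm the platform's integration mechanism
| Question | What to confirm |
|------|---------|
| Is a public IP required? | If yes, consider a reverse proxy; if no (WebSocket Stream), that is the best case |
| Is there an official SDK? | Weigh SDK stability against the control of a custom implementation |
| How long is the token valid? | Determines the refresh strategy |
| What is the message character limit? | Determines the chunking strategy |
| Are messages redelivered? | Determines the deduplication strategy |
| How long is the ACK timeout? | ACK before the timeout and make AI processing asynchronous |
### Step 2: Create the files
```
packages/openclaw-bridge/src/channels/xxx.ts   # main process
packages/openclaw-bridge/start-xxx.sh          # startup wrapper script
```
### Step 3: Reuse the existing infrastructure
```typescript
import { OpenClawClient } from '../openclaw-client'; // ✓ reuse directly; Protocol v3 is already implemented correctly
// Reuse these classes: TokenManager, UserQueue, MsgDedup, RateLimiter
// You only need to implement: the platform WS connection + message send/receive logic
```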
### Step 4: Standard code for sending a message to OpenClaw
```typescript
const result = await ocClient.rpc('chat.send', {
  sessionKey: `agent:main:${PLATFORM_PREFIX}-${userId}`, // keeps sessions from colliding across platforms
  message: userText,
  timeoutSeconds: 25,
}, 30_000);
const payload = result as { status: string; reply?: string; error?: string };
if (payload.status === 'ok' && payload.reply) {
  await replyToUser(payload.reply);
} else if (payload.status === 'timeout') {
  await replyToUser('Processing timed out, please try again later.');
}
```
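### Step 5: Update supervisord.conf and the Dockerfile
Append a new program block to the end of `supervisord.conf` (use the DingTalk block as a template).
In the `Dockerfile`, COPY the new startup script and `chmod +x` it.
### Step 6: Update agent-instance-deploy.service.ts
Append the new platform's environment variables to the `envParts` array in `runDeploy`: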
```typescript
if (xxxClientId) envParts.push(`-e XXX_CLIENT_ID=${xxxClientId}`);
```
### Step 7: Update system-prompt-builder.ts
Teach iAgent how to guide users through the binding and configuration steps for the Channel.
---
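## 7. Estimated Integration Difficulty by Platform
| Platform | Integration method | Public IP required | Estimated difficulty | Notes |
|------|---------|-----------|---------|------|
| **DingTalk** | Stream SDK (WSS) | No | ⭐⭐ | Already implemented; use as a direct reference |
| **Feishu (Lark)** | Event subscription over WebSocket | No | ⭐⭐ | Official Stream SDK available; structure similar to DingTalk |
| **Telegram** | Long Polling / Webhook | No (polling) / Yes (webhook) | ⭐ | Simplest; the Bot API docs are excellent |
| **WhatsApp** | Cloud API Webhook | Yes | ⭐⭐⭐ | Requires Meta review; has webhook verification |
| **Slack** | Socket Mode | No | ⭐⭐ | Socket Mode needs no public IP; good docs |
| **Discord** | Gateway WebSocket | No | ⭐⭐ | Official long-lived gateway connection; clear docs |
| **WeChat** | Service Account / WeCom | Yes (public IP + ICP filing) | ⭐⭐⭐⭐ | Most troublesome; requires domain verification |
| **Line** | Messaging API Webhook | Yes | ⭐⭐⭐ | Similar to WhatsApp |
**Recommended priority** (easiest to hardest):
1. Telegram: fastest, can be done within 1 day
2. Feishu: office scenarios in China; code is highly similar to DingTalk
3. Slack: international enterprise scenarios
4. Discord: developer / community scenarios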
---
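## 8. Lessons Learned (Pitfall Checklist)
1. **Always read the official source first; never guess API method names**
   - This project once guessed `agent.run` for `chat.send` and `metrics.get` for `gateway.status`; TypeScript compiled cleanly but every call failed at runtime
   - Source location: `src/gateway/server-methods/` in github.com/openclaw/openclaw
2. **Channel tokens must have an automatic renewal mechanism**
   - DingTalk Access Tokens expire after 2 hours; without renewal the service is guaranteed to break
   - Add a Promise-based mutex in `get()` to prevent concurrent double refreshes
3. **Always ACK immediately on receipt, then process the AI request asynchronously**
   - DingTalk's ACK timeout is 1.5 seconds, while an AI response can take 30 seconds
   - First `ws.send(ACK)`, then `queue.enqueue()` for asynchronous processing
4. **Use a Map for message deduplication, not an Array**
   - `Array.includes()` is O(n) and slows down as the list grows (estimated at about 5 ms per lookup with 5000 messages)
   - `Map.get()` is O(1), and adding a TTL gives precise control over memory
5. **Use `autorestart=unexpected` in supervisord, not `true`**
   - `autorestart=true` + exit 0 when unconfigured → endless restart loop
   - `autorestart=unexpected` + `exitcodes=0` → normal exit, no restart
6. **Every per-user message queue needs a depth cap**
   - When a user floods messages, an unbounded queue exhausts memory
   - A cap of 5 is suggested; beyond that, tell the user to "please wait"
7. **Do not reconnect to OpenClaw with nested setInterval + async**
   - Several interval ticks can start multiple concurrent reconnect loops at the same time
   - Guard with a flag plus a single recursive `attempt()` function
8. **Check the webhook expiry time before replying**
   - On some platforms webhooks are short-lived (minutes to hours)
   - After the AI finishes, check `webhookExpiredTime` first and skip sending if it has expired
9. **There is no curl in the container; use wget or the Node.js https module**
   - The `hailin168/openclaw-bridge` image ships without curl
   - `wget -q -O- --post-data=...` is the correct way for iAgent to call APIs internally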
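As an illustration of the webhook expiry check in item 8, the test can be a pure function that takes the current time as a parameter. This is a minimal sketch: `canReply` is a hypothetical name, and the 5-second safety margin is an assumption added here to allow for the time the HTTP request itself takes; the DingTalk channel in this commit compares against the raw expiry timestamp.

```typescript
// Returns true when a reply sent now should still arrive before the webhook expires.
function canReply(
  webhookExpiryMs: number,
  nowMs: number = Date.now(),
  safetyMarginMs: number = 5_000, // assumed headroom for the request round trip
): boolean {
  return nowMs + safetyMarginMs < webhookExpiryMs;
}
```

Calling it right before each send, rather than once when the message arrives, matters because the AI call in between can take tens of seconds.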


@ -53,6 +53,10 @@ COPY --from=bridge-builder --chown=node:node /build/bridge/dist ./dist
COPY --from=bridge-builder --chown=node:node /build/bridge/node_modules ./node_modules
COPY --from=bridge-builder --chown=node:node /build/bridge/package.json ./
# DingTalk startup wrapper
COPY --chown=node:node start-dingtalk.sh ./start-dingtalk.sh
RUN chmod +x /app/bridge/start-dingtalk.sh
# ── supervisord config ────────────────────────────────────────
COPY supervisord.conf /etc/supervisor/conf.d/openclaw-bridge.conf

1207
packages/openclaw-bridge/package-lock.json generated Normal file

File diff suppressed because it is too large


@ -6,6 +6,7 @@
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"start:dingtalk": "node dist/channels/dingtalk.js",
"dev": "ts-node src/index.ts"
},
"dependencies": {


@ -0,0 +1,547 @@
/**
* DingTalk Channel for OpenClaw Production Grade
*
* Robustness features:
* Access token auto-refresh (5 min before expiry)
* DingTalk Stream WS reconnect with exponential backoff
* OpenClaw gateway auto-reconnect on disconnect
* Per-user serial message queue (no cross-user interference)
* Message deduplication (msgId-based, bounded memory)
* Per-user rate limiting (10 msg/min)
* sessionWebhook expiry guard
* Reply chunking (DingTalk 5000-char limit)
* Sanitized error messages (no internal details exposed)
* Graceful shutdown (SIGTERM/SIGINT)
*
* Required env vars:
* DINGTALK_CLIENT_ID DingTalk AppKey
* DINGTALK_CLIENT_SECRET DingTalk AppSecret
* OPENCLAW_GATEWAY_URL ws://127.0.0.1:18789 (default)
* OPENCLAW_GATEWAY_TOKEN Internal gateway auth token
*/
import 'dotenv/config';
import * as https from 'https';
import WebSocket from 'ws';
import { OpenClawClient } from '../openclaw-client';
// ── Config ────────────────────────────────────────────────────────────────────
const CLIENT_ID = process.env.DINGTALK_CLIENT_ID ?? '';
const CLIENT_SECRET = process.env.DINGTALK_CLIENT_SECRET ?? '';
const OC_GATEWAY = process.env.OPENCLAW_GATEWAY_URL ?? 'ws://127.0.0.1:18789';
const OC_TOKEN = process.env.OPENCLAW_GATEWAY_TOKEN ?? '';
const DINGTALK_MAX_CHARS = 4800; // leave headroom under 5000
const RATE_LIMIT_PER_MIN = 10;
const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to refresh
const WS_RECONNECT_BASE_MS = 2_000;
const WS_RECONNECT_MAX_MS = 60_000;
const OC_RECONNECT_BASE_MS = 3_000;
const OC_RECONNECT_MAX_MS = 30_000;
const OC_GATEWAY_RETRIES = 40;
if (!CLIENT_ID || !CLIENT_SECRET) {
console.log('[dingtalk] Not configured — channel disabled.');
process.exit(0);
}
// ── Token Manager — auto-refresh + mutex (no concurrent double-refresh) ───
class TokenManager {
private token = '';
private expiresAt = 0;
private refreshTimer?: NodeJS.Timeout;
private refreshPromise: Promise<string> | null = null; // mutex
async get(): Promise<string> {
if (Date.now() < this.expiresAt - TOKEN_REFRESH_BUFFER * 1000) {
return this.token;
}
// If a refresh is already in-flight, piggyback on it instead of starting another
if (this.refreshPromise) return this.refreshPromise;
this.refreshPromise = this.doRefresh().finally(() => { this.refreshPromise = null; });
return this.refreshPromise;
}
private async doRefresh(): Promise<string> {
const { accessToken, expireIn } = await httpPost<{ accessToken: string; expireIn: number }>(
'api.dingtalk.com',
'/v1.0/oauth2/accessToken',
{ appKey: CLIENT_ID, appSecret: CLIENT_SECRET },
);
this.token = accessToken;
this.expiresAt = Date.now() + expireIn * 1000;
clearTimeout(this.refreshTimer);
const refreshInMs = Math.max((expireIn - TOKEN_REFRESH_BUFFER) * 1000, 60_000);
this.refreshTimer = setTimeout(() => {
this.get().catch((e) => console.error('[dingtalk] Token refresh failed:', e.message));
}, refreshInMs);
console.log(`[dingtalk] Token refreshed, valid for ${expireIn}s`);
return this.token;
}
destroy(): void {
clearTimeout(this.refreshTimer);
}
}
// ── Per-user Serial Queue — max depth guard prevents memory runaway ────────
const USER_QUEUE_MAX_DEPTH = 5; // max pending tasks per user
class UserQueue {
private readonly tails = new Map<string, Promise<void>>();
private readonly depths = new Map<string, number>();
/** Returns false if queue is full (caller should reject message) */
enqueue(userId: string, task: () => Promise<void>): boolean {
const depth = this.depths.get(userId) ?? 0;
if (depth >= USER_QUEUE_MAX_DEPTH) return false;
this.depths.set(userId, depth + 1);
const tail = this.tails.get(userId) ?? Promise.resolve();
const next = tail
.then(task)
.catch((e) => console.error(`[dingtalk] Queue task error (${userId}):`, e.message))
.finally(() => {
const remaining = (this.depths.get(userId) ?? 1) - 1;
if (remaining <= 0) {
this.depths.delete(userId);
this.tails.delete(userId);
} else {
this.depths.set(userId, remaining);
}
});
this.tails.set(userId, next);
return true;
}
}
// ── Message Deduplication — Map<msgId, timestamp>, O(1) lookup + TTL evict
class MsgDedup {
private readonly seen = new Map<string, number>(); // msgId → insertedAt ms
private readonly ttlMs: number;
private readonly maxSize: number;
constructor(ttlMs = 10 * 60 * 1000, maxSize = 10_000) { // 10 min TTL, 10k cap
this.ttlMs = ttlMs;
this.maxSize = maxSize;
}
has(msgId: string): boolean {
const t = this.seen.get(msgId);
if (t === undefined) return false;
if (Date.now() - t > this.ttlMs) { this.seen.delete(msgId); return false; }
return true;
}
add(msgId: string): void {
if (this.seen.size >= this.maxSize) this.evict();
this.seen.set(msgId, Date.now());
}
private evict(): void {
const cutoff = Date.now() - this.ttlMs;
for (const [id, t] of this.seen) {
if (t < cutoff) this.seen.delete(id);
}
// If still over limit, remove oldest entries
if (this.seen.size >= this.maxSize) {
const oldest = [...this.seen.entries()]
.sort((a, b) => a[1] - b[1])
.slice(0, Math.floor(this.maxSize / 2));
for (const [id] of oldest) this.seen.delete(id);
}
}
}
// ── Per-user Rate Limiter — sliding window, 10 msg/min ───────────────────
class RateLimiter {
private readonly windows = new Map<string, number[]>();
allow(userId: string): boolean {
const now = Date.now();
const window = 60_000;
const timestamps = (this.windows.get(userId) ?? []).filter((t) => now - t < window);
if (timestamps.length >= RATE_LIMIT_PER_MIN) return false;
timestamps.push(now);
this.windows.set(userId, timestamps);
return true;
}
}
// ── OpenClaw Client Wrapper — auto-reconnect on disconnect ────────────────
class ResilientOcClient {
private client: OpenClawClient;
private reconnecting = false;
constructor() {
this.client = new OpenClawClient(OC_GATEWAY, OC_TOKEN);
}
async connect(): Promise<void> {
let delay = OC_RECONNECT_BASE_MS;
for (let i = 0; i < OC_GATEWAY_RETRIES; i++) {
try {
await this.client.connect();
console.log('[dingtalk] Connected to OpenClaw gateway');
this.setupAutoReconnect();
return;
} catch {
console.warn(`[dingtalk] OC gateway not ready (${i + 1}/${OC_GATEWAY_RETRIES}), retry in ${delay}ms`);
await sleep(delay);
delay = Math.min(delay * 1.5, OC_RECONNECT_MAX_MS);
}
}
throw new Error('Could not connect to OpenClaw gateway');
}
isConnected(): boolean {
return this.client.isConnected();
}
async rpc(method: string, params?: unknown, timeoutMs?: number): Promise<unknown> {
return this.client.rpc(method, params, timeoutMs);
}
private setupAutoReconnect(): void {
// Poll every 10s; the reconnecting flag prevents loop stacking
const check = setInterval(() => {
if (this.client.isConnected() || this.reconnecting) return;
// Atomically claim the reconnect slot
this.reconnecting = true;
console.warn('[dingtalk] OC gateway disconnected — reconnecting...');
// Replace client before connect() so stale callbacks are dropped
this.client = new OpenClawClient(OC_GATEWAY, OC_TOKEN);
let delay = OC_RECONNECT_BASE_MS;
const attempt = async (): Promise<void> => {
try {
await this.client.connect();
console.log('[dingtalk] OC gateway reconnected');
this.reconnecting = false;
// setupAutoReconnect is already running via the outer interval — no re-register needed
} catch {
delay = Math.min(delay * 1.5, OC_RECONNECT_MAX_MS);
console.warn(`[dingtalk] OC reconnect failed, retry in ${delay}ms`);
setTimeout(attempt, delay);
}
};
attempt();
}, 10_000);
check.unref();
}
}
// ── DingTalk Reply — chunked + webhook expiry guard ───────────────────────
function replyToUser(sessionWebhook: string, webhookExpiry: number, content: string): void {
if (Date.now() > webhookExpiry) {
console.warn('[dingtalk] sessionWebhook expired, cannot reply');
return;
}
// Sanitize: never expose stack traces or internal paths
const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim();
// Chunk into 4800-char segments
const chunks: string[] = [];
for (let i = 0; i < safe.length; i += DINGTALK_MAX_CHARS) {
chunks.push(safe.slice(i, i + DINGTALK_MAX_CHARS));
}
if (chunks.length === 0) chunks.push('(空响应)');
// Send chunks sequentially with 200ms gap to preserve order
chunks.forEach((chunk, idx) => {
setTimeout(() => sendWebhook(sessionWebhook, chunk), idx * 200);
});
}
function sendWebhook(webhook: string, content: string): void {
const url = new URL(webhook);
const body = JSON.stringify({ msgtype: 'text', text: { content } });
const req = https.request(
{
hostname: url.hostname,
path: url.pathname + url.search,
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) },
timeout: 10_000,
},
(res) => {
res.resume();
if (res.statusCode && res.statusCode >= 400) {
console.error(`[dingtalk] Webhook reply ${res.statusCode}`);
}
},
);
req.on('timeout', () => { req.destroy(); console.error('[dingtalk] Webhook reply timeout'); });
req.on('error', (e) => console.error('[dingtalk] Webhook error:', e.message));
req.write(body);
req.end();
}
// ── DingTalk Stream Connection — reconnect with exponential backoff ───────
interface DtFrame {
type: string;
headers: Record<string, string>;
data?: string;
}
interface BotMsg {
senderStaffId: string;
sessionWebhook: string;
sessionWebhookExpiredTime: number;
text?: { content: string };
conversationId: string;
msgId: string;
}
class DingTalkStream {
private ws: WebSocket | null = null;
private reconnectDelay = WS_RECONNECT_BASE_MS;
private stopping = false;
constructor(
private readonly tokenMgr: TokenManager,
private readonly ocClient: ResilientOcClient,
private readonly queue: UserQueue,
private readonly dedup: MsgDedup,
private readonly rateLimit: RateLimiter,
) {}
async start(): Promise<void> {
if (this.stopping) return;
let token: string;
try {
token = await this.tokenMgr.get();
} catch (e: any) {
console.error('[dingtalk] Cannot get access token:', e.message);
this.scheduleReconnect();
return;
}
const subBody = JSON.stringify({
clientId: CLIENT_ID,
clientSecret: CLIENT_SECRET,
subscriptions: [
{ type: 'CALLBACK', topic: '/v1.0/im/bot/messages/get' },
],
ua: 'it0-dingtalk-channel/1.0',
localIp: '127.0.0.1',
});
let wsInfo: { endpoint: string; ticket: string };
try {
wsInfo = await httpPost<{ endpoint: string; ticket: string }>(
'api.dingtalk.com',
'/v1.0/gateway/connections/open',
JSON.parse(subBody),
{ 'x-acs-dingtalk-access-token': token },
);
} catch (e: any) {
console.error('[dingtalk] Failed to get stream endpoint:', e.message);
this.scheduleReconnect();
return;
}
const ws = new WebSocket(`${wsInfo.endpoint}?ticket=${encodeURIComponent(wsInfo.ticket)}`);
this.ws = ws;
ws.on('open', () => {
console.log('[dingtalk] Stream connected');
this.reconnectDelay = WS_RECONNECT_BASE_MS; // reset backoff on success
});
ws.on('message', (raw) => {
let frame: DtFrame;
try { frame = JSON.parse(raw.toString()); } catch { return; }
this.handleFrame(ws, frame);
});
ws.on('close', (code, reason) => {
if (this.stopping) return;
console.warn(`[dingtalk] Stream closed (${code}: ${reason.toString()})`);
this.scheduleReconnect();
});
ws.on('error', (e) => {
console.error('[dingtalk] Stream WS error:', e.message);
// 'close' will fire after this, triggering reconnect
});
}
private handleFrame(ws: WebSocket, frame: DtFrame): void {
// Heartbeat
if (frame.type === 'PING') {
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
return;
}
// Bot message
if (frame.type === 'CALLBACK' &&
frame.headers?.['topic'] === '/v1.0/im/bot/messages/get' &&
frame.data) {
let msg: BotMsg;
try { msg = JSON.parse(frame.data); } catch { return; }
// Ack DingTalk immediately (must be within 1.5s)
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
this.dispatchMessage(msg);
}
}
private dispatchMessage(msg: BotMsg): void {
const userId = msg.senderStaffId;
const prompt = msg.text?.content?.trim() ?? '';
if (!prompt) return;
// Dedup
if (this.dedup.has(msg.msgId)) {
console.log(`[dingtalk] Duplicate msgId ${msg.msgId}, skipped`);
return;
}
this.dedup.add(msg.msgId);
// Rate limit
if (!this.rateLimit.allow(userId)) {
console.warn(`[dingtalk] Rate limit hit for ${userId}`);
replyToUser(msg.sessionWebhook, msg.sessionWebhookExpiredTime,
'消息频率过高请稍后再试每分钟最多10条。');
return;
}
console.log(`[dingtalk] Queuing msg from ${userId}: "${prompt.slice(0, 60)}"`);
// Enqueue into per-user serial queue (max depth guard)
const accepted = this.queue.enqueue(userId, async () => {
if (!this.ocClient.isConnected()) {
replyToUser(msg.sessionWebhook, msg.sessionWebhookExpiredTime,
'智能体正在启动,请稍等片刻后重试。');
return;
}
let result: unknown;
try {
// chat.send — Protocol v3 correct method name
// sessionKey "agent:main:dt-<userId>" = per-user session isolation
result = await this.ocClient.rpc('chat.send', {
sessionKey: `agent:main:dt-${userId}`,
message: prompt,
timeoutSeconds: 25,
}, 30_000); // rpc timeout = 25s agent + 5s buffer
} catch (e: any) {
console.error(`[dingtalk] OC rpc error for ${userId}:`, e.message);
replyToUser(msg.sessionWebhook, msg.sessionWebhookExpiredTime,
'处理请求时出现错误,请稍后重试。');
return;
}
const reply = typeof result === 'string' ? result : JSON.stringify(result, null, 2);
replyToUser(msg.sessionWebhook, msg.sessionWebhookExpiredTime, reply);
});
if (!accepted) {
console.warn(`[dingtalk] Queue full for ${userId}, rejecting message`);
replyToUser(msg.sessionWebhook, msg.sessionWebhookExpiredTime,
'当前请求排队已满最多5条请等待前面的任务完成后再发送。');
}
}
private scheduleReconnect(): void {
if (this.stopping) return;
console.log(`[dingtalk] Reconnecting in ${this.reconnectDelay}ms...`);
setTimeout(() => this.start(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, WS_RECONNECT_MAX_MS);
}
stop(): void {
this.stopping = true;
this.ws?.close();
}
}
// ── HTTP helper (no external deps) ───────────────────────────────────────────
function httpPost<T>(
hostname: string,
path: string,
payload: object,
extraHeaders: Record<string, string> = {},
): Promise<T> {
return new Promise((resolve, reject) => {
const body = JSON.stringify(payload);
const req = https.request(
{
hostname,
path,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
...extraHeaders,
},
timeout: 10_000,
},
(res) => {
let data = '';
res.on('data', (c) => (data += c));
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode && res.statusCode >= 400) {
reject(new Error(`HTTP ${res.statusCode}: ${data.slice(0, 200)}`));
} else {
resolve(json as T);
}
} catch (e) {
reject(e);
}
});
},
);
req.on('timeout', () => { req.destroy(); reject(new Error('Request timeout')); });
req.on('error', reject);
req.write(body);
req.end();
});
}
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
// ── Main ──────────────────────────────────────────────────────────────────────
const tokenMgr = new TokenManager();
const ocClient = new ResilientOcClient();
const queue = new UserQueue();
const dedup = new MsgDedup();
const rateLimit = new RateLimiter();
const stream = new DingTalkStream(tokenMgr, ocClient, queue, dedup, rateLimit);
// Graceful shutdown
function shutdown(signal: string): void {
console.log(`[dingtalk] Received ${signal}, shutting down gracefully...`);
stream.stop();
tokenMgr.destroy();
setTimeout(() => process.exit(0), 3_000);
}
process.once('SIGTERM', () => shutdown('SIGTERM'));
process.once('SIGINT', () => shutdown('SIGINT'));
console.log('[dingtalk] Starting DingTalk channel...');
ocClient.connect()
.then(() => stream.start())
.catch((e) => {
console.error('[dingtalk] Fatal startup error:', e.message);
process.exit(1);
});


@ -61,16 +61,21 @@ app.get('/status', (_req, res) => {
});
// Submit a task to OpenClaw
// Uses chat.send (Protocol v3). sessionKey format: "agent:main:<name>"
// timeoutSeconds: how long to wait for agent reply (0=fire-and-forget, default 25)
app.post('/task', async (req, res) => {
if (!ocClient.isConnected()) {
res.status(503).json({ error: 'Gateway not connected' });
return;
}
try {
const result = await ocClient.rpc('agent.run', {
prompt: req.body.prompt,
sessionId: req.body.sessionId,
});
const sessionKey = req.body.sessionKey ?? `agent:main:${req.body.sessionId ?? 'main'}`;
const timeoutSeconds: number = req.body.timeoutSeconds ?? 25;
const result = await ocClient.rpc(
'chat.send',
{ sessionKey, message: req.body.prompt, timeoutSeconds },
(timeoutSeconds + 5) * 1000, // rpc timeout = agent timeout + 5s buffer
);
res.json({ ok: true, result });
} catch (err: any) {
res.status(500).json({ error: err.message });
@ -84,21 +89,21 @@ app.get('/sessions', async (_req, res) => {
return;
}
try {
const result = await ocClient.rpc('sessions.list');
const result = await ocClient.rpc('sessions.list', { limit: 100 });
res.json(result);
} catch (err: any) {
res.status(500).json({ error: err.message });
}
});
// Basic metrics (token usage etc.) — OpenClaw stores these in memory
// Gateway status (metrics.get does not exist in OpenClaw Protocol v3)
app.get('/metrics', async (_req, res) => {
if (!ocClient.isConnected()) {
res.status(503).json({ error: 'Gateway not connected' });
return;
}
try {
const result = await ocClient.rpc('metrics.get');
const result = await ocClient.rpc('gateway.status');
res.json(result);
} catch (err: any) {
res.status(500).json({ error: err.message });


@ -1,103 +1,207 @@
/**
* OpenClaw Gateway WebSocket client.
* Connects to the local OpenClaw gateway (ws://127.0.0.1:18789)
* and provides a simple RPC interface.
* OpenClaw Gateway WebSocket client Protocol v3
*
* Correct wire format (verified from openclaw/openclaw source):
* Request: { type:"req", id, method, params?, idempotencyKey? }
* Response: { type:"res", id, ok:boolean, payload?, error? }
* Event: { type:"event", event, payload?, seq? }
*
* Authentication:
* 1. Server sends "connect.challenge" event with { nonce, timestamp }
* 2. Client signs the nonce with an ephemeral Ed25519 key
* 3. Client sends "connect" req with auth.token + device.{id,publicKey,signature,signedAt}
* 4. Server responds with ok:true connection is ready
*/
import WebSocket from 'ws';
interface RpcFrame {
import WebSocket from 'ws';
import * as crypto from 'crypto';
// ── Wire types ────────────────────────────────────────────────────────────────
interface ReqFrame {
type: 'req';
id: string;
method: string;
params?: unknown;
idempotencyKey?: string;
}
interface RpcResponse {
interface ResFrame {
type: 'res';
id: string;
result?: unknown;
error?: { message: string };
ok: boolean;
payload?: unknown;
error?: { code: string; message: string; retryable?: boolean; retryAfterMs?: number };
}
interface EventFrame {
type: 'event';
event: string;
payload?: unknown;
seq?: number;
}
type Frame = ReqFrame | ResFrame | EventFrame;
// ── Client ────────────────────────────────────────────────────────────────────
export class OpenClawClient {
private ws: WebSocket | null = null;
private pending = new Map<string, { resolve: (v: unknown) => void; reject: (e: Error) => void }>();
private readonly gatewayUrl: string;
private readonly token: string;
private msgCounter = 0;
private pending = new Map<
string,
{ resolve: (v: unknown) => void; reject: (e: Error) => void }
>();
private connected = false;
private msgCounter = 0;
constructor(gatewayUrl: string, token: string) {
this.gatewayUrl = gatewayUrl;
this.token = token;
}
// Ephemeral Ed25519 key pair — generated once per client instance
private readonly deviceId = crypto.randomUUID();
private readonly keyPair = crypto.generateKeyPairSync('ed25519');
constructor(
private readonly gatewayUrl: string,
private readonly token: string,
) {}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.gatewayUrl, {
headers: { Authorization: `Bearer ${this.token}` },
});
const ws = new WebSocket(this.gatewayUrl);
this.ws = ws;
this.ws.once('open', () => {
this.connected = true;
resolve();
});
// Whether the handshake promise has settled (prevents double-settle)
let settled = false;
const settle = (err?: Error) => {
if (settled) return;
settled = true;
if (err) reject(err);
else resolve();
};
this.ws.once('error', (err) => {
if (!this.connected) reject(err);
});
this.ws.on('message', (raw) => {
ws.on('message', (raw) => {
let frame: Frame;
try {
const frame: RpcResponse = JSON.parse(raw.toString());
const pending = this.pending.get(frame.id);
if (!pending) return;
this.pending.delete(frame.id);
if (frame.error) {
pending.reject(new Error(frame.error.message));
} else {
pending.resolve(frame.result);
}
frame = JSON.parse(raw.toString());
} catch {
// ignore malformed frames
return;
}
this.handleFrame(frame, settle);
});
this.ws.on('close', () => {
ws.once('error', (err) => {
if (!this.connected) settle(err);
});
ws.on('close', () => {
this.connected = false;
// Reject all pending RPCs
for (const [, p] of this.pending) {
p.reject(new Error('OpenClaw gateway disconnected'));
}
this.pending.clear();
});
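// Safety timeout for the handshake (the gateway enforces its own 10s timeout)
const handshakeTimer = setTimeout(() => settle(new Error('Handshake timeout')), 12_000);
// Don't keep the process alive just for this timer
if (handshakeTimer.unref) handshakeTimer.unref();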
});
}
private handleFrame(
frame: Frame,
settle: (err?: Error) => void,
): void {
// Step 1: Server sends challenge
if (frame.type === 'event' && frame.event === 'connect.challenge') {
const { nonce } = frame.payload as { nonce: string; timestamp: number };
this.sendHandshake(nonce).catch((e) => settle(e));
return;
}
// Step 2: Server acknowledges handshake
if (frame.type === 'res' && frame.id === '__connect__') {
if (frame.ok) {
this.connected = true;
settle();
} else {
settle(new Error(`OpenClaw handshake rejected: ${frame.error?.message ?? frame.error?.code}`));
}
return;
}
// Regular RPC responses
if (frame.type === 'res') {
const p = this.pending.get(frame.id);
if (!p) return;
this.pending.delete(frame.id);
if (frame.ok) {
p.resolve(frame.payload ?? null);
} else {
p.reject(new Error(frame.error?.message ?? frame.error?.code ?? 'RPC error'));
}
}
// Events are ignored by default (not needed for the bridge use case)
}
private async sendHandshake(nonce: string): Promise<void> {
// Sign the nonce (hex string → Buffer) with our ephemeral private key
const nonceBuffer = Buffer.from(nonce, 'hex');
const signature = crypto.sign(null, nonceBuffer, this.keyPair.privateKey);
const pubKeyDer = this.keyPair.publicKey.export({ type: 'spki', format: 'der' });
const req: ReqFrame = {
type: 'req',
id: '__connect__',
method: 'connect',
params: {
minProtocol: 3,
maxProtocol: 3,
client: {
id: this.deviceId,
version: '1.0.0',
platform: 'node',
mode: 'channel',
},
role: 'operator',
scopes: ['operator.read', 'operator.write'],
auth: { token: this.token },
device: {
id: this.deviceId,
publicKey: pubKeyDer.toString('base64'),
signature: signature.toString('base64'),
signedAt: Date.now(),
},
},
};
this.ws!.send(JSON.stringify(req));
}
isConnected(): boolean {
return this.connected && this.ws?.readyState === WebSocket.OPEN;
}
rpc(method: string, params?: unknown): Promise<unknown> {
rpc(method: string, params?: unknown, timeoutMs = 30_000): Promise<unknown> {
return new Promise((resolve, reject) => {
if (!this.isConnected()) {
reject(new Error('Not connected to OpenClaw gateway'));
return;
}
const id = String(++this.msgCounter);
const frame: RpcFrame = { id, method, params };
const frame: ReqFrame = { type: 'req', id, method, params };
this.pending.set(id, { resolve, reject });
this.ws!.send(JSON.stringify(frame));
// Reject if no response arrives within timeoutMs (default 30s)
setTimeout(() => {
const timer = setTimeout(() => {
if (this.pending.has(id)) {
this.pending.delete(id);
reject(new Error(`RPC timeout: ${method}`));
}
}, 30_000);
}, timeoutMs);
// Don't keep process alive just for the timer
if (timer.unref) timer.unref();
});
}
close(): void {
this.ws?.close();
this.connected = false;
}
}
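
The challenge-response step in `sendHandshake()` can be checked in isolation. The following is a minimal sketch, assuming the gateway verifies the signature over the raw nonce bytes and receives the public key as base64-encoded SPKI DER, as the client above sends it. The helper names `signNonce`, `exportPublicKey`, and `verifyNonce` are hypothetical and do not appear in this commit; the gateway's real verification code is not shown here.

```typescript
import * as crypto from 'crypto';

// Hypothetical shape of the proof the client puts into `device` (illustrative only).
interface DeviceProof {
  publicKey: string; // base64-encoded SPKI DER public key
  signature: string; // base64-encoded Ed25519 signature over the nonce bytes
}

// Client side: sign a hex-encoded nonce the same way sendHandshake() does.
function signNonce(nonceHex: string, privateKey: crypto.KeyObject): string {
  const nonce = Buffer.from(nonceHex, 'hex');
  // Ed25519 does not take a separate digest algorithm, hence `null`.
  return crypto.sign(null, nonce, privateKey).toString('base64');
}

// Client side: export the public key in the form sent on the wire.
function exportPublicKey(publicKey: crypto.KeyObject): string {
  return publicKey.export({ type: 'spki', format: 'der' }).toString('base64');
}

// Verifier side (assumed behaviour): rebuild the key and check the signature.
function verifyNonce(nonceHex: string, proof: DeviceProof): boolean {
  const publicKey = crypto.createPublicKey({
    key: Buffer.from(proof.publicKey, 'base64'),
    format: 'der',
    type: 'spki',
  });
  return crypto.verify(
    null,
    Buffer.from(nonceHex, 'hex'),
    publicKey,
    Buffer.from(proof.signature, 'base64'),
  );
}

// Round trip with an ephemeral key pair and a random 32-byte nonce.
const { publicKey, privateKey } = crypto.generateKeyPairSync('ed25519');
const nonceHex = crypto.randomBytes(32).toString('hex');
const proof: DeviceProof = {
  publicKey: exportPublicKey(publicKey),
  signature: signNonce(nonceHex, privateKey),
};
console.log('signature valid:', verifyNonce(nonceHex, proof));
```

Because the key pair is generated per client instance, the signature only shows that the sender holds the private key for the public key it just presented. Under this reading, authorization still rests on `auth.token`.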


@ -0,0 +1,8 @@
#!/bin/sh
# Start DingTalk channel only if credentials are configured.
# When not configured, exit with status 0 so supervisord (autorestart=unexpected) records it as EXITED and does not restart it.
if [ -z "$DINGTALK_CLIENT_ID" ]; then
echo "[dingtalk] DINGTALK_CLIENT_ID not set — channel disabled, exiting."
exit 0
fi
exec node /app/bridge/dist/channels/dingtalk.js