it0/packages/openclaw-bridge/CHANNEL_DEV_GUIDE.md

16 KiB
Raw Blame History

OpenClaw Channel 插件开发指南

基于钉钉 Channel 插件(src/channels/dingtalk.ts)的完整开发经验总结。 开发新 Channel 时,本文档是你的第一参考。


一、整体架构

每个 OpenClaw 实例是一个独立 Docker 容器(hailin168/openclaw-bridge:latest)。 容器内有三个进程,由 supervisord 管理:

容器
├── openclaw gateway      端口 18789 (内部)  — 核心 AI Agent 进程
├── it0-bridge            端口 3000  (对外)  — IT0 管理 REST API
└── dingtalk-channel      无对外端口         — 第三方 Channel 进程(可选)

Channel 插件的职责:

  1. 连接第三方平台(接收用户消息)
  2. 转发给 OpenClaw Gateway通过 WebSocket RPC
  3. 将 OpenClaw 响应回发给用户

Channel 插件不需要任何对外端口,完全依赖出向连接。


二、OpenClaw Gateway 协议 (Protocol v3)

⚠️ 这是最重要的部分。协议细节必须准确否则运行时全部失败TypeScript 编译器无法检测。

2.1 Wire 帧格式

// 请求帧Client → Gateway
interface ReqFrame {
  type: 'req';           // 必须是字面量 'req'
  id: string;            // 唯一请求 ID自增字符串即可
  method: string;        // RPC 方法名
  params?: unknown;      // 方法参数
  idempotencyKey?: string; // 可选,幂等键
}

// 响应帧Gateway → Client
interface ResFrame {
  type: 'res';
  id: string;            // 对应请求的 id
  ok: boolean;           // 成功/失败标志
  payload?: unknown;     // 成功时的响应数据
  error?: {
    code: string;
    message: string;
    retryable?: boolean;
    retryAfterMs?: number;
  };
}

// 事件帧Gateway → Client主动推送
interface EventFrame {
  type: 'event';
  event: string;         // 事件名
  payload?: unknown;
  seq?: number;          // 单调递增序号
}

2.2 连接握手(必须完成,否则所有 RPC 都会被拒绝)

OpenClaw 使用 Challenge-Response + Ed25519 签名 认证。流程:

Client ──────── WebSocket Connect ──────────────────→ Gateway
Gateway ──── event: "connect.challenge" { nonce } ──→ Client
Client ──── req: "connect" { auth.token, device } ──→ Gateway
Gateway ─── res: "connect" { ok: true } ────────────→ Client
                  ↑ 这之后才能发送任何 RPC 请求

关键实现细节:

// 1. 用 Node.js 内置 crypto 生成临时 Ed25519 密钥对(每次连接生成新的)
const keyPair = crypto.generateKeyPairSync('ed25519');

// 2. 用私钥对 nonce (hex string → Buffer) 签名
const nonceBuffer = Buffer.from(nonce, 'hex');
const signature = crypto.sign(null, nonceBuffer, keyPair.privateKey);

// 3. 公钥导出为 DER 格式 base64
const pubKeyDer = keyPair.publicKey.export({ type: 'spki', format: 'der' });

// 4. 发送 connect 请求
{
  type: 'req',
  id: '__connect__',
  method: 'connect',
  params: {
    minProtocol: 3, maxProtocol: 3,
    client: { id: deviceId, version: '1.0.0', platform: 'node', mode: 'channel' },
    role: 'operator',
    scopes: ['operator.read', 'operator.write'],
    auth: { token: OPENCLAW_GATEWAY_TOKEN },  // 环境变量注入的 token
    device: {
      id: deviceId,
      publicKey: pubKeyDer.toString('base64'),
      signature: signature.toString('base64'),
      signedAt: Date.now(),
    }
  }
}

握手超时Gateway 有 10s 超时,客户端应设 12s 安全超时。

2.3 关键 RPC 方法(经官方源码验证)

功能 方法名 关键参数
发消息给 Agent chat.send { sessionKey, message, timeoutSeconds }
列出会话 sessions.list { limit, kinds?, activeMinutes? }
获取会话历史 sessions.history { sessionKey, messageLimit }
中止当前任务 chat.abort { sessionKey }
网关状态 gateway.status
健康检查 gateway.health

以下方法不存在,不要使用:

  • agent.run — 根本不存在
  • metrics.get — 不存在,用 gateway.status 替代

2.4 Session Key 格式

格式:"type:agentName:sessionName"

agent:main:main           — 默认主 session
agent:main:dt-{userId}    — 钉钉用户专属 session每用户独立
agent:main:tg-{chatId}    — Telegram chat 专属 session
agent:main:wechat-{openId}  — 微信用户专属 session

每个 OpenClaw 实例默认只有一个 agent "main"。 Session name 部分可以自定义Gateway 会自动创建不存在的 session。

2.5 chat.send 的响应结构

// payload 结构
{
  runId: string;
  status: 'started' | 'in_flight' | 'ok' | 'timeout';
  reply?: string;   // 当 timeoutSeconds > 0 且 status=ok 时有值
  error?: string;   // 当 status=timeout 时有值
}

// 从 payload 提取回复文本的正确方式:
const payload = result as { status: string; reply?: string };
const replyText = payload.reply ?? '(任务已提交,正在处理)';

三、钉钉 Channel 实现要点

3.1 使用 Stream 模式(无需公网 IP

钉钉官方提供两种接入方式:

  • HTTP 回调模式:需要公网 IP + 备案,不适合容器部署
  • Stream 模式:主动连接钉钉服务器,容器内部署完美适配 ✓

Stream 模式连接流程:

1. POST https://api.dingtalk.com/v1.0/oauth2/accessToken
   → { accessToken, expireIn: 7200 }

2. POST https://api.dingtalk.com/v1.0/gateway/connections/open
   Headers: x-acs-dingtalk-access-token: {accessToken}
   Body: { clientId, clientSecret, subscriptions: [{type:'CALLBACK', topic:'/v1.0/im/bot/messages/get'}] }
   → { endpoint, ticket }

3. WSS connect: {endpoint}?ticket={ticket}
   → 保持长连接,接收消息

3.2 消息帧格式

// 服务器推送的消息帧
{
  type: 'CALLBACK',   // or 'PING', 'EVENT', 'SYSTEM'
  headers: { topic: '/v1.0/im/bot/messages/get', ... },
  data: string   // JSON string
}

// data 解析后的 bot 消息结构
{
  senderStaffId: string,       // 发送者工号
  sessionWebhook: string,      // 用于回复的 Webhook URL有过期时间
  sessionWebhookExpiredTime: number,  // 毫秒时间戳
  text: { content: string },   // 消息文本
  conversationId: string,      // 群/单聊会话 ID
  msgId: string                // 消息唯一 ID用于去重
}

// 必须在收到消息后 1.5 秒内 ACK否则钉钉会重发
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));

3.3 已验证的钉钉 API 限制

限制 数值
Access Token 有效期 7200 秒2小时
单条消息字符上限 5000 字符
ACK 超时 1.5 秒
sessionWebhook 有效期 约 24 小时(群消息更短)

3.4 回复钉钉消息(通过 sessionWebhook

function sendWebhook(webhook: string, content: string): void {
  const url = new URL(webhook);
  const body = JSON.stringify({ msgtype: 'text', text: { content } });
  // 用 Node.js 内置 https 模块,无外部依赖
  const req = https.request({ hostname: url.hostname, path: url.pathname + url.search, method: 'POST', ... });
}

超过 4800 字符需分块发送,每块间隔 200ms 保证顺序。


四、生产级鲁棒性模式(从错误中总结)

4.1 Token 自动刷新 + 防竞态

class TokenManager {
  private refreshPromise: Promise<string> | null = null; // 互斥锁

  async get(): Promise<string> {
    if (Date.now() < this.expiresAt - BUFFER_MS) return this.token;
    // 关键:多个并发调用只触发一次刷新
    if (this.refreshPromise) return this.refreshPromise;
    this.refreshPromise = this.doRefresh().finally(() => { this.refreshPromise = null; });
    return this.refreshPromise;
  }
}

4.2 消息去重O(1) + TTL

class MsgDedup {
  private readonly seen = new Map<string, number>(); // msgId → timestamp

  has(msgId: string): boolean {
    const t = this.seen.get(msgId);
    if (t === undefined) return false;
    if (Date.now() - t > this.ttlMs) { this.seen.delete(msgId); return false; }
    return true;
  }
}
// ❌ 不要用 Array.includes() — O(n)5000条时慢到不可用

4.3 每用户串行队列 + 深度上限

class UserQueue {
  private readonly tails  = new Map<string, Promise<void>>();
  private readonly depths = new Map<string, number>();

  enqueue(userId: string, task: () => Promise<void>): boolean {
    if ((this.depths.get(userId) ?? 0) >= MAX_DEPTH) return false; // 拒绝
    // Promise 链串行执行,不同用户并发互不影响
    ...
  }
}
// MAX_DEPTH = 5用户排队超5条立即告知防内存泄漏

4.4 WebSocket 断线指数退避重连

private reconnectDelay = 2_000;

private scheduleReconnect(): void {
  setTimeout(() => this.start(), this.reconnectDelay);
  this.reconnectDelay = Math.min(this.reconnectDelay * 2, 60_000);
  // 成功连接后在 ws.on('open') 里重置this.reconnectDelay = 2_000;
}

4.5 OpenClaw Gateway 重连防多开

private setupAutoReconnect(): void {
  const check = setInterval(() => {
    if (this.client.isConnected() || this.reconnecting) return; // 原子守门
    this.reconnecting = true;
    this.client = new OpenClawClient(...); // 先替换,再重连
    const attempt = async () => {
      try { await this.client.connect(); this.reconnecting = false; }
      catch { setTimeout(attempt, delay *= 1.5); }  // 递归重试,不叠加
    };
    attempt();
  }, 10_000);
  check.unref(); // 不阻止进程退出
}
// ❌ 不要在 catch 里直接重调 this.connect(),会叠加多个重连 loop

4.6 优雅关闭

process.once('SIGTERM', () => {
  stream.stop();
  tokenMgr.destroy(); // 清除 setTimeout 防止内存泄漏
  setTimeout(() => process.exit(0), 3_000); // 3秒内完成清理
});

五、supervisord 集成模式

容器内可选 Channel 的标准模式:

[program:dingtalk-channel]
command=/app/bridge/start-dingtalk.sh   ; 包装脚本,未配置时 exit 0
autostart=true
autorestart=unexpected   ; ← 关键:不是 true
exitcodes=0              ; ← 关键exit 0 = 正常退出,不重启
startretries=5
startsecs=10

包装脚本模板 (start-xxx.sh)

#!/bin/sh
if [ -z "$XXX_CLIENT_ID" ]; then
  echo "[xxx] Not configured — channel disabled."
  exit 0      # supervisord 看到 exit 0 → EXITED不重启
fi
exec node /app/bridge/dist/channels/xxx.js

六、开发新 Channel 的步骤

Step 1: 确认平台接入机制

问题 必须确认
是否需要公网 IP 需要 → 考虑反向代理不需要WebSocket Stream→ 最佳
是否有官方 SDK 评估SDK 稳定性 vs 自实现控制力
Token 有效期多长? 决定刷新策略
消息字符上限? 决定分块策略
是否有消息重投? 决定去重策略
ACK 超时多长? 必须在超时前 ACK将 AI 处理异步化

Step 2: 创建文件

packages/openclaw-bridge/src/channels/xxx.ts     — 主进程
packages/openclaw-bridge/start-xxx.sh             — 启动包装脚本

Step 3: 复用现有基础设施

import { OpenClawClient } from '../openclaw-client'; // ✓ 直接复用Protocol v3 已正确实现
// 复用以下 classTokenManager、UserQueue、MsgDedup、RateLimiter
// 只需自己实现:平台 WS 连接 + 消息收发逻辑

Step 4: 向 OpenClaw 发消息的标准代码

const result = await ocClient.rpc('chat.send', {
  sessionKey: `agent:main:${PLATFORM_PREFIX}-${userId}`,  // 确保跨平台 session 不冲突
  message: userText,
  timeoutSeconds: 25,
}, 30_000);

const payload = result as { status: string; reply?: string; error?: string };
if (payload.status === 'ok' && payload.reply) {
  await replyToUser(payload.reply);
} else if (payload.status === 'timeout') {
  await replyToUser('处理超时,请稍后重试。');
}

Step 5: 更新 supervisord.conf 和 Dockerfile

supervisord.conf 末尾追加新 program 块(参考钉钉模板)。 在 Dockerfile 中 COPY 新的启动脚本并 chmod +x。

Step 6: 更新 agent-instance-deploy.service.ts

runDeployenvParts 数组中追加新平台的环境变量:

if (xxxClientId) envParts.push(`-e XXX_CLIENT_ID=${xxxClientId}`);

Step 7: 更新 system-prompt-builder.ts

教 iAgent 如何引导用户完成该 Channel 的绑定配置步骤。


七、各平台接入难度预估

平台 接入方式 需要公网IP 预估难度 备注
钉钉 Stream SDK (WSS) 已实现,可直接参考
飞书Lark 事件订阅 WebSocket 官方有 Stream SDK与钉钉结构类似
Telegram Long Polling / Webhook polling/ 是webhook 最简单Bot API 文档极好
WhatsApp Cloud API Webhook 需 Meta 审核,有 webhook 验证
Slack Socket Mode Socket Mode 无需公网,文档好
Discord Gateway WebSocket 官方 gateway 长连接,文档清晰
微信 服务号 / 企业微信 是(公网 + 备案) 最麻烦,需域名验证
Line Messaging API Webhook 与 WhatsApp 类似

优先推荐顺序(按落地难度由易到难):

  1. Telegram — 最快1天内可完成
  2. 飞书 — 国内办公场景,与钉钉代码高度相似
  3. Slack — 国际企业场景
  4. Discord — 开发者/社区场景

八、经验教训(避坑清单)

  1. 永远先读官方源码,不要猜 API 方法名

    • 本项目曾把 chat.send 猜成 agent.rungateway.status 猜成 metrics.getTypeScript 编译通过但 runtime 全部失败
    • 查源码路径github.com/openclaw/openclaw 的 src/gateway/server-methods/
  2. Channel token 必须有自动续期机制

    • 钉钉 Access Token 2小时过期不续期则服务必然中断
    • get() 中加 Promise 互斥锁,防止并发双重刷新
  3. 永远在收到消息后立即 ACK再异步处理 AI

    • 钉钉 ACK 超时 1.5 秒AI 响应可能需要 30 秒
    • ws.send(ACK),再 queue.enqueue() 异步处理
  4. 消息去重用 Map不用 Array

    • Array.includes() 是 O(n)5000条消息时每次查找慢 5ms
    • Map.get() 是 O(1),加 TTL 可精确控制内存
  5. supervisord 的 autorestart 要用 unexpected不是 true

    • autorestart=true + 未配置时 exit 0 → 无限重启循环
    • autorestart=unexpected + exitcodes=0 → 正常退出不重启
  6. 每用户消息队列必须有深度上限

    • 用户刷消息时,无限队列会耗尽内存
    • 建议上限 5 条,超出告知用户"请等待"
  7. OpenClaw 重连不要用嵌套 setInterval + async

    • 多个 interval tick 可能同时触发多个重连 goroutine
    • 用 flag 原子保护 + 单一递归 attempt() 函数
  8. 回复消息要检查 webhook 过期时间

    • 某些平台的 webhook 有效期很短(几分钟到几小时)
    • AI 处理完后要先检查 webhookExpiredTime,过期则不发
  9. 容器内没有 curl用 wget 或 Node.js https 模块

    • hailin168/openclaw-bridge 镜像内无 curl
    • wget -q -O- --post-data=... 是 iAgent 内部调用 API 的正确方式