// it0/packages/openclaw-bridge/src/openclaw-client.ts
// (370 lines, 12 KiB, TypeScript)

/**
* OpenClaw Gateway WebSocket client — Protocol v3
*
* Correct wire format (verified from openclaw/openclaw source):
* Request: { type:"req", id, method, params?, idempotencyKey? }
* Response: { type:"res", id, ok:boolean, payload?, error? }
* Event: { type:"event", event, payload?, seq? }
*
* Authentication (v3 device-signature protocol):
* 1. Server sends "connect.challenge" event with { nonce, timestamp }
* 2. Client builds payload (deviceFamily is currently empty, hence the trailing "|"):
* "v3|{deviceId}|{clientId}|{clientMode}|{role}|{scopes}|{signedAtMs}|{token}|{nonce}|{platform}|{deviceFamily}"
* 3. Client signs payload (UTF-8) with ephemeral Ed25519 key
* 4. Client sends "connect" req with:
* - client.id = "gateway-client", client.mode = "backend", client.platform = "node"
* - auth.token = OPENCLAW_GATEWAY_TOKEN
* - device.id = SHA256(rawPubKey, hex), device.nonce = nonce
* - device.publicKey = rawPubKey (base64url, 32 bytes)
* - device.signature = base64url(Ed25519 signature over the UTF-8 payload)
* - device.signedAt = Date.now() (the same value embedded as signedAtMs in the payload)
* 5. Server responds with ok:true → connection is ready
*/
import WebSocket from 'ws';
import * as crypto from 'crypto';
// ── Wire types ────────────────────────────────────────────────────────────────
/** Client → gateway request. The gateway answers with a `res` frame carrying the same `id`. */
interface ReqFrame {
  type: 'req';
  /** Correlation id echoed back in the matching `res` frame. */
  id: string;
  method: string;
  params?: unknown;
  idempotencyKey?: string;
}

/** Error details attached to a `res` frame whose `ok` is false. */
interface ResFrameError {
  code: string;
  message: string;
  retryable?: boolean;
  retryAfterMs?: number;
}

/** Gateway → client response, matched to its request by `id`. */
interface ResFrame {
  type: 'res';
  id: string;
  ok: boolean;
  payload?: unknown;
  error?: ResFrameError;
}

/** Gateway → client push notification. */
interface EventFrame {
  type: 'event';
  event: string;
  payload?: unknown;
  seq?: number;
}

/** Any frame on the wire; discriminate on `type`. */
type Frame = ReqFrame | ResFrame | EventFrame;
// ── Helpers (mirror openclaw/openclaw internals) ──────────────────────────────
/**
 * Encode bytes as unpadded base64url (RFC 4648 §5): standard base64 with
 * `+` → `-`, `/` → `_`, and trailing `=` padding removed.
 */
function base64UrlEncode(buf: Buffer): string {
  const unpadded = buf.toString('base64').replace(/=+$/g, '');
  return unpadded.replace(/[+/]/g, (ch) => (ch === '+' ? '-' : '_'));
}
// Ed25519 SPKI prefix (ASN.1 DER) — 12 bytes
const ED25519_SPKI_PREFIX = Buffer.from('302a300506032b6570032100', 'hex');
// Length in bytes of a raw Ed25519 public key.
const ED25519_RAW_KEY_LENGTH = 32;

/**
 * Extract the raw 32-byte Ed25519 public key from a Node `KeyObject`.
 *
 * The key is exported as DER-encoded SPKI and the fixed 12-byte ASN.1 prefix
 * is stripped. The prefix and total length are verified first. Otherwise a key
 * of another type with the same layout (e.g. X25519, which differs only in the
 * OID byte) would silently yield a bogus device public key and device id.
 *
 * @param keyObject - an Ed25519 public key.
 * @returns the 32 raw public-key bytes (a view into the exported SPKI buffer).
 * @throws Error if the exported SPKI is not an Ed25519 public key.
 */
function getRawPublicKey(keyObject: crypto.KeyObject): Buffer {
  const spki = keyObject.export({ type: 'spki', format: 'der' }) as Buffer;
  const hasEd25519Prefix =
    spki.length === ED25519_SPKI_PREFIX.length + ED25519_RAW_KEY_LENGTH &&
    spki.subarray(0, ED25519_SPKI_PREFIX.length).equals(ED25519_SPKI_PREFIX);
  if (!hasEd25519Prefix) {
    throw new Error('Expected an Ed25519 public key (unexpected SPKI encoding)');
  }
  // Raw key is the 32 bytes after the 12-byte SPKI prefix
  return spki.subarray(ED25519_SPKI_PREFIX.length);
}
// ── Client ────────────────────────────────────────────────────────────────────
/**
 * WebSocket client for the OpenClaw gateway (protocol v3).
 *
 * Call `connect()` to open the socket and complete the device-signature
 * handshake. Then use `rpc()` for request/response calls, `onEvent()` for
 * push events, or `chatSendAndWait()` for a full chat turn. When the socket
 * closes, every in-flight RPC is rejected.
 */
export class OpenClawClient {
  private ws: WebSocket | null = null;
  // In-flight RPCs keyed by request id. Removed when a (final) response
  // arrives, when the RPC times out, or when the socket closes.
  private pending = new Map<
    string,
    { resolve: (v: unknown) => void; reject: (e: Error) => void; expectFinal: boolean }
  >();
  // Event subscriptions: event name → list of handlers
  private eventHandlers = new Map<string, Array<(payload: unknown) => void>>();
  // True only between an accepted handshake and the socket closing.
  private connected = false;
  // Source of RPC request ids ("1", "2", …).
  private msgCounter = 0;
  // Ephemeral Ed25519 key pair — generated once per client instance
  private readonly keyPair = crypto.generateKeyPairSync('ed25519');
  // Raw 32-byte public key (base64url) — used as device.publicKey
  private readonly rawPubKeyB64Url: string;
  // device.id = SHA256(rawPubKey, hex) — required by openclaw gateway
  private readonly deviceId: string;

  constructor(
    private readonly gatewayUrl: string,
    private readonly token: string,
  ) {
    const rawPub = getRawPublicKey(this.keyPair.publicKey);
    this.rawPubKeyB64Url = base64UrlEncode(rawPub);
    this.deviceId = crypto.createHash('sha256').update(rawPub).digest('hex');
  }

  /**
   * Open the WebSocket and perform the v3 handshake.
   *
   * Resolves when the gateway accepts the `connect` request. Rejects when:
   * - the socket errors or closes first;
   * - the gateway rejects the handshake;
   * - the challenge has no nonce;
   * - no acknowledgement arrives within 12 s.
   *
   * On any handshake failure the socket is terminated. This stops a
   * half-open connection from lingering or being marked connected by a late ack.
   */
  connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(this.gatewayUrl);
      this.ws = ws;

      // Safety timeout for handshake (gateway has 10s timeout itself).
      // Cleared on settle so it neither fires late nor keeps the process alive.
      const handshakeTimer = setTimeout(() => settle(new Error('Handshake timeout')), 12_000);

      // Settles the handshake promise exactly once; later calls are no-ops.
      let settled = false;
      const settle = (err?: Error): void => {
        if (settled) return;
        settled = true;
        clearTimeout(handshakeTimer);
        if (err) {
          ws.terminate();
          reject(err);
        } else {
          this.connected = true;
          resolve();
        }
      };

      ws.on('message', (raw) => {
        let parsed: unknown;
        try {
          parsed = JSON.parse(raw.toString());
        } catch {
          return; // ignore non-JSON frames
        }
        // JSON.parse may yield null or a primitive; reading `.type` on null
        // would throw inside the listener and crash the process.
        if (typeof parsed !== 'object' || parsed === null) return;
        this.handleFrame(parsed as Frame, settle);
      });

      // Persistent (not `once`) listener: an 'error' event with no listener
      // is rethrown by EventEmitter. After the handshake, errors are followed
      // by 'close', which performs the cleanup.
      ws.on('error', (err) => {
        settle(err);
      });

      ws.on('close', () => {
        this.connected = false;
        for (const [, p] of this.pending) {
          p.reject(new Error('OpenClaw gateway disconnected'));
        }
        this.pending.clear();
        // No-op once the handshake has settled; otherwise fail fast.
        settle(new Error('OpenClaw gateway closed the connection during handshake'));
      });
    });
  }

  /**
   * Route one parsed frame. Handshake frames settle `connect()`; `res`
   * frames settle pending RPCs; other events go to `onEvent` subscribers.
   */
  private handleFrame(
    frame: Frame,
    settle: (err?: Error) => void,
  ): void {
    // Step 1: Server sends challenge
    if (frame.type === 'event' && frame.event === 'connect.challenge') {
      const nonce = (frame.payload as { nonce?: unknown } | null | undefined)?.nonce;
      if (typeof nonce !== 'string') {
        settle(new Error('OpenClaw connect.challenge is missing a nonce'));
        return;
      }
      this.sendHandshake(nonce).catch((e: unknown) =>
        settle(e instanceof Error ? e : new Error(String(e))),
      );
      return;
    }

    // Step 2: Server acknowledges handshake
    if (frame.type === 'res' && frame.id === '__connect__') {
      if (frame.ok) {
        settle();
      } else {
        const reason = frame.error?.message ?? frame.error?.code ?? 'unknown error';
        settle(new Error(`OpenClaw handshake rejected: ${reason}`));
      }
      return;
    }

    // Regular RPC responses
    if (frame.type === 'res') {
      const p = this.pending.get(frame.id);
      if (!p) return;
      // When expectFinal is set, skip intermediate responses and wait for the final result.
      // chat.send sends intermediate frames with status="accepted"|"started"|"running"
      // and stream frames with state="delta". The final frame has state="final"|"aborted"|"error".
      // Error responses are never treated as intermediate, otherwise the caller would hang.
      if (p.expectFinal && frame.ok) {
        const payload = frame.payload as { status?: unknown; state?: unknown } | null | undefined;
        const status = payload?.status;
        const isIntermediate =
          status === 'accepted' || status === 'started' || status === 'running' ||
          payload?.state === 'delta';
        if (isIntermediate) return;
      }
      this.pending.delete(frame.id);
      if (frame.ok) {
        p.resolve(frame.payload ?? null);
      } else {
        p.reject(new Error(frame.error?.message ?? frame.error?.code ?? 'RPC error'));
      }
      return;
    }

    // Dispatch events to subscribed handlers. Unsubscribing replaces the array,
    // so iterating the captured array is safe even if a handler unsubscribes.
    if (frame.type === 'event') {
      const handlers = this.eventHandlers.get(frame.event);
      if (handlers) handlers.forEach((h) => h(frame.payload));
    }
  }

  /** Build, sign and send the v3 `connect` request for the given challenge nonce. */
  private async sendHandshake(nonce: string): Promise<void> {
    const ws = this.ws;
    if (!ws) throw new Error('Not connected to OpenClaw gateway');

    const signedAt = Date.now();
    const scopes = ['operator.read', 'operator.write'];
    const clientId = 'gateway-client';
    const clientMode = 'backend';
    const role = 'operator';
    const platform = 'node';
    // V3 payload (matches buildDeviceAuthPayloadV3 in openclaw source):
    // "v3|{deviceId}|{clientId}|{clientMode}|{role}|{scopes}|{signedAtMs}|{token}|{nonce}|{platform}|{deviceFamily}"
    const payload = [
      'v3',
      this.deviceId,
      clientId,
      clientMode,
      role,
      scopes.join(','),
      String(signedAt),
      this.token,
      nonce,
      platform,
      '', // deviceFamily (empty)
    ].join('|');
    // Sign as UTF-8 string (not raw hex bytes)
    const sig = crypto.sign(null, Buffer.from(payload, 'utf8'), this.keyPair.privateKey);
    const req: ReqFrame = {
      type: 'req',
      id: '__connect__',
      method: 'connect',
      params: {
        minProtocol: 3,
        maxProtocol: 3,
        client: {
          id: clientId,
          version: '1.0.0',
          platform,
          mode: clientMode,
        },
        role,
        scopes,
        auth: { token: this.token },
        device: {
          id: this.deviceId,
          nonce,
          publicKey: this.rawPubKeyB64Url,
          signature: base64UrlEncode(sig),
          signedAt,
        },
      },
    };
    ws.send(JSON.stringify(req));
  }

  /** True when the handshake succeeded and the socket is still OPEN. */
  isConnected(): boolean {
    return this.connected && this.ws?.readyState === WebSocket.OPEN;
  }

  /**
   * Send an RPC request and resolve with the response payload (or `null`).
   *
   * @param expectFinal - skip intermediate `res` frames (status
   *   accepted/started/running, state delta) and wait for the final one.
   * @throws (rejects) when not connected, on timeout, on an error response,
   *   if the send fails, or if the socket closes first.
   */
  rpc(method: string, params?: unknown, timeoutMs = 30_000, expectFinal = false): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const ws = this.ws;
      if (!ws || !this.isConnected()) {
        reject(new Error('Not connected to OpenClaw gateway'));
        return;
      }
      const id = String(++this.msgCounter);
      const frame: ReqFrame = { type: 'req', id, method, params };
      const timer = setTimeout(() => {
        if (this.pending.delete(id)) {
          reject(new Error(`RPC timeout: ${method}`));
        }
      }, timeoutMs);
      // Don't keep process alive just for the timer
      if (timer.unref) timer.unref();
      // Wrap the settlers so the timer is cleared as soon as the RPC settles.
      this.pending.set(id, {
        resolve: (v) => {
          clearTimeout(timer);
          resolve(v);
        },
        reject: (e) => {
          clearTimeout(timer);
          reject(e);
        },
        expectFinal,
      });
      try {
        ws.send(JSON.stringify(frame));
      } catch (e: unknown) {
        // Don't leave an orphaned pending entry behind.
        this.pending.delete(id);
        clearTimeout(timer);
        reject(e instanceof Error ? e : new Error(String(e)));
      }
    });
  }

  /**
   * Subscribe to a WS event. Returns an unsubscribe function.
   */
  onEvent(event: string, handler: (payload: unknown) => void): () => void {
    const handlers = this.eventHandlers.get(event) ?? [];
    handlers.push(handler);
    this.eventHandlers.set(event, handlers);
    return () => {
      const h = this.eventHandlers.get(event);
      if (h) this.eventHandlers.set(event, h.filter((fn) => fn !== handler));
    };
  }

  /**
   * Send a chat message and wait for the agent's final reply.
   *
   * Protocol (OpenClaw v3):
   * 1. chat.send RPC → immediate ack { runId, status: "started" }
   * 2. Agent processes, streams delta events (event: "chat", state: "delta")
   * 3. Final event (event: "chat", state: "final") contains the full reply
   *
   * `chat` events that arrive before the ack has been processed are buffered
   * and replayed, so a fast run's final event is not lost. `timeoutMs`
   * (default 30 s) bounds the wait after the ack; the ack itself has a 10 s
   * limit.
   *
   * Returns the reply text. Throws on ack failure, reply timeout, agent
   * abort/error, or gateway disconnect.
   */
  async chatSendAndWait(params: {
    sessionKey: string;
    message: string;
    idempotencyKey: string;
    timeoutMs?: number;
    /** Optional media attachments (images, PDFs, etc.) in OpenClaw format. */
    attachments?: Array<{ name: string; mimeType: string; media: string }>;
  }): Promise<string> {
    const timeoutMs = params.timeoutMs ?? 30_000;
    const chatSendParams: Record<string, unknown> = {
      sessionKey: params.sessionKey,
      message: params.message,
      idempotencyKey: params.idempotencyKey,
    };
    if (params.attachments && params.attachments.length > 0) {
      chatSendParams['attachments'] = params.attachments;
    }

    // Buffer chat events received while awaiting the ack; the real handler
    // can only be registered once runId is known.
    const earlyEvents: unknown[] = [];
    const stopBuffering = this.onEvent('chat', (payload) => {
      earlyEvents.push(payload);
    });
    let ack: { runId?: string; status?: string } | null;
    try {
      ack = (await this.rpc(
        'chat.send',
        chatSendParams,
        10_000, // 10s for the initial ack
      )) as { runId?: string; status?: string } | null;
    } finally {
      stopBuffering();
    }
    const runId = ack?.runId ?? params.idempotencyKey;

    // Wait for the final "chat" event matching our runId
    return new Promise<string>((resolve, reject) => {
      const ws = this.ws;
      let done = false;
      // Settle once and tear down the timer, subscription and close listener.
      const finish = (settleFn: () => void): void => {
        if (done) return;
        done = true;
        clearTimeout(timer);
        unsubscribe();
        ws?.off('close', onClose);
        settleFn();
      };
      const onClose = (): void =>
        finish(() => reject(new Error('OpenClaw gateway disconnected')));
      const timer = setTimeout(
        () => finish(() => reject(new Error(`Agent reply timeout after ${timeoutMs}ms`))),
        timeoutMs,
      );

      const handleChatEvent = (payload: any): void => {
        if (payload?.runId !== runId) return; // different conversation
        const state = payload?.state;
        if (state === 'delta') return; // streaming chunk, keep waiting
        if (state === 'final') {
          const text = this.extractReplyText(payload);
          finish(() => resolve(text));
        } else if (state === 'aborted') {
          finish(() => reject(new Error('Agent run was aborted')));
        } else if (state === 'error') {
          finish(() => reject(new Error(payload?.errorMessage ?? 'Agent run failed')));
        } else {
          // Unknown final state — return raw payload
          const raw = JSON.stringify(payload);
          finish(() => resolve(raw));
        }
      };

      const unsubscribe = this.onEvent('chat', handleChatEvent);
      ws?.once('close', onClose);

      for (const payload of earlyEvents) {
        if (done) break;
        handleChatEvent(payload);
      }
      // If the socket already went away, don't wait for the timeout.
      if (!done && !this.isConnected()) onClose();
    });
  }

  /**
   * Extract the reply text from a final `chat` event.
   * message can be string, { text }, or { content: string | [{type:"text", text:"..."}] };
   * anything else falls back to the JSON-encoded payload.
   */
  private extractReplyText(payload: any): string {
    const msg = payload?.message;
    if (typeof msg === 'string') return msg;
    if (msg?.content) {
      const texts: string[] = Array.isArray(msg.content)
        ? msg.content.filter((c: any) => c?.type === 'text').map((c: any) => c.text ?? '')
        : [typeof msg.content === 'string' ? msg.content : JSON.stringify(msg.content)];
      return texts.join('');
    }
    if (msg?.text) return msg.text;
    return JSON.stringify(payload);
  }

  /** Close the socket; pending RPCs are rejected by the 'close' handler. */
  close(): void {
    this.ws?.close();
    this.connected = false;
  }
}