it0/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts

930 lines
38 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* DingTalk Router Service
*
* IT0 unified DingTalk bot — one central connection for all agent instances.
*
* Responsibilities:
* 1. Maintain a DingTalk Stream connection using IT0's official App credentials.
* 2. Handle binding codes: user sends code → maps their DingTalk ID to an agent instance.
* 3. Route regular messages: looks up bound instance → POSTs to bridge /task → replies.
*
* Required env vars:
* IT0_DINGTALK_CLIENT_ID — IT0 official DingTalk AppKey
* IT0_DINGTALK_CLIENT_SECRET — IT0 official DingTalk AppSecret
*
* Binding flow:
* 1. Frontend calls POST /api/v1/agent/channels/dingtalk/bind/:instanceId
* 2. Backend returns { code: "A3F9C2", expiresAt } (code is 6 uppercase hex characters)
* 3. User opens DingTalk → IT0 bot → sends "A3F9C2"
* 4. This service matches the code → saves dingTalkUserId → replies "✅ 绑定成功"
* 5. Frontend polls GET /api/v1/agent/channels/dingtalk/status/:instanceId → { bound: true }
*
* Robustness guarantees:
* - WS message handlers fully wrapped in try/catch — no uncaught exceptions
* - Stale-WS guard: close/message events from old WS are ignored after reconnect
* - Old WS terminated before creating new one (no orphaned connections)
* - DISCONNECT frame handled: immediately reconnects
* - Per-user queue: promise chain guaranteed to always resolve (no dead-tail bug)
* - sessionWebhookExpiredTime unit auto-detected (seconds or ms)
* - DingTalk API response capped at 256 KB (prevents memory spike on bad response)
* - Bridge (OpenClaw) response also capped at 256 KB
* - Dual routing: senderStaffId (OAuth binding) + senderId (code binding) both handled
* - Bridge task timeout explicitly set to 55s (bridge default 25s is too short for LLM)
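* - LLM replies are delivered via batchSend rather than the sessionWebhook, so a webhook that expires during a long task does not drop the reply
* - Periodic cleanup of the dedup, rate-limit, binding-code and OAuth-state maps (5 min interval); the per-user queue maps clean themselves up when a queue drains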
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as https from 'https';
import * as crypto from 'crypto';
import * as http from 'http';
import WebSocket from 'ws';
import { AgentInstanceRepository } from '../repositories/agent-instance.repository';
// ── Types ─────────────────────────────────────────────────────────────────────
/**
 * One frame received over the DingTalk Stream WebSocket, as produced by JSON.parse
 * in the 'message' handler. The shape is assumed, not validated at runtime.
 */
interface DtFrame {
/** Frame kind; handleFrame() dispatches on this value. */
type: string;
/** Frame headers. The 'topic' entry is inspected for callbacks and the whole map is echoed back in ACKs. */
headers: Record<string, string>;
/** JSON-encoded payload (parsed into a BotMsg for bot-message callbacks); may be absent. */
data?: string;
}
/**
 * Payload of a bot-message callback, parsed from DtFrame.data in handleFrame().
 * The cast is unchecked, so fields declared as required may still be missing at runtime
 * (the code guards several of them with `?.` / `??`).
 */
interface BotMsg {
/**
 * Sender identifier delivered with the message. dispatchMessage() prefers it as the
 * per-user key, and code binding stores it as dingTalkUserId.
 * NOTE(review): the comment in routeToAgent() states this value is NOT the same as the
 * OAuth openId from /v1.0/contact/users/me — confirm which description is current.
 */
senderId: string;
/** staffId — enterprise employee ID; tried first when routing and used as the batchSend recipient. */
senderStaffId: string;
/** URL that reply() POSTs text messages to. */
sessionWebhook: string;
/**
 * Webhook expiry — DingTalk API has returned both seconds and ms depending on version.
 * reply() auto-detects the unit: values above 1e11 are taken as milliseconds,
 * anything else is treated as seconds.
 */
sessionWebhookExpiredTime: number;
/** Text body; absent or empty content is handled as a non-text message. */
text?: { content: string };
/** Message type as reported by DingTalk; not read anywhere in this file. */
msgtype?: string;
/** Conversation identifier; not read anywhere in this file. */
conversationId: string;
/** Message identifier; used for deduplication and as the bridge idempotencyKey. */
msgId: string;
}
/** Value stored in the binding-code map: which instance a pending code belongs to. */
interface BindingEntry {
/** Agent instance that will be bound when the code is redeemed. */
instanceId: string;
/** Expiry as epoch milliseconds (Date.now() + CODE_TTL_MS at creation). */
expiresAt: number;
}
// ── Constants ─────────────────────────────────────────────────────────────────
// Maximum characters per outgoing text chunk; reply() and batchSend() slice on this.
const DINGTALK_MAX_CHARS = 4800;
const CODE_TTL_MS = 15 * 60 * 1000; // 15 min — lifetime of a binding code
const OAUTH_STATE_TTL_MS = 10 * 60 * 1000; // 10 min — lifetime of an OAuth state token
const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to proactively refresh
// Reconnect backoff: starts at BASE, doubles on each scheduled reconnect, capped at MAX.
const WS_RECONNECT_BASE_MS = 2_000;
const WS_RECONNECT_MAX_MS = 60_000;
const TASK_TIMEOUT_S = 55; // seconds — bridge default is 25s; must pass explicitly
const DEDUP_TTL_MS = 10 * 60 * 1000; // how long a msgId is remembered for deduplication
const RATE_LIMIT_PER_MIN = 10; // messages allowed per user per sliding 60 s window
const QUEUE_MAX_DEPTH = 5; // pending + running tasks allowed per user
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // period of periodicCleanup()
const MAX_RESPONSE_BYTES = 256 * 1024; // 256 KB cap for DingTalk API and bridge responses
// ── Service ───────────────────────────────────────────────────────────────────
/**
 * NestJS provider that owns the single DingTalk Stream connection and all in-memory
 * routing state (binding codes, OAuth states, dedup, rate limits, per-user queues).
 * All state below lives in process memory only; nothing in this file persists it.
 */
@Injectable()
export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(DingTalkRouterService.name);
// App credentials read from config in the constructor; `enabled` is true only when both are set.
private readonly clientId: string;
private readonly clientSecret: string;
private readonly enabled: boolean;
// Token
private token = ''; // cached app access token ('' until first refresh)
private tokenExpiresAt = 0; // epoch ms at which `token` expires
private tokenRefreshTimer?: NodeJS.Timeout; // proactive refresh timer armed by refreshToken()
private tokenRefreshPromise: Promise<string> | null = null; // in-flight refresh shared by concurrent getToken() callers
// WS
private ws: WebSocket | null = null; // the current socket; handlers compare against it to detect stale sockets
private connecting = false; // prevents concurrent connectStream() calls
private reconnectDelay = WS_RECONNECT_BASE_MS; // next backoff delay; reset when a socket opens
private stopping = false; // set in onModuleDestroy(); suppresses reconnects
private reconnectTimer?: NodeJS.Timeout;
private cleanupTimer?: NodeJS.Timeout;
// State
private readonly bindingCodes = new Map<string, BindingEntry>(); // code → entry
private readonly oauthStates = new Map<string, { instanceId: string; expiresAt: number }>(); // state → entry
private readonly dedup = new Map<string, number>(); // msgId → ts
private readonly rateWindows = new Map<string, number[]>(); // userId → timestamps
private readonly queueTails = new Map<string, Promise<void>>(); // userId → tail
private readonly queueDepths = new Map<string, number>(); // userId → depth
/**
 * Reads the DingTalk app credentials from configuration. The router is considered
 * enabled only when both the client id and the client secret are non-empty.
 */
constructor(
  private readonly configService: ConfigService,
  private readonly instanceRepo: AgentInstanceRepository,
) {
  const readSetting = (key: string): string => this.configService.get<string>(key, '');
  this.clientId = readSetting('IT0_DINGTALK_CLIENT_ID');
  this.clientSecret = readSetting('IT0_DINGTALK_CLIENT_SECRET');
  this.enabled = Boolean(this.clientId && this.clientSecret);
}
/**
 * Nest lifecycle hook. When credentials are configured, kicks off the stream
 * connection (fire-and-forget) and arms the periodic cleanup timer; otherwise
 * logs a warning and does nothing.
 */
onModuleInit(): void {
  if (this.enabled) {
    this.logger.log('DingTalk router starting...');
    const reportFailure = (e: Error): void =>
      this.logger.error('Initial stream connection failed:', e.message);
    this.connectStream().catch(reportFailure);
    const timer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
    this.cleanupTimer = timer;
    // Do not let the cleanup timer keep the process alive on its own.
    if (timer.unref) timer.unref();
    return;
  }
  this.logger.warn('IT0_DINGTALK_CLIENT_ID/SECRET not set — DingTalk router disabled');
}
/**
 * Nest lifecycle hook. Marks the service as stopping (which suppresses reconnects),
 * cancels every timer this service owns and force-closes the current socket.
 */
onModuleDestroy(): void {
  this.stopping = true;
  const { cleanupTimer, reconnectTimer, tokenRefreshTimer } = this;
  clearInterval(cleanupTimer);
  for (const pending of [reconnectTimer, tokenRefreshTimer]) {
    clearTimeout(pending);
  }
  this.terminateCurrentWs();
}
/** Reports whether both DingTalk credentials were present at construction time. */
isEnabled(): boolean {
  const { enabled } = this;
  return enabled;
}
// ── Binding code API ───────────────────────────────────────────────────────
/**
 * Issue a fresh one-time binding code for an agent instance.
 *
 * Any code previously issued for the same instance is revoked first, so at most
 * one code per instance is live at a time.
 *
 * @param instanceId - agent instance the code will bind when redeemed
 * @returns the 6-character uppercase hex code and its expiry (epoch ms)
 */
generateBindingCode(instanceId: string): { code: string; expiresAt: number } {
  // Invalidate any existing code for this instance to avoid confusion
  for (const [existing, entry] of this.bindingCodes) {
    if (entry.instanceId === instanceId) this.bindingCodes.delete(existing);
  }
  // 3 random bytes → 6 hex chars, e.g. "A3F9C2". The space is only 2^24 values, so
  // re-draw on collision: every code still in the map belongs to a different
  // instance, and overwriting it would silently redirect that user's binding.
  let code: string;
  do {
    code = crypto.randomBytes(3).toString('hex').toUpperCase();
  } while (this.bindingCodes.has(code));
  const expiresAt = Date.now() + CODE_TTL_MS;
  this.bindingCodes.set(code, { instanceId, expiresAt });
  return { code, expiresAt };
}
// ── OAuth API ──────────────────────────────────────────────────────────────
/**
 * Build the DingTalk OAuth authorization URL for an instance.
 *
 * A random state token is recorded against the instance (replacing any earlier
 * state for it) so that the callback can finish the binding from the state alone.
 *
 * @param instanceId - agent instance that the OAuth flow will bind
 * @returns the state token and the full authorization URL
 */
generateOAuthUrl(instanceId: string): { oauthUrl: string; state: string } {
  // Revoke states issued earlier for the same instance.
  const superseded = [...this.oauthStates]
    .filter(([, pending]) => pending.instanceId === instanceId)
    .map(([token]) => token);
  superseded.forEach((token) => this.oauthStates.delete(token));

  const state = crypto.randomBytes(16).toString('hex');
  this.oauthStates.set(state, { instanceId, expiresAt: Date.now() + OAUTH_STATE_TTL_MS });

  const baseUrl = this.configService.get<string>('IT0_BASE_URL', 'https://it0api.szaiai.com');
  const query = new URLSearchParams([
    ['client_id', this.clientId],
    ['redirect_uri', `${baseUrl}/api/v1/agent/channels/dingtalk/oauth/callback`],
    ['response_type', 'code'],
    ['scope', 'openid'],
    ['state', state],
    ['prompt', 'consent'],
  ]).toString();
  return { state, oauthUrl: `https://login.dingtalk.com/oauth2/auth?${query}` };
}
/**
 * Complete the OAuth flow: exchange auth code → get user openId/unionId → save to DB.
 * Called by the public callback endpoint (no JWT).
 *
 * ID types in DingTalk:
 * openId — per-app unique (from OAuth /v1.0/contact/users/me).
 * unionId — cross-app unique (from same endpoint).
 * userId — enterprise staff ID (from oapi/topapi/user/getbyunionid). Required for batchSend.
 *
 * What gets stored as dingTalkUserId: the enterprise userId (staffId) when it can be
 * resolved from the unionId, otherwise the openId as a fallback.
 *
 * @param code - authorization code from the OAuth redirect
 * @param state - state token previously issued by generateOAuthUrl(); single-use
 * @returns the bound instance id and its display name
 * @throws Error when the state is unknown/expired, the user identity cannot be
 *         obtained, or the instance no longer exists
 */
async completeOAuthBinding(code: string, state: string): Promise<{ instanceId: string; instanceName: string }> {
  const entry = this.oauthStates.get(state);
  if (!entry) throw new Error('无效或已过期的授权状态,请重新绑定');
  // The state is single-use: drop it whether or not it is still within its TTL.
  this.oauthStates.delete(state);
  if (Date.now() > entry.expiresAt) {
    throw new Error('授权已超时,请重新绑定');
  }

  // Exchange auth code for user access token
  const tokenResult = await this.httpsPost<{ accessToken: string; expireIn: number }>(
    'api.dingtalk.com', '/v1.0/oauth2/userAccessToken',
    { clientId: this.clientId, clientSecret: this.clientSecret, code, grantType: 'authorization_code' },
  );
  // A 2xx response without a token would otherwise surface later as an opaque
  // "invalid header value" error when the token is put into a request header.
  if (!tokenResult?.accessToken) throw new Error('无法获取钉钉用户身份,请重试');

  // Get user's openId and unionId
  const userInfo = await this.httpsGet<{ openId: string; unionId: string }>(
    'api.dingtalk.com', '/v1.0/contact/users/me',
    { 'x-acs-dingtalk-access-token': tokenResult.accessToken },
  );
  const openId = userInfo.openId;
  const unionId = userInfo.unionId;
  if (!openId) throw new Error('无法获取钉钉用户身份,请重试');
  this.logger.log(
    `OAuth user info: openId=${openId} unionId=${unionId ?? 'none'} for instance=${entry.instanceId}`,
  );

  // Resolve userId (staffId) from unionId — required for proactive batchSend.
  // Uses old oapi.dingtalk.com topapi endpoint with corp app access token.
  // Non-fatal if this fails (e.g. user not in enterprise, permission not granted).
  let userId: string | undefined;
  if (unionId) {
    try {
      const appToken = await this.getToken();
      const result = await this.httpsPost<{ errcode: number; errmsg: string; result: { userid: string } }>(
        'oapi.dingtalk.com',
        `/topapi/user/getbyunionid?access_token=${encodeURIComponent(appToken)}`,
        { unionid: unionId },
      );
      if (result.errcode === 0 && result.result?.userid) {
        userId = result.result.userid;
        // Separator added: the two ids were previously logged back-to-back and could not be told apart.
        this.logger.log(`unionId→userId resolved: ${unionId} → ${userId}`);
      } else {
        this.logger.warn(`getbyunionid failed: errcode=${result.errcode} errmsg=${result.errmsg}`);
      }
    } catch (e: unknown) {
      const reason = e instanceof Error ? e.message : String(e);
      this.logger.warn(`Cannot resolve unionId→userId (non-fatal): ${reason}`);
    }
  }

  const instance = await this.instanceRepo.findById(entry.instanceId);
  if (!instance) throw new Error('智能体实例不存在');

  // Store userId (staffId) as the binding identifier if resolved — this matches senderStaffId
  // in incoming DingTalk bot messages, enabling correct routing.
  // Fall back to openId if userId could not be resolved (routing may still work if
  // DingTalk delivers openId in senderId for this app type).
  const bindingId = userId ?? openId;
  instance.dingTalkUserId = bindingId;
  await this.instanceRepo.save(instance);
  this.logger.log(
    `OAuth binding saved: instance ${entry.instanceId} → dingTalkUserId=${bindingId} ` +
    `(${userId ? 'staffId/userId' : 'openId fallback — staffId not resolved'})`,
  );

  // Send proactive greeting using userId (staffId). Fire-and-forget: a failed
  // greeting must not fail the binding. sendGreeting() skips when userId is undefined.
  this.sendGreeting(userId, openId, instance.name).catch((e: Error) =>
    this.logger.warn(`Greeting send failed (non-fatal): ${e.message}`),
  );
  return { instanceId: entry.instanceId, instanceName: instance.name };
}
/**
 * Proactively greet a freshly bound user through the batchSend API.
 *
 * batchSend addresses recipients by enterprise userId (staffId), not by openId.
 * When no userId is available the greeting is skipped with a warning.
 *
 * @param userId - enterprise userId (staffId), if resolved; undefined means skip
 * @param openId - for logging context only
 * @param agentName - display name of the agent, interpolated into the greeting
 */
private async sendGreeting(userId: string | undefined, openId: string, agentName: string): Promise<void> {
  if (userId === undefined || userId === '') {
    // batchSend requires staffId; without it, proactive greeting is not possible.
    this.logger.warn(
      `Skipping proactive greeting for openId=${openId}: userId not resolved ` +
      `(ensure app has Contact.User.Read permission and user is an enterprise member)`,
    );
    return;
  }

  const token = await this.getToken();
  const greeting = [
    `👋 你好!我是你的 AI 智能体助手「${agentName}」。\n\n`,
    `从现在起,你可以直接在这里向我发送指令,我会自主地帮你完成工作任务。\n\n`,
    `例如:\n• 查询服务器状态\n• 执行运维脚本\n• 管理文件和进程\n\n`,
    `有什么需要帮忙的,直接说吧!`,
  ].join('');
  this.logger.log(`Sending greeting to userId=${userId} (openId=${openId}) for agent "${agentName}"`);

  const payload = {
    robotCode: this.clientId,
    userIds: [userId],
    msgKey: 'sampleText',
    msgParam: JSON.stringify({ content: greeting }),
  };
  const authHeaders = { 'x-acs-dingtalk-access-token': token };
  await this.httpsPost<unknown>('api.dingtalk.com', '/v1.0/robot/oToMessages/batchSend', payload, authHeaders);
  this.logger.log(`Greeting sent successfully to userId=${userId}`);
}
// ── Token management ───────────────────────────────────────────────────────
/**
 * Return a usable app access token.
 *
 * The cached token is served while it is more than TOKEN_REFRESH_BUFFER seconds
 * away from expiry. Otherwise a refresh is started — or, if one is already in
 * flight, its promise is shared so concurrent callers trigger a single request.
 */
private async getToken(): Promise<string> {
  const freshUntil = this.tokenExpiresAt - TOKEN_REFRESH_BUFFER * 1000;
  if (this.token && Date.now() < freshUntil) {
    return this.token;
  }
  let inFlight = this.tokenRefreshPromise;
  if (!inFlight) {
    inFlight = this.refreshToken().finally(() => {
      // Clear the shared promise so the next stale read starts a new refresh.
      this.tokenRefreshPromise = null;
    });
    this.tokenRefreshPromise = inFlight;
  }
  return inFlight;
}
/**
 * Fetch a new app access token, cache it, and arm a timer for the next proactive refresh.
 *
 * @returns the new access token
 * @throws Error when the request fails or the response lacks a usable
 *         accessToken / expireIn pair
 */
private async refreshToken(): Promise<string> {
  const response = await this.httpsPost<{ accessToken?: string; expireIn?: number }>(
    'api.dingtalk.com', '/v1.0/oauth2/accessToken',
    { appKey: this.clientId, appSecret: this.clientSecret },
  );
  const accessToken = response?.accessToken;
  const expireIn = Number(response?.expireIn);
  // Validate before caching. Without this, a 2xx body missing expireIn yields a NaN
  // expiry and setTimeout(NaN), which fires almost immediately and turns the
  // proactive refresh into a tight request loop.
  if (typeof accessToken !== 'string' || accessToken === '' || !Number.isFinite(expireIn) || expireIn <= 0) {
    throw new Error('DingTalk accessToken response is missing accessToken or expireIn');
  }

  this.token = accessToken;
  this.tokenExpiresAt = Date.now() + expireIn * 1000;

  clearTimeout(this.tokenRefreshTimer);
  // Do not re-arm the timer once shutdown has begun.
  if (!this.stopping) {
    // Refresh TOKEN_REFRESH_BUFFER seconds ahead of expiry, but never sooner than 60 s from now.
    const refreshInMs = Math.max((expireIn - TOKEN_REFRESH_BUFFER) * 1000, 60_000);
    this.tokenRefreshTimer = setTimeout(() => {
      this.getToken().catch((e: Error) => this.logger.error('Proactive token refresh failed:', e.message));
    }, refreshInMs);
    if (this.tokenRefreshTimer.unref) this.tokenRefreshTimer.unref();
  }
  this.logger.log(`DingTalk token refreshed, valid ${expireIn}s`);
  return this.token;
}
// ── Stream connection ──────────────────────────────────────────────────────
/**
 * Single-flight wrapper around doConnect(): does nothing while shutting down or
 * while another connection attempt is still in progress.
 */
private async connectStream(): Promise<void> {
  if (this.stopping) return;
  if (this.connecting) return;
  this.connecting = true;
  try {
    await this.doConnect();
  } finally {
    // Always release the guard, even if doConnect() rejected.
    this.connecting = false;
  }
}
/**
 * Open a new DingTalk Stream connection: obtain a token, request a stream
 * endpoint + ticket, replace the current socket and wire up its event handlers.
 *
 * Every failure path schedules a reconnect instead of throwing, so the stream
 * cannot end up permanently dead after a single bad attempt.
 */
private async doConnect(): Promise<void> {
  const describe = (e: unknown): string => (e instanceof Error ? e.message : String(e));

  let token: string;
  try {
    token = await this.getToken();
  } catch (e: unknown) {
    this.logger.error('Cannot get DingTalk token:', describe(e));
    this.scheduleReconnect();
    return;
  }

  let wsInfo: { endpoint: string; ticket: string };
  try {
    wsInfo = await this.httpsPost<{ endpoint: string; ticket: string }>(
      'api.dingtalk.com', '/v1.0/gateway/connections/open',
      {
        clientId: this.clientId,
        clientSecret: this.clientSecret,
        subscriptions: [{ type: 'CALLBACK', topic: '/v1.0/im/bot/messages/get' }],
        ua: 'it0-dingtalk-router/1.0',
        localIp: '127.0.0.1',
      },
      { 'x-acs-dingtalk-access-token': token },
    );
  } catch (e: unknown) {
    this.logger.error('Failed to get stream endpoint:', describe(e));
    this.scheduleReconnect();
    return;
  }

  // Shutdown may have started while the requests above were in flight. Creating a
  // socket now would leak it, because onModuleDestroy() has already run.
  if (this.stopping) return;

  // ── Close stale WS before creating new one ─────────────────────────────
  // Terminating the old WS now means its 'close' event will fire, but the
  // stale-WS guard (ws !== this.ws) will prevent it from triggering reconnect.
  this.terminateCurrentWs();

  let ws: WebSocket;
  try {
    // The constructor throws synchronously on a malformed URL (e.g. a response
    // without an endpoint); treat that like any other failed attempt.
    ws = new WebSocket(`${wsInfo.endpoint}?ticket=${encodeURIComponent(wsInfo.ticket)}`);
  } catch (e: unknown) {
    this.logger.error('Failed to open stream WebSocket:', describe(e));
    this.scheduleReconnect();
    return;
  }
  this.ws = ws;

  ws.on('open', () => {
    if (ws !== this.ws) return; // stale
    this.logger.log('DingTalk stream connected');
    // A successful connection resets the backoff.
    this.reconnectDelay = WS_RECONNECT_BASE_MS;
  });

  // Fully wrapped — no uncaught exceptions can reach the EventEmitter boundary
  ws.on('message', (raw) => {
    try {
      if (ws !== this.ws) return; // stale WS, ignore
      let frame: DtFrame;
      try { frame = JSON.parse(raw.toString()); } catch { return; }
      this.handleFrame(ws, frame);
    } catch (e: unknown) {
      this.logger.error('Uncaught error in WS message handler:', describe(e));
    }
  });

  ws.on('close', (code, reason) => {
    if (this.stopping || ws !== this.ws) return; // stale or intentional shutdown
    this.logger.warn(`Stream closed (${code}: ${reason.toString()})`);
    this.scheduleReconnect();
  });

  ws.on('error', (e) => {
    // 'close' fires after 'error' — reconnect is handled there
    this.logger.error('Stream WS error:', e.message);
  });
}
/**
 * Detach and force-close the current socket. The reference is cleared before
 * terminating, so the socket's own 'close' handler sees itself as stale and
 * does not schedule a reconnect.
 */
private terminateCurrentWs(): void {
  const previous = this.ws;
  this.ws = null;
  if (!previous || previous.readyState === WebSocket.CLOSED) return;
  try {
    previous.terminate();
  } catch {
    /* ignore */
  }
}
/**
 * Arm (or re-arm) the reconnect timer using the current backoff delay, then
 * double the delay for the next attempt, capped at WS_RECONNECT_MAX_MS.
 * No-op once shutdown has begun.
 */
private scheduleReconnect(): void {
  if (this.stopping) return;
  clearTimeout(this.reconnectTimer);
  const delayMs = this.reconnectDelay;
  this.reconnectDelay = Math.min(delayMs * 2, WS_RECONNECT_MAX_MS);
  const attempt = (): void => {
    this.connectStream().catch((e: Error) =>
      this.logger.error('Reconnect attempt failed:', e.message),
    );
  };
  this.reconnectTimer = setTimeout(attempt, delayMs);
}
// ── Frame handling ─────────────────────────────────────────────────────────
/**
 * Dispatch one parsed stream frame.
 *
 * - keepalive ping → answered immediately, echoing the frame's headers and data
 * - disconnect request → the socket is terminated; its 'close' handler reconnects
 * - bot-message callback → ACKed synchronously, then handed to dispatchMessage()
 *
 * Keepalive and disconnect are recognised both as `type: 'SYSTEM'` frames with a
 * `ping` / `disconnect` topic (the shape described by the DingTalk Stream protocol
 * documentation — TODO confirm against the current spec) and under the bare
 * 'PING' / 'DISCONNECT' types this service matched before.
 *
 * @param ws - the socket the frame arrived on (used for replies)
 * @param frame - parsed frame
 */
private handleFrame(ws: WebSocket, frame: DtFrame): void {
  const topic = frame.headers?.['topic'];
  const isSystem = frame.type === 'SYSTEM';

  // Server keepalive. The payload is echoed back unchanged; an empty string is
  // sent when the frame carried no data.
  if (frame.type === 'PING' || (isSystem && topic === 'ping')) {
    this.wsSend(ws, JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: frame.data ?? '' }));
    return;
  }

  // Server requests graceful disconnect (e.g. deployment rollover, load balancing)
  if (frame.type === 'DISCONNECT' || (isSystem && topic === 'disconnect')) {
    this.logger.warn('DingTalk server requested DISCONNECT — reconnecting');
    // this.ws is deliberately left pointing at this socket: terminate() fires
    // 'close', the handler sees ws === this.ws and schedules the reconnect.
    ws.terminate();
    return;
  }

  if (
    frame.type === 'CALLBACK' &&
    topic === '/v1.0/im/bot/messages/get' &&
    frame.data
  ) {
    let msg: BotMsg;
    try { msg = JSON.parse(frame.data); } catch { return; }
    // ACK MUST be sent within 1.5s — synchronously before any async work
    this.wsSend(ws, JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
    this.dispatchMessage(msg);
  }
}
/**
 * Best-effort send that never throws.
 * Data is written only while the socket is OPEN; any error raised by send()
 * is logged as a warning instead of propagating to the caller.
 */
private wsSend(ws: WebSocket, data: string): void {
  try {
    if (ws.readyState !== WebSocket.OPEN) return;
    ws.send(data);
  } catch (e: any) {
    this.logger.warn('wsSend failed:', e.message);
  }
}
/**
 * Entry point for every incoming bot message (already ACKed by handleFrame).
 *
 * Order of checks: sender present → deduplication → text-only → binding code →
 * rate limit → per-user queue. Deduplication runs before anything that replies,
 * so a re-delivered message never produces a second reply.
 *
 * @param msg - parsed bot message
 */
private dispatchMessage(msg: BotMsg): void {
  // Prefer senderId; fall back to senderStaffId when senderId is missing or blank.
  const userId = msg.senderId?.trim() || msg.senderStaffId?.trim();
  if (!userId) {
    this.logger.warn('Received message with no sender ID, ignoring');
    return;
  }
  this.logger.log(
    `DingTalk message: senderId=${msg.senderId ?? 'none'} senderStaffId=${msg.senderStaffId ?? 'none'} text="${(msg.text?.content ?? '').slice(0, 60)}"`,
  );

  // Deduplication (DingTalk may deliver duplicates on reconnect).
  // Skipped when the message carries no msgId: keying on `undefined` would make
  // every later id-less message look like a duplicate and drop it.
  if (msg.msgId) {
    if (this.dedup.has(msg.msgId)) return;
    this.dedup.set(msg.msgId, Date.now());
  }

  // Non-text message types (image, file, richText, audio, video, @mention, etc.)
  const text = msg.text?.content?.trim() ?? '';
  if (!text) {
    this.reply(msg, '我目前只能处理文字消息,请发送文字与小龙虾沟通。');
    return;
  }

  // Binding code check (6-char hex, case-insensitive, exact match only)
  const upperText = text.toUpperCase();
  const entry = this.bindingCodes.get(upperText);
  if (entry) {
    // Codes are single-use: remove the code whether it is expired or redeemed.
    this.bindingCodes.delete(upperText);
    if (Date.now() > entry.expiresAt) {
      this.reply(msg, '验证码已过期,请重新在 IT0 App 中生成新的验证码。');
      return;
    }
    this.completeBinding(entry.instanceId, userId, msg).catch((e: Error) =>
      this.logger.error('completeBinding unhandled error:', e.message),
    );
    return;
  }

  // Rate limit
  if (!this.rateAllow(userId)) {
    this.reply(msg, '消息频率过高请稍后再试每分钟最多10条。');
    return;
  }

  // Route to agent container (serial per-user queue)
  const accepted = this.enqueue(userId, () => this.routeToAgent(userId, text, msg));
  if (!accepted) {
    this.reply(msg, '当前请求排队已满最多5条请稍后再试。');
  }
}
// ── Binding completion ─────────────────────────────────────────────────────
/**
 * Redeem a binding code: persist the DingTalk user id on the instance and tell
 * the user the outcome through the session webhook. Never rejects — every
 * failure is logged and answered with an error reply.
 *
 * @param instanceId - instance the redeemed code belongs to
 * @param dingTalkUserId - sender identifier to store on the instance
 * @param msg - the message that carried the code (used for replying)
 */
private async completeBinding(instanceId: string, dingTalkUserId: string, msg: BotMsg): Promise<void> {
  const respond = (content: string): void => this.reply(msg, content);
  try {
    const instance = await this.instanceRepo.findById(instanceId);
    if (instance) {
      instance.dingTalkUserId = dingTalkUserId;
      await this.instanceRepo.save(instance);
      this.logger.log(`Bound instance ${instanceId} to DingTalk user ${dingTalkUserId}`);
      respond(
        `✅ 绑定成功!\n\n你的小龙虾「${instance.name}」已与钉钉绑定。\n\n现在直接发消息给我我会帮你转达给它`,
      );
    } else {
      respond('绑定失败:智能体实例不存在,请重新操作。');
    }
  } catch (e: any) {
    this.logger.error('Binding failed:', e.message);
    respond('绑定时出现错误,请稍后重试。');
  }
}
// ── Message routing ────────────────────────────────────────────────────────
/**
 * Forward one user message to the bound agent's bridge and deliver the answer.
 *
 * Lookup tries senderStaffId first, then the sender key chosen by
 * dispatchMessage(). The answer is delivered via batchSend when a staffId is
 * available; otherwise it falls back to the session webhook so that users
 * without a staffId still receive a reply instead of having it dropped.
 *
 * @param userId - per-user key chosen by dispatchMessage()
 * @param text - trimmed message text
 * @param msg - original bot message (for replies and idempotency)
 */
private async routeToAgent(userId: string, text: string, msg: BotMsg): Promise<void> {
  // Two binding paths store different DingTalk ID types:
  // OAuth binding → stores staffId (resolved via unionId→userId at auth time)
  // Code binding → stores senderId ($:LWCP_v1:$... format from bot message)
  //
  // DingTalk's Stream API senderId ($:LWCP_v1:$...) is NOT the same as the OAuth
  // openId returned by /v1.0/contact/users/me. They are different ID encodings.
  // Therefore we try senderStaffId first (hits OAuth-bound instances immediately),
  // then fall back to senderId (hits code-bound instances).
  const staffId = msg.senderStaffId?.trim();
  let instance = staffId ? await this.instanceRepo.findByDingTalkUserId(staffId) : null;
  if (!instance) {
    // Code-bound instances store senderId directly; also catches any future cases
    // where DingTalk aligns senderId with the stored identifier.
    instance = await this.instanceRepo.findByDingTalkUserId(userId);
    if (instance) {
      this.logger.log(`Routed via senderId=${userId} (senderStaffId=${staffId ?? 'none'} not matched)`);
    }
  }
  if (!instance) {
    this.logger.warn(`No binding found for senderStaffId=${staffId ?? 'none'} or senderId=${userId}`);
    this.reply(
      msg,
      '你还没有绑定小龙虾。\n\n请在 IT0 App 中创建一只小龙虾,然后点击「绑定钉钉」获取验证码。',
    );
    return;
  }
  if (instance.status !== 'running') {
    this.reply(msg, `小龙虾「${instance.name}」当前状态为 ${instance.status},暂时无法接收指令。`);
    return;
  }
  if (!instance.serverHost) {
    this.logger.error(`Instance ${instance.id} has no serverHost configured`);
    this.reply(msg, '小龙虾配置异常(缺少服务器地址),请联系管理员。');
    return;
  }

  const bridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task`;

  // Delivery strategy:
  // 1. Immediately send a "thinking" notice via sessionWebhook — instant acknowledgment
  // 2. Await the bridge call (LLM processing) — the serial queue still blocks here,
  //    preventing concurrent LLM calls for the same user
  // 3. Deliver the actual response via batchSend, decoupled from the webhook window;
  //    use the sessionWebhook only when there is no staffId to address batchSend to
  this.reply(msg, '🤔 小虾米正在思考,稍等...');

  let reply: string;
  try {
    const result = await this.httpPostJson<{ ok: boolean; result?: unknown; error?: string }>(
      bridgeUrl,
      {
        prompt: text,
        sessionKey: `agent:main:dt-${userId}`,
        idempotencyKey: msg.msgId,
        timeoutSeconds: TASK_TIMEOUT_S,
      },
      // Give the HTTP request a little longer than the task itself.
      (TASK_TIMEOUT_S + 10) * 1000,
    );
    if (result.ok && result.result !== undefined) {
      reply = typeof result.result === 'string'
        ? result.result
        : JSON.stringify(result.result, null, 2);
    } else {
      reply = result.error ?? '智能体没有返回内容。';
    }
  } catch (e: unknown) {
    const reason = e instanceof Error ? e.message : String(e);
    this.logger.error(`Bridge call failed for instance ${instance.id}:`, reason);
    reply = '与小龙虾通信时出现错误,请稍后重试。';
  }

  if (staffId) {
    await this.batchSend(staffId, reply, msg.msgId);
  } else {
    // batchSend needs a staffId. Without one the answer used to be dropped silently;
    // the session webhook is the only remaining channel.
    this.logger.warn(`No staffId for msgId=${msg.msgId}; delivering reply via sessionWebhook`);
    this.reply(msg, reply);
  }
}
/**
 * Deliver text to one DingTalk user through the batchSend API.
 *
 * The content has stack-trace-like fragments stripped, is split into chunks of
 * at most DINGTALK_MAX_CHARS, and the chunks are sent one after another with a
 * single token. Failures are logged, never propagated: the returned promise
 * always resolves.
 *
 * @param staffId - recipient; when missing the send is skipped with a warning
 * @param content - text to deliver
 * @param msgId - originating message id, for log correlation only
 */
private batchSend(staffId: string | undefined, content: string, msgId: string): Promise<void> {
  if (!staffId) {
    this.logger.warn(`batchSend skipped — no staffId for msgId=${msgId}`);
    return Promise.resolve();
  }

  const sanitized = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)';
  const parts: string[] = [];
  let offset = 0;
  while (offset < sanitized.length) {
    parts.push(sanitized.slice(offset, offset + DINGTALK_MAX_CHARS));
    offset += DINGTALK_MAX_CHARS;
  }

  const deliverAll = async (): Promise<void> => {
    const token = await this.getToken();
    for (const part of parts) {
      await this.httpsPost<unknown>(
        'api.dingtalk.com',
        '/v1.0/robot/oToMessages/batchSend',
        {
          robotCode: this.clientId,
          userIds: [staffId],
          msgKey: 'sampleText',
          msgParam: JSON.stringify({ content: part }),
        },
        { 'x-acs-dingtalk-access-token': token },
      );
    }
  };

  return deliverAll().catch((e: Error) =>
    this.logger.error(`batchSend failed for msgId=${msgId}:`, e.message),
  );
}
// ── Reply (chunked) ────────────────────────────────────────────────────────
/**
 * Reply through the message's session webhook, in chunks.
 *
 * Skipped (with a warning) when the webhook has already expired. Chunks are
 * dispatched 300 ms apart via timers, so this method returns before any of
 * them is actually sent.
 *
 * @param msg - message whose sessionWebhook is used
 * @param content - text to send
 */
private reply(msg: BotMsg, content: string): void {
  // The expiry has been observed in both seconds and milliseconds; anything not
  // above 1e11 is interpreted as Unix seconds.
  const rawExpiry = msg.sessionWebhookExpiredTime;
  let expiryMs = rawExpiry;
  if (!(rawExpiry > 1e11)) {
    expiryMs = rawExpiry * 1000;
  }
  if (Date.now() > expiryMs) {
    this.logger.warn(`sessionWebhook expired for msgId=${msg.msgId}, skipping reply`);
    return;
  }

  // Remove stack-trace-like fragments so internals are not leaked to the user.
  const sanitized = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim();
  const partCount = Math.ceil(sanitized.length / DINGTALK_MAX_CHARS);
  const parts = Array.from({ length: partCount }, (_unused, k) =>
    sanitized.slice(k * DINGTALK_MAX_CHARS, (k + 1) * DINGTALK_MAX_CHARS),
  );
  if (parts.length === 0) parts.push('(空响应)');

  for (const [position, part] of parts.entries()) {
    setTimeout(() => this.sendWebhook(msg.sessionWebhook, part), position * 300);
  }
}
/**
 * Fire-and-forget HTTPS POST of a single text message to a session webhook.
 * The response body is drained and ignored; all errors (including an invalid
 * webhook URL) are logged rather than thrown.
 *
 * @param webhook - full webhook URL
 * @param content - text to send
 */
private sendWebhook(webhook: string, content: string): void {
  try {
    const target = new URL(webhook);
    const payload = JSON.stringify({ msgtype: 'text', text: { content } });
    const options = {
      hostname: target.hostname,
      path: target.pathname + target.search,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Content-Length': Buffer.byteLength(payload),
      },
      timeout: 10_000,
    };
    const request = https.request(options, (response) => {
      // Drain the body so the socket can be released.
      response.resume();
    });
    request.on('timeout', () => {
      request.destroy();
    });
    request.on('error', (e) => this.logger.error('Webhook send error:', e.message));
    request.write(payload);
    request.end();
  } catch (e: any) {
    this.logger.error('sendWebhook failed:', e.message);
  }
}
// ── Per-user serial queue ──────────────────────────────────────────────────
/**
 * Append a task to the user's serial queue.
 *
 * Tasks for the same user run one after another by chaining onto the stored tail
 * promise. The depth counter includes the task currently running.
 *
 * @param userId - queue key
 * @param task - work to run once all earlier tasks for this user have settled
 * @returns false when the user already has QUEUE_MAX_DEPTH tasks queued (the task
 *          is not scheduled), true otherwise
 */
private enqueue(userId: string, task: () => Promise<void>): boolean {
const depth = this.queueDepths.get(userId) ?? 0;
if (depth >= QUEUE_MAX_DEPTH) return false;
this.queueDepths.set(userId, depth + 1);
// Start from the existing tail, or from an already-resolved promise for an empty queue.
const tail = this.queueTails.get(userId) ?? Promise.resolve();
const next = tail
.then(task)
.catch((e: Error) => {
// Safe catch: this handler must not throw, or the queue tail becomes a dead rejected promise
try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ }
})
.finally(() => {
// One task finished: decrement the depth, and drop both map entries once the queue is empty.
const remaining = (this.queueDepths.get(userId) ?? 1) - 1;
if (remaining <= 0) {
this.queueDepths.delete(userId);
this.queueTails.delete(userId);
} else {
this.queueDepths.set(userId, remaining);
}
})
// Guarantee: `next` always resolves, even if finally() somehow throws.
// This prevents a permanently dead queue tail that silently drops all future tasks.
.catch(() => {});
// Stored synchronously, before any of the callbacks above can run.
this.queueTails.set(userId, next);
return true;
}
// ── Rate limiter (sliding window) ─────────────────────────────────────────
/**
 * Sliding-window rate limiter: allows at most RATE_LIMIT_PER_MIN messages per
 * user within any 60-second window.
 *
 * The pruned timestamp list is written back on every call; the current
 * timestamp is recorded only when the message is allowed.
 *
 * @param userId - rate-limit key
 * @returns true when the message may proceed
 */
private rateAllow(userId: string): boolean {
  const now = Date.now();
  const recent: number[] = [];
  for (const stamp of this.rateWindows.get(userId) ?? []) {
    if (now - stamp < 60_000) recent.push(stamp);
  }
  const allowed = recent.length < RATE_LIMIT_PER_MIN;
  if (allowed) recent.push(now);
  this.rateWindows.set(userId, recent);
  return allowed;
}
// ── Periodic cleanup ───────────────────────────────────────────────────────
/**
 * Timer callback that evicts expired entries from the dedup, rate-limit,
 * binding-code and OAuth-state maps.
 */
private periodicCleanup(): void {
  const now = Date.now();

  this.dedup.forEach((seenAt, msgId) => {
    if (now - seenAt > DEDUP_TTL_MS) this.dedup.delete(msgId);
  });

  this.rateWindows.forEach((stamps, userId) => {
    const stillInWindow = stamps.filter((stamp) => now - stamp < 60_000);
    if (stillInWindow.length > 0) {
      this.rateWindows.set(userId, stillInWindow);
    } else {
      this.rateWindows.delete(userId);
    }
  });

  this.bindingCodes.forEach((pending, code) => {
    if (now > pending.expiresAt) this.bindingCodes.delete(code);
  });

  this.oauthStates.forEach((pending, state) => {
    if (now > pending.expiresAt) this.oauthStates.delete(state);
  });
}
// ── HTTP helpers ───────────────────────────────────────────────────────────
/**
 * HTTPS GET to a DingTalk API host, resolving with the parsed JSON body.
 *
 * The response body is capped at MAX_RESPONSE_BYTES; an oversized response
 * aborts the request instead of being downloaded in full.
 *
 * @param hostname - API host
 * @param path - request path including any query string
 * @param headers - extra request headers
 * @returns the parsed JSON body, typed by the caller (not validated)
 * @throws Error on timeout, network error, premature close, oversized body,
 *         HTTP status >= 400, or a body that is not valid JSON
 */
private httpsGet<T>(hostname: string, path: string, headers: Record<string, string> = {}): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const req = https.request(
      { hostname, path, method: 'GET', headers: { ...headers }, timeout: 10_000 },
      (res) => {
        // Collect raw buffers and decode once at the end: decoding chunk by chunk
        // corrupts multi-byte UTF-8 characters that straddle a chunk boundary.
        const chunks: Buffer[] = [];
        let totalBytes = 0;
        res.on('data', (chunk: Buffer) => {
          totalBytes += chunk.length;
          if (totalBytes > MAX_RESPONSE_BYTES) {
            reject(new Error(`Response too large (more than ${MAX_RESPONSE_BYTES} bytes)`));
            req.destroy();
            return;
          }
          chunks.push(chunk);
        });
        res.on('end', () => {
          const text = Buffer.concat(chunks).toString('utf8');
          const status = res.statusCode ?? 0;
          // Check the status first so a non-JSON error page reports the HTTP status
          // rather than a JSON syntax error.
          if (status >= 400) {
            reject(new Error(`HTTP ${status}: ${text.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(text) as T);
          } catch (e) {
            reject(e);
          }
        });
        // Without these, a connection dropped mid-response never emits 'end' and
        // the promise would stay pending forever.
        res.on('error', reject);
        res.on('close', () => {
          if (!res.complete) reject(new Error('DingTalk API GET connection closed before response completed'));
        });
      },
    );
    req.on('timeout', () => { req.destroy(); reject(new Error('DingTalk API GET timeout')); });
    req.on('error', reject);
    req.end();
  });
}
/**
 * HTTPS POST of a JSON payload to a DingTalk API host, resolving with the
 * parsed JSON body.
 *
 * The response body is capped at MAX_RESPONSE_BYTES; an oversized response
 * aborts the request instead of being downloaded in full.
 *
 * @param hostname - API host
 * @param path - request path including any query string
 * @param payload - object serialised as the JSON request body
 * @param extraHeaders - headers merged over the default JSON headers
 * @returns the parsed JSON body, typed by the caller (not validated)
 * @throws Error on timeout, network error, premature close, oversized body,
 *         HTTP status >= 400, or a body that is not valid JSON
 */
private httpsPost<T>(
  hostname: string,
  path: string,
  payload: object,
  extraHeaders: Record<string, string> = {},
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const body = JSON.stringify(payload);
    const req = https.request(
      {
        hostname, path, method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Content-Length': Buffer.byteLength(body),
          ...extraHeaders,
        },
        timeout: 10_000,
      },
      (res) => {
        // Collect raw buffers and decode once at the end: decoding chunk by chunk
        // corrupts multi-byte UTF-8 characters that straddle a chunk boundary.
        const chunks: Buffer[] = [];
        let totalBytes = 0;
        res.on('data', (chunk: Buffer) => {
          totalBytes += chunk.length;
          if (totalBytes > MAX_RESPONSE_BYTES) {
            reject(new Error(`DingTalk API response too large (more than ${MAX_RESPONSE_BYTES} bytes)`));
            req.destroy();
            return;
          }
          chunks.push(chunk);
        });
        res.on('end', () => {
          const text = Buffer.concat(chunks).toString('utf8');
          const status = res.statusCode ?? 0;
          // Check the status first so a non-JSON error page reports the HTTP status
          // rather than a JSON syntax error.
          if (status >= 400) {
            reject(new Error(`HTTP ${status}: ${text.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(text) as T);
          } catch (e) {
            reject(e);
          }
        });
        // Without these, a connection dropped mid-response never emits 'end' and
        // the promise would stay pending forever.
        res.on('error', reject);
        res.on('close', () => {
          if (!res.complete) reject(new Error('DingTalk API connection closed before response completed'));
        });
      },
    );
    req.on('timeout', () => { req.destroy(); reject(new Error('DingTalk API request timeout')); });
    req.on('error', reject);
    req.write(body);
    req.end();
  });
}
/**
 * HTTP POST of a JSON payload to an internal bridge container (plain http, no TLS),
 * resolving with the parsed JSON body.
 *
 * The response body is capped at MAX_RESPONSE_BYTES; an oversized response
 * aborts the request instead of being downloaded in full.
 *
 * @param url - absolute http:// URL of the bridge endpoint
 * @param payload - object serialised as the JSON request body
 * @param timeoutMs - socket idle timeout in milliseconds
 * @returns the parsed JSON body, typed by the caller (not validated)
 * @throws Error on an invalid URL, timeout, network error, premature close,
 *         oversized body, HTTP status >= 400, or a body that is not valid JSON
 */
private httpPostJson<T>(url: string, payload: object, timeoutMs = 35_000): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    // An invalid URL throws here; inside the executor that becomes a rejection.
    const parsed = new URL(url);
    const body = JSON.stringify(payload);
    const req = http.request(
      {
        hostname: parsed.hostname,
        // Leave the port unset when the URL has none so the default port applies.
        port: parsed.port ? Number(parsed.port) : undefined,
        // Keep any query string that the URL carries.
        path: parsed.pathname + parsed.search,
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Content-Length': Buffer.byteLength(body),
        },
        timeout: timeoutMs,
      },
      (res) => {
        // Collect raw buffers and decode once at the end: decoding chunk by chunk
        // corrupts multi-byte UTF-8 characters that straddle a chunk boundary.
        const chunks: Buffer[] = [];
        let totalBytes = 0;
        res.on('data', (chunk: Buffer) => {
          totalBytes += chunk.length;
          if (totalBytes > MAX_RESPONSE_BYTES) {
            reject(new Error(`Bridge response too large (more than ${MAX_RESPONSE_BYTES} bytes)`));
            req.destroy();
            return;
          }
          chunks.push(chunk);
        });
        res.on('end', () => {
          const text = Buffer.concat(chunks).toString('utf8');
          const status = res.statusCode ?? 0;
          // Check the status first so a non-JSON error page reports the HTTP status
          // rather than a JSON syntax error.
          if (status >= 400) {
            reject(new Error(`Bridge HTTP ${status}: ${text.slice(0, 200)}`));
            return;
          }
          try {
            resolve(JSON.parse(text) as T);
          } catch (e) {
            reject(e);
          }
        });
        // Without these, a bridge that drops the connection mid-response never emits
        // 'end'; the promise would stay pending and block the caller's serial queue.
        res.on('error', reject);
        res.on('close', () => {
          if (!res.complete) reject(new Error('Bridge connection closed before response completed'));
        });
      },
    );
    req.on('timeout', () => { req.destroy(); reject(new Error('Bridge request timeout')); });
    req.on('error', reject);
    req.write(body);
    req.end();
  });
}
}