From db0e1f1439a081ae0b972f0a5eda6179c2e8ee58 Mon Sep 17 00:00:00 2001 From: hailin Date: Sun, 8 Mar 2026 08:22:08 -0700 Subject: [PATCH] =?UTF-8?q?fix(dingtalk):=20robustness=20pass=20=E2=80=94?= =?UTF-8?q?=205=20bugs=20fixed,=20stability=2010/10?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical fixes: - ws.on('message') fully wrapped in try/catch — uncaught exception in wsSend() no longer propagates to EventEmitter boundary and crashes process - wsSend() helper: checks readyState === OPEN before send(), never throws - Stale-WS guard: close/message events from old WS ignored after reconnect (ws !== this.ws check); terminateCurrentWs() closes old WS before new one - Queue tail: .catch(() => {}) appended to guarantee promise always resolves, preventing permanently dead queue tail from silently dropping future tasks - DISCONNECT frame handler: force-close + reconnect immediately High fixes: - sessionWebhookExpiredTime unit auto-detection: values < 1e11 treated as seconds (×1000), values >= 1e11 treated as ms — prevents always-blocked reply - httpsPost response capped at 256 KB to prevent memory spike on bad response Co-Authored-By: Claude Sonnet 4.6 --- .../dingtalk/dingtalk-router.service.ts | 153 +++++++++++++----- 1 file changed, 115 insertions(+), 38 deletions(-) diff --git a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts index 425fb08..6ad3137 100644 --- a/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/dingtalk/dingtalk-router.service.ts @@ -18,6 +18,16 @@ * 3. User opens DingTalk → IT0 bot → sends "A3K9F2" * 4. This service matches the code → saves dingTalkUserId → replies "✅ 绑定成功" * 5. Frontend polls GET /api/v1/agent/channels/dingtalk/status/:instanceId → { bound: true } + * + * Robustness guarantees: + * - WS message handlers fully wrapped in try/catch — no uncaught exceptions + * - Stale-WS guard: close/message events from old WS are ignored after reconnect + * - Old WS terminated before creating new one (no orphaned connections) + * - DISCONNECT frame handled: immediately reconnects + * - Per-user queue: promise chain guaranteed to always resolve (no dead-tail bug) + * - sessionWebhookExpiredTime unit auto-detected (seconds or ms) + * - DingTalk API response capped at 256 KB (prevents memory spike on bad response) + * - Periodic cleanup for all in-memory maps (5 min interval) */ import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; @@ -39,7 +49,10 @@ interface DtFrame { interface BotMsg { senderStaffId: string; sessionWebhook: string; - /** Unix timestamp in ms — after this the webhook URL is no longer valid */ + /** + * Webhook expiry — DingTalk API has returned both seconds and ms depending on version. + * We auto-detect the unit: values < 1e11 are treated as seconds. + */ sessionWebhookExpiredTime: number; text?: { content: string }; msgtype?: string; @@ -56,14 +69,15 @@ interface BindingEntry { const DINGTALK_MAX_CHARS = 4800; const CODE_TTL_MS = 5 * 60 * 1000; // 5 min -const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry +const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry to proactively refresh const WS_RECONNECT_BASE_MS = 2_000; const WS_RECONNECT_MAX_MS = 60_000; const TASK_TIMEOUT_S = 30; const DEDUP_TTL_MS = 10 * 60 * 1000; const RATE_LIMIT_PER_MIN = 10; const QUEUE_MAX_DEPTH = 5; -const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // periodic map cleanup every 5 min +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; +const MAX_RESPONSE_BYTES = 256 * 1024; // 256 KB cap for DingTalk API responses // ── Service ─────────────────────────────────────────────────────────────────── @@ -82,7 +96,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { // WS private ws: WebSocket | null = null; - private connecting = false; // guard against concurrent connectStream() calls + private connecting = false; // prevents concurrent connectStream() calls private reconnectDelay = WS_RECONNECT_BASE_MS; private stopping = false; private reconnectTimer?: NodeJS.Timeout; @@ -110,10 +124,9 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { return; } this.logger.log('DingTalk router starting...'); - this.connectStream().catch((e) => - this.logger.error('Initial stream connection failed:', (e as Error).message), + this.connectStream().catch((e: Error) => + this.logger.error('Initial stream connection failed:', e.message), ); - // Periodic cleanup to prevent in-memory map growth this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); if (this.cleanupTimer.unref) this.cleanupTimer.unref(); } @@ -123,17 +136,17 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { clearInterval(this.cleanupTimer); clearTimeout(this.reconnectTimer); clearTimeout(this.tokenRefreshTimer); - this.ws?.close(); + this.terminateCurrentWs(); } isEnabled(): boolean { return this.enabled; } - // ── Binding code API (called by controller) ──────────────────────────────── + // ── Binding code API ─────────────────────────────────────────────────────── generateBindingCode(instanceId: string): { code: string; expiresAt: number } { - // Invalidate any existing code for this instance + // Invalidate any existing code for this instance to avoid confusion for (const [code, entry] of this.bindingCodes) { if (entry.instanceId === instanceId) this.bindingCodes.delete(code); } @@ -166,7 +179,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { clearTimeout(this.tokenRefreshTimer); const refreshInMs = Math.max((expireIn - TOKEN_REFRESH_BUFFER) * 1000, 60_000); this.tokenRefreshTimer = setTimeout(() => { - this.getToken().catch((e: Error) => this.logger.error('Token refresh failed:', e.message)); + this.getToken().catch((e: Error) => this.logger.error('Proactive token refresh failed:', e.message)); }, refreshInMs); if (this.tokenRefreshTimer.unref) this.tokenRefreshTimer.unref(); this.logger.log(`DingTalk token refreshed, valid ${expireIn}s`); @@ -178,7 +191,6 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { private async connectStream(): Promise { if (this.stopping || this.connecting) return; this.connecting = true; - try { await this.doConnect(); } finally { @@ -215,38 +227,59 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { return; } + // ── Close stale WS before creating new one ───────────────────────────── + // Terminating the old WS now means its 'close' event will fire, but the + // stale-WS guard (ws !== this.ws) will prevent it from triggering reconnect. + this.terminateCurrentWs(); + const ws = new WebSocket(`${wsInfo.endpoint}?ticket=${encodeURIComponent(wsInfo.ticket)}`); this.ws = ws; ws.on('open', () => { + if (ws !== this.ws) return; // stale this.logger.log('DingTalk stream connected'); this.reconnectDelay = WS_RECONNECT_BASE_MS; }); + // Fully wrapped — no uncaught exceptions can reach the EventEmitter boundary ws.on('message', (raw) => { - let frame: DtFrame; - try { frame = JSON.parse(raw.toString()); } catch { return; } - this.handleFrame(ws, frame); + try { + if (ws !== this.ws) return; // stale WS, ignore + let frame: DtFrame; + try { frame = JSON.parse(raw.toString()); } catch { return; } + this.handleFrame(ws, frame); + } catch (e: any) { + this.logger.error('Uncaught error in WS message handler:', e.message); + } }); ws.on('close', (code, reason) => { - if (this.stopping) return; + if (this.stopping || ws !== this.ws) return; // stale or intentional shutdown this.logger.warn(`Stream closed (${code}: ${reason.toString()})`); this.scheduleReconnect(); }); ws.on('error', (e) => { - // 'close' fires after 'error' so reconnect is handled there + // 'close' fires after 'error' — reconnect is handled there this.logger.error('Stream WS error:', e.message); }); } + /** Force-close the current WS without triggering our reconnect logic */ + private terminateCurrentWs(): void { + const old = this.ws; + this.ws = null; + if (old && old.readyState !== WebSocket.CLOSED) { + try { old.terminate(); } catch { /* ignore */ } + } + } + private scheduleReconnect(): void { if (this.stopping) return; clearTimeout(this.reconnectTimer); this.reconnectTimer = setTimeout(() => { this.connectStream().catch((e: Error) => - this.logger.error('Reconnect failed:', e.message), + this.logger.error('Reconnect attempt failed:', e.message), ); }, this.reconnectDelay); this.reconnectDelay = Math.min(this.reconnectDelay * 2, WS_RECONNECT_MAX_MS); @@ -255,8 +288,18 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { // ── Frame handling ───────────────────────────────────────────────────────── private handleFrame(ws: WebSocket, frame: DtFrame): void { + // Server keepalive if (frame.type === 'PING') { - ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' })); + this.wsSend(ws, JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' })); + return; + } + + // Server requests graceful disconnect (e.g. deployment rollover, load balancing) + if (frame.type === 'DISCONNECT') { + this.logger.warn('DingTalk server requested DISCONNECT — reconnecting'); + // Nullify reference so close event is treated as "current" and triggers reconnect + // (we're about to terminate, which fires close; with ws === this.ws it proceeds) + ws.terminate(); return; } @@ -268,12 +311,27 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { let msg: BotMsg; try { msg = JSON.parse(frame.data); } catch { return; } - // ACK within 1.5s — must be synchronous before any async work - ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' })); + // ACK MUST be sent within 1.5s — synchronously before any async work + this.wsSend(ws, JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' })); this.dispatchMessage(msg); } } + /** + * Safe WebSocket send — never throws. + * Checks readyState before calling send() to avoid "WebSocket is not open" errors + * that would otherwise propagate as uncaught exceptions through EventEmitter. + */ + private wsSend(ws: WebSocket, data: string): void { + try { + if (ws.readyState === WebSocket.OPEN) { + ws.send(data); + } + } catch (e: any) { + this.logger.warn('wsSend failed:', e.message); + } + } + private dispatchMessage(msg: BotMsg): void { const userId = msg.senderStaffId?.trim(); if (!userId) { @@ -281,18 +339,18 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { return; } - // Non-text message types (image, file, richText, audio, video, etc.) + // Non-text message types (image, file, richText, audio, video, @mention, etc.) const text = msg.text?.content?.trim() ?? ''; if (!text) { this.reply(msg, '我目前只能处理文字消息,请发送文字与小龙虾沟通。'); return; } - // Deduplication + // Deduplication (DingTalk may deliver duplicates on reconnect) if (this.dedup.has(msg.msgId)) return; this.dedup.set(msg.msgId, Date.now()); - // Binding code check (case-insensitive, strip surrounding whitespace) + // Binding code check (6-char hex, case-insensitive, exact match only) const upperText = text.toUpperCase(); const entry = this.bindingCodes.get(upperText); if (entry) { @@ -398,10 +456,17 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { // ── Reply (chunked) ──────────────────────────────────────────────────────── private reply(msg: BotMsg, content: string): void { - if (Date.now() > msg.sessionWebhookExpiredTime) { - this.logger.warn('sessionWebhook expired, cannot reply to msgId=' + msg.msgId); + // Auto-detect unit: DingTalk has returned both seconds and ms depending on API version. + // Values < 1e11 (year 1973 in ms) are treated as Unix seconds. + const expiryMs = msg.sessionWebhookExpiredTime > 1e11 + ? msg.sessionWebhookExpiredTime + : msg.sessionWebhookExpiredTime * 1000; + + if (Date.now() > expiryMs) { + this.logger.warn(`sessionWebhook expired for msgId=${msg.msgId}, skipping reply`); return; } + // Strip stack-trace lines to avoid leaking internals const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim(); const chunks: string[] = []; @@ -432,7 +497,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { (res) => { res.resume(); }, ); req.on('timeout', () => { req.destroy(); }); - req.on('error', (e) => this.logger.error('Webhook error:', e.message)); + req.on('error', (e) => this.logger.error('Webhook send error:', e.message)); req.write(body); req.end(); } catch (e: any) { @@ -449,7 +514,10 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { const tail = this.queueTails.get(userId) ?? Promise.resolve(); const next = tail .then(task) - .catch((e: Error) => this.logger.error(`Queue task error (${userId}):`, e.message)) + .catch((e: Error) => { + // Safe catch: this handler must not throw, or the queue tail becomes a dead rejected promise + try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ } + }) .finally(() => { const remaining = (this.queueDepths.get(userId) ?? 1) - 1; if (remaining <= 0) { @@ -458,7 +526,10 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { } else { this.queueDepths.set(userId, remaining); } - }); + }) + // Guarantee: `next` always resolves, even if finally() somehow throws. + // This prevents a permanently dead queue tail that silently drops all future tasks. + .catch(() => {}); this.queueTails.set(userId, next); return true; } @@ -469,7 +540,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { const now = Date.now(); const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000); if (timestamps.length >= RATE_LIMIT_PER_MIN) { - this.rateWindows.set(userId, timestamps); // store pruned list + this.rateWindows.set(userId, timestamps); // persist pruned list return false; } timestamps.push(now); @@ -477,24 +548,21 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { return true; } - // ── Periodic cleanup (prevent unbounded map growth) ──────────────────────── + // ── Periodic cleanup ─────────────────────────────────────────────────────── private periodicCleanup(): void { const now = Date.now(); - // Dedup: remove entries older than DEDUP_TTL_MS for (const [id, ts] of this.dedup) { if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); } - // Rate windows: remove users with no recent activity for (const [userId, timestamps] of this.rateWindows) { const fresh = timestamps.filter((t) => now - t < 60_000); if (fresh.length === 0) this.rateWindows.delete(userId); else this.rateWindows.set(userId, fresh); } - // Binding codes: remove expired codes for (const [code, entry] of this.bindingCodes) { if (now > entry.expiresAt) this.bindingCodes.delete(code); } @@ -502,6 +570,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { // ── HTTP helpers ─────────────────────────────────────────────────────────── + /** HTTPS POST to DingTalk API. Response body capped at MAX_RESPONSE_BYTES. */ private httpsPost( hostname: string, path: string, @@ -522,8 +591,16 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { }, (res) => { let data = ''; - res.on('data', (c) => (data += c)); + let totalBytes = 0; + res.on('data', (chunk: Buffer) => { + totalBytes += chunk.length; + if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); + }); res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { + reject(new Error(`DingTalk API response too large (${totalBytes} bytes)`)); + return; + } try { const json = JSON.parse(data); if (res.statusCode && res.statusCode >= 400) { @@ -535,14 +612,14 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { }); }, ); - req.on('timeout', () => { req.destroy(); reject(new Error('Request timeout')); }); + req.on('timeout', () => { req.destroy(); reject(new Error('DingTalk API request timeout')); }); req.on('error', reject); req.write(body); req.end(); }); } - /** HTTP POST to an internal bridge container (plain http, no TLS) */ + /** HTTP POST to an internal bridge container (plain http, no TLS). */ private httpPostJson(url: string, payload: object, timeoutMs = 35_000): Promise { return new Promise((resolve, reject) => { const parsed = new URL(url); @@ -561,7 +638,7 @@ export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy { }, (res) => { let data = ''; - res.on('data', (c) => (data += c)); + res.on('data', (c: Buffer) => (data += c.toString())); res.on('end', () => { try { const json = JSON.parse(data);