From 0d5441f720aeaf82145590a744112c62019a7c4c Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 10 Mar 2026 05:54:42 -0700 Subject: [PATCH] fix(wecom): token mutex, leader lease, backoff, watchdog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four additional robustness fixes: 1. **Token refresh mutex** — tokenRefreshPromise deduplicates concurrent refresh calls: all callers share one in-flight gettoken HTTP request instead of each firing their own, eliminating the race condition. 2. **Distributed leader lease** — the service_state table is used for a TTL-based leader election (LEADER_LEASE_TTL_S=90s). Only the lease holder polls; other instances re-check every 10s and take over once the lease expires. If the lease query itself fails, the instance fails open and polls anyway. The lease is released (best effort) on graceful shutdown. 3. **Exponential backoff** — each consecutive poll error (including a non-zero sync_msg errcode, which now throws) increments a counter; the next delay is min(10s × 2^(n-1), 5min). This prevents log spam and reduces load during sustained WeCom API outages. The counter resets on any successful poll. 4. **Watchdog timer** — a setInterval every 2min checks lastPollAt. If the poll loop has been silent for >5min, it clears the pending poll timer, resets the backoff counter, and reschedules a poll immediately, so the loop recovers from a silent stall. Co-Authored-By: Claude Sonnet 4.6 --- .../wecom/wecom-router.service.ts | 395 ++++++++++-------- 1 file changed, 218 insertions(+), 177 deletions(-) diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index 9fb345f..cb29826 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -14,19 +14,13 @@ * IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret) * IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid,以 wkf 开头) * - * Binding flow (code): - * 1. Frontend calls POST /api/v1/agent/channels/wecom/bind/:instanceId - * 2.
Backend returns { code: "A3K9F2", expiresAt, kfUrl } - * 3. User opens WeChat → opens 微信客服 link → sends "A3K9F2" - * 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功" - * * Robustness guarantees: - * - sync_msg cursor persisted to DB (survives restart — no duplicate on resume) - * - Per-user serial queue prevents concurrent LLM calls for same user - * - Dedup map prevents duplicate message processing within session - * - Rate limit: 10 messages/minute per user - * - send_msg retries once on error (2s delay) - * - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s) + * - sync_msg cursor persisted to DB (survives restart) + * - Token refresh mutex — concurrent callers share one in-flight refresh + * - Distributed leader lease — only one agent-service instance polls at a time + * - Exponential backoff on consecutive poll errors (up to 5 min) + * - Watchdog timer restarts the poll loop if it silently dies + * - Per-user serial queue, dedup, rate limit, send retry * - Periodic cleanup of in-memory maps (5-min interval) */ @@ -54,21 +48,36 @@ interface BindingEntry { // ── Constants ───────────────────────────────────────────────────────────────── -const WECOM_MAX_CHARS = 2048; // stay below WeChat's limit -const CODE_TTL_MS = 15 * 60 * 1000; // 15 min -const TASK_TIMEOUT_S = 120; -const CALLBACK_TIMEOUT_MS = 180_000; -const THINKING_REMINDER_MS = 25_000; -const DEDUP_TTL_MS = 10 * 60 * 1000; -const RATE_LIMIT_PER_MIN = 10; -const QUEUE_MAX_DEPTH = 5; -const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; -const MAX_RESPONSE_BYTES = 256 * 1024; -const POLL_INTERVAL_ACTIVE_MS = 2_000; -const POLL_INTERVAL_IDLE_MS = 10_000; -const IDLE_THRESHOLD_MS = 30_000; -const SEND_RETRY_DELAY_MS = 2_000; -const STATE_KEY_CURSOR = 'wecom:sync_cursor'; +const WECOM_MAX_CHARS = 2048; +const CODE_TTL_MS = 15 * 60 * 1000; +const TASK_TIMEOUT_S = 120; +const CALLBACK_TIMEOUT_MS = 180_000; +const THINKING_REMINDER_MS = 25_000; +const DEDUP_TTL_MS = 10 * 60 * 1000; 
+const RATE_LIMIT_PER_MIN = 10; +const QUEUE_MAX_DEPTH = 5; +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; +const MAX_RESPONSE_BYTES = 256 * 1024; +const POLL_INTERVAL_ACTIVE_MS = 2_000; +const POLL_INTERVAL_IDLE_MS = 10_000; +const IDLE_THRESHOLD_MS = 30_000; +const SEND_RETRY_DELAY_MS = 2_000; + +// Circuit breaker +const BACKOFF_BASE_MS = POLL_INTERVAL_IDLE_MS; +const BACKOFF_MAX_MS = 5 * 60 * 1000; // 5 min cap +const BACKOFF_MULTIPLIER = 2; + +// Distributed leader lease +const LEADER_LEASE_KEY = 'wecom:poll-leader'; +const LEADER_LEASE_TTL_S = 90; // another instance takes over after 90s + +// Watchdog: restart poll loop if last success > threshold +const WATCHDOG_INTERVAL_MS = 2 * 60 * 1000; +const WATCHDOG_THRESHOLD_MS = 5 * 60 * 1000; + +// DB cursor key +const STATE_KEY_CURSOR = 'wecom:sync_cursor'; const WECOM_API_HOST = 'qyapi.weixin.qq.com'; @@ -83,18 +92,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private readonly kfUrl: string; private readonly enabled: boolean; private readonly agentCallbackBaseUrl: string; + private readonly nodeId = crypto.randomUUID(); // unique per process instance - private stopping = false; - private pollTimer?: NodeJS.Timeout; + private stopping = false; + private pollTimer?: NodeJS.Timeout; + private watchdogTimer?: NodeJS.Timeout; private cleanupTimer?: NodeJS.Timeout; - private syncCursor = ''; - private lastMsgTime = 0; + private syncCursor = ''; + private lastMsgTime = 0; + private lastPollAt = 0; // for watchdog + private consecutiveErrors = 0; // for exponential backoff - // Token cache - private tokenCache = ''; - private tokenExpiresAt = 0; + // Token cache + mutex + private tokenCache = ''; + private tokenExpiresAt = 0; + private tokenRefreshPromise?: Promise; - // State + // State maps private readonly bindingCodes = new Map(); private readonly dedup = new Map(); private readonly rateWindows = new Map(); @@ -111,11 +125,11 @@ export class WecomRouterService implements OnModuleInit, 
OnModuleDestroy { private readonly instanceRepo: AgentInstanceRepository, private readonly dataSource: DataSource, ) { - this.corpId = this.configService.get('IT0_WECOM_CORP_ID', ''); - this.kfSecret = this.configService.get('IT0_WECOM_KF_SECRET', ''); - this.openKfId = this.configService.get('IT0_WECOM_KF_OPEN_KFID', ''); - this.kfUrl = this.configService.get('IT0_WECOM_KF_URL', ''); - this.enabled = !!(this.corpId && this.kfSecret && this.openKfId); + this.corpId = this.configService.get('IT0_WECOM_CORP_ID', ''); + this.kfSecret = this.configService.get('IT0_WECOM_KF_SECRET', ''); + this.openKfId = this.configService.get('IT0_WECOM_KF_OPEN_KFID', ''); + this.kfUrl = this.configService.get('IT0_WECOM_KF_URL', ''); + this.enabled = !!(this.corpId && this.kfSecret && this.openKfId); this.agentCallbackBaseUrl = this.configService.get('AGENT_SERVICE_PUBLIC_URL', ''); } @@ -125,8 +139,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { return; } await this.initStateTable(); - this.logger.log('WeCom router starting (sync_msg polling)...'); + this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`) this.schedulePoll(POLL_INTERVAL_IDLE_MS); + this.scheduleWatchdog(); this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); if (this.cleanupTimer.unref) this.cleanupTimer.unref(); this.logger.log('WeCom router started'); @@ -135,7 +150,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { onModuleDestroy(): void { this.stopping = true; clearTimeout(this.pollTimer); + clearTimeout(this.watchdogTimer); clearInterval(this.cleanupTimer); + // Release leader lease asynchronously (best effort) + this.releaseLeaderLease().catch(() => {}); for (const [, cb] of this.pendingCallbacks) { clearTimeout(cb.timer); cb.reject(new Error('Service shutting down')); @@ -143,20 +161,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { this.pendingCallbacks.clear(); 
} - isEnabled(): boolean { - return this.enabled; - } - - getKfUrl(): string { - return this.kfUrl; - } + isEnabled(): boolean { return this.enabled; } + getKfUrl(): string { return this.kfUrl; } // ── DB state table ───────────────────────────────────────────────────────── - /** - * Ensure the service_state table exists and load persisted cursor. - * Uses a tiny generic KV table to avoid a dedicated migration file. - */ private async initStateTable(): Promise { try { await this.dataSource.query(` @@ -172,20 +181,19 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { ); if (rows.length > 0 && rows[0].value) { this.syncCursor = rows[0].value; - this.logger.log(`WeCom cursor restored from DB: ${this.syncCursor.slice(0, 12)}...`); + this.logger.log(`WeCom cursor restored: ${this.syncCursor.slice(0, 12)}...`); } else { - this.logger.log('WeCom cursor: starting fresh (no persisted cursor)'); + this.logger.log('WeCom cursor: starting fresh'); } } catch (e: any) { - this.logger.warn(`WeCom state table init failed (will start without cursor): ${e.message}`); + this.logger.warn(`WeCom state table init failed (continuing without cursor): ${e.message}`); } } private async persistCursor(cursor: string): Promise { try { await this.dataSource.query( - `INSERT INTO public.service_state (key, value, updated_at) - VALUES ($1, $2, NOW()) + `INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`, [STATE_KEY_CURSOR, cursor], ); @@ -194,12 +202,80 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } } + // ── Distributed leader lease ─────────────────────────────────────────────── + // + // Uses service_state table for a TTL-based leader election. + // Only the leader polls; others skip until the lease expires (90s). 
+ // Value is JSON: { nodeId, pid } + + private leaseValue(): string { + return JSON.stringify({ nodeId: this.nodeId, pid: process.pid }); + } + + /** + * Try to claim or renew the leader lease. + * Returns true if this instance is now the leader. + */ + private async tryClaimLeaderLease(): Promise { + try { + // Upsert: claim the lease if: + // (a) no lease exists yet (INSERT path), or + // (b) we already hold it (nodeId matches), or + // (c) existing lease has expired (updated_at older than TTL) + const result = await this.dataSource.query( + `INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW()) + ON CONFLICT (key) DO UPDATE + SET value = EXCLUDED.value, updated_at = NOW() + WHERE + (service_state.value)::json->>'nodeId' = $3 + OR service_state.updated_at < NOW() - ($4 || ' seconds')::INTERVAL + RETURNING key`, + [LEADER_LEASE_KEY, this.leaseValue(), this.nodeId, LEADER_LEASE_TTL_S], + ); + return result.length > 0; + } catch (e: any) { + // On DB error, fall through and allow polling (fail open — better than stopping) + this.logger.warn(`WeCom leader lease check failed (continuing): ${e.message}`); + return true; + } + } + + private async releaseLeaderLease(): Promise { + try { + await this.dataSource.query( + `DELETE FROM public.service_state + WHERE key = $1 AND (value)::json->>'nodeId' = $2`, + [LEADER_LEASE_KEY, this.nodeId], + ); + } catch { /* ignore on shutdown */ } + } + + // ── Watchdog ─────────────────────────────────────────────────────────────── + + private scheduleWatchdog(): void { + this.watchdogTimer = setInterval(() => this.watchdog(), WATCHDOG_INTERVAL_MS); + if (this.watchdogTimer.unref) this.watchdogTimer.unref(); + } + + private watchdog(): void { + if (this.stopping) return; + const staleSince = Date.now() - this.lastPollAt; + if (this.lastPollAt > 0 && staleSince > WATCHDOG_THRESHOLD_MS) { + this.logger.warn( + `WeCom watchdog: poll loop silent for ${Math.round(staleSince / 1000)}s — restarting`, + ); + 
clearTimeout(this.pollTimer); + this.consecutiveErrors = 0; + this.schedulePoll(0); + } + } + // ── Async bridge callback ────────────────────────────────────────────────── resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void { const cb = this.pendingCallbacks.get(msgId); if (!cb) { - this.logger.warn(`WeCom: Received callback for unknown msgId=${msgId} (already resolved or timed out)`); + this.logger.warn(`WeCom: callback for unknown msgId=${msgId} (already resolved or timed out)`); return; } this.pendingCallbacks.delete(msgId); @@ -232,16 +308,39 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { if (this.pollTimer.unref) this.pollTimer.unref(); } + private computeBackoff(): number { + if (this.consecutiveErrors === 0) return 0; // caller will pick active/idle + const backoff = Math.min( + BACKOFF_BASE_MS * Math.pow(BACKOFF_MULTIPLIER, this.consecutiveErrors - 1), + BACKOFF_MAX_MS, + ); + return backoff; + } + private async poll(): Promise { if (this.stopping) return; + + // Leader election: skip if another instance holds the lease + const isLeader = await this.tryClaimLeaderLease(); + if (!isLeader) { + this.schedulePoll(POLL_INTERVAL_IDLE_MS); + return; + } + + this.lastPollAt = Date.now(); try { const hasMore = await this.syncMessages(); + this.consecutiveErrors = 0; // reset backoff on success const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS; const nextDelay = hasMore ? 0 : (idle ? 
POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS); this.schedulePoll(nextDelay); } catch (e: any) { - this.logger.warn(`WeCom poll error: ${e.message}`); - this.schedulePoll(POLL_INTERVAL_IDLE_MS); + this.consecutiveErrors++; + const backoff = this.computeBackoff(); + this.logger.warn( + `WeCom poll error (attempt ${this.consecutiveErrors}), backoff=${backoff}ms: ${e.message}`, + ); + this.schedulePoll(backoff); } } @@ -270,21 +369,18 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { }>(path); if (res.errcode !== 0) { - this.logger.warn(`WeCom sync_msg error: ${res.errcode} ${res.errmsg}`); - return false; + throw new Error(`sync_msg API error: ${res.errcode} ${res.errmsg}`); } if (res.next_cursor && res.next_cursor !== this.syncCursor) { this.syncCursor = res.next_cursor; - // Persist asynchronously — don't block the poll loop this.persistCursor(this.syncCursor).catch(() => {}); } const msgList = res.msg_list ?? []; - // Handle system events (enter_session etc.) — origin=0, msgtype="event" - const systemEvents = msgList.filter(m => m.msgtype === 'event' && m.event); - for (const ev of systemEvents) { + // System events (enter_session etc.) + for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) { if (ev.event?.event_type === 'enter_session') { this.handleEnterSession(ev.external_userid).catch((e: Error) => { this.logger.error('WeCom handleEnterSession error:', e.message); @@ -292,19 +388,13 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } } - // Handle user messages — origin=3 - const userMessages = msgList.filter(m => m.origin === 3); - for (const msg of userMessages) { + // User messages (origin=3) + for (const msg of msgList.filter(m => m.origin === 3)) { this.lastMsgTime = Date.now(); - const wecomMsg: WecomMsg = { - externalUserId: msg.external_userid, - msgId: msg.msgid, - text: msg.text?.content ?? 
'', - openKfId: msg.open_kfid, - }; - this.handleMessage(wecomMsg, msg.msgtype).catch((e: Error) => { - this.logger.error('WeCom handleMessage error:', e.message); - }); + this.handleMessage( + { externalUserId: msg.external_userid, msgId: msg.msgid, text: msg.text?.content ?? '', openKfId: msg.open_kfid }, + msg.msgtype, + ).catch((e: Error) => this.logger.error('WeCom handleMessage error:', e.message)); } return res.has_more === 1; @@ -315,7 +405,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async handleEnterSession(externalUserId: string): Promise { if (!externalUserId) return; this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`); - const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId); if (instance) { await this.sendMessage( @@ -339,14 +428,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async handleMessage(msg: WecomMsg, msgType: string): Promise { if (!msg.externalUserId || !msg.msgId) return; - - // Dedup if (this.dedup.has(msg.msgId)) return; this.dedup.set(msg.msgId, Date.now()); this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`); - // Non-text messages if (msgType !== 'text' || !msg.text) { await this.sendMessage(msg.externalUserId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。'); return; @@ -355,7 +441,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const text = msg.text.trim(); if (!text) return; - // Binding code check (6-char hex, case-insensitive) + // Binding code check (6-char hex) const upperText = text.toUpperCase(); const bindEntry = this.bindingCodes.get(upperText); if (bindEntry) { @@ -371,13 +457,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { return; } - // Rate limit if (!this.rateAllow(msg.externalUserId)) { await this.sendMessage(msg.externalUserId, '消息频率过高,请稍后再试(每分钟最多10条)。'); return; } - // Queue const 
pendingDepth = this.queueDepths.get(msg.externalUserId) ?? 0; const accepted = this.enqueue(msg.externalUserId, () => this.routeToAgent(msg.externalUserId, text, msg)); if (!accepted) { @@ -415,7 +499,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId); if (!instance) { - this.logger.warn(`No WeCom binding for externalUserId=${externalUserId}`); await this.sendMessage( externalUserId, '👋 你还没有绑定专属小龙虾。\n\n步骤:\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定微信」获取验证码\n4. 把验证码发给我就好了~', @@ -424,36 +507,30 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } if (instance.status !== 'running') { - const statusHint: Record = { + const hints: Record = { stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`, starting: `⏳ 小龙虾「${instance.name}」还在启动中,请等待约1分钟后重试。`, error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`, }; await this.sendMessage( externalUserId, - statusHint[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`, + hints[instance.status] ?? 
`小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`, ); return; } if (!instance.serverHost) { - this.logger.error(`Instance ${instance.id} has no serverHost`); await this.sendMessage(externalUserId, '小龙虾配置异常(缺少服务器地址),请联系管理员。'); return; } const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`; const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/wecom/bridge-callback`; - this.logger.log( - `WeCom routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${asyncBridgeUrl}`, - ); + this.logger.log(`WeCom routing msgId=${msg.msgId} → instance ${instance.id} @ ${asyncBridgeUrl}`); - // Immediate ack await this.sendMessage(externalUserId, '🤔 小虾米正在思考,稍等~'); - // Progress reminder after 25s - let thinkingTimer: NodeJS.Timeout | undefined; - thinkingTimer = setTimeout(() => { + let thinkingTimer: NodeJS.Timeout | undefined = setTimeout(() => { this.sendMessage(externalUserId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {}); }, THINKING_REMINDER_MS); if (thinkingTimer.unref) thinkingTimer.unref(); @@ -475,12 +552,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>( asyncBridgeUrl, { - prompt: text, - sessionKey: `agent:main:wx-${externalUserId}`, - idempotencyKey: msg.msgId, - timeoutSeconds: TASK_TIMEOUT_S, - callbackUrl, - callbackData: { externalUserId, msgId: msg.msgId }, + prompt: text, sessionKey: `agent:main:wx-${externalUserId}`, + idempotencyKey: msg.msgId, timeoutSeconds: TASK_TIMEOUT_S, + callbackUrl, callbackData: { externalUserId, msgId: msg.msgId }, }, 15_000, ); @@ -488,11 +562,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { if (!ack.ok) { this.pendingCallbacks.delete(msg.msgId); const bridgeError = ack.error ?? 
''; - if (bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')) { - reply = `🔄 小虾米正在重启,请等待约30秒后重试。`; - } else { - reply = `小虾米遇到了问题,请稍后重试。`; - } + reply = bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected') + ? `🔄 小虾米正在重启,请等待约30秒后重试。` + : `小虾米遇到了问题,请稍后重试。`; this.logger.warn(`WeCom bridge rejected task for instance ${instance.id}: ${bridgeError}`); } else { reply = await callbackPromise; @@ -504,6 +576,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout); } finally { clearTimeout(thinkingTimer); + thinkingTimer = undefined; } await this.sendMessage(externalUserId, reply); @@ -516,9 +589,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { `建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说` ); } - if (error.includes('disconnected') || error.includes('not connected')) { + if (error.includes('disconnected') || error.includes('not connected')) return `🔄 「${instanceName}」与服务的连接中断了,请等待约30秒后重试。`; - } if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`; if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`; return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`; @@ -526,18 +598,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { // ── Send message ─────────────────────────────────────────────────────────── - /** - * Send a text message to a WeChat user via 微信客服 send_msg API. - * Chunked if over WECOM_MAX_CHARS. Retries once on error. 
- */ private async sendMessage(externalUserId: string, content: string): Promise { - const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)'; - const chunks: string[] = []; + const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)'; for (let i = 0; i < safe.length; i += WECOM_MAX_CHARS) { - chunks.push(safe.slice(i, i + WECOM_MAX_CHARS)); - } - for (const chunk of chunks) { - await this.sendChunkWithRetry(externalUserId, chunk); + await this.sendChunkWithRetry(externalUserId, safe.slice(i, i + WECOM_MAX_CHARS)); } } @@ -545,47 +609,40 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { for (let attempt = 0; attempt < 2; attempt++) { try { const token = await this.getAccessToken(); - const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>( + const res = await this.wecomPost<{ errcode: number; errmsg: string }>( `/cgi-bin/kf/send_msg?access_token=${token}`, - { - touser: externalUserId, - open_kfid: this.openKfId, - msgtype: 'text', - text: { content: chunk }, - }, + { touser: externalUserId, open_kfid: this.openKfId, msgtype: 'text', text: { content: chunk } }, ); - if (res.errcode === 0) return; // success + if (res.errcode === 0) return; this.logger.warn( - `WeCom send_msg attempt ${attempt + 1} failed: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`, + `WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`, ); - if (attempt === 0) { - await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS)); - } } catch (e: any) { - this.logger.warn( - `WeCom sendMessage attempt ${attempt + 1} threw for externalUserId=${externalUserId}: ${e.message}`, - ); - if (attempt === 0) { - await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS)); - } + this.logger.warn(`WeCom sendChunk attempt ${attempt + 1} threw: ${e.message}`); } + if (attempt === 0) await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS)); } 
this.logger.error(`WeCom sendMessage failed after 2 attempts for externalUserId=${externalUserId}`); } - // ── Access token ─────────────────────────────────────────────────────────── + // ── Access token (with refresh mutex) ───────────────────────────────────── private async getAccessToken(): Promise { if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) { return this.tokenCache; } - const res = await this.wecomGet<{ - errcode: number; - errmsg: string; - access_token: string; - expires_in: number; - }>(`/cgi-bin/gettoken?corpid=${this.corpId}&corpsecret=${this.kfSecret}`); + // Mutex: reuse in-flight refresh if one is already running + if (this.tokenRefreshPromise) return this.tokenRefreshPromise; + this.tokenRefreshPromise = this.refreshToken().finally(() => { + this.tokenRefreshPromise = undefined; + }); + return this.tokenRefreshPromise; + } + private async refreshToken(): Promise { + const res = await this.wecomGet<{ + errcode: number; errmsg: string; access_token: string; expires_in: number; + }>(`/cgi-bin/gettoken?corpid=${this.corpId}&corpsecret=${this.kfSecret}`); if (res.errcode !== 0) throw new Error(`WeCom gettoken error: ${res.errcode} ${res.errmsg}`); this.tokenCache = res.access_token; this.tokenExpiresAt = Date.now() + res.expires_in * 1000; @@ -603,7 +660,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const next = tail .then(task) .catch((e: Error) => { - try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ } + try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* */ } }) .finally(() => { const remaining = (this.queueDepths.get(userId) ?? 1) - 1; @@ -623,7 +680,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private rateAllow(userId: string): boolean { const now = Date.now(); - const timestamps = (this.rateWindows.get(userId) ?? 
[]).filter((t) => now - t < 60_000); + const timestamps = (this.rateWindows.get(userId) ?? []).filter(t => now - t < 60_000); if (timestamps.length >= RATE_LIMIT_PER_MIN) { this.rateWindows.set(userId, timestamps); return false; @@ -637,17 +694,15 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private periodicCleanup(): void { const now = Date.now(); - for (const [id, ts] of this.dedup) { + for (const [id, ts] of this.dedup) if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); - } for (const [userId, timestamps] of this.rateWindows) { - const fresh = timestamps.filter((t) => now - t < 60_000); + const fresh = timestamps.filter(t => now - t < 60_000); if (fresh.length === 0) this.rateWindows.delete(userId); else this.rateWindows.set(userId, fresh); } - for (const [code, entry] of this.bindingCodes) { + for (const [code, entry] of this.bindingCodes) if (now > entry.expiresAt) this.bindingCodes.delete(code); - } } // ── HTTP helpers ─────────────────────────────────────────────────────────── @@ -657,13 +712,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const req = https.request( { hostname: WECOM_API_HOST, path, method: 'GET', timeout: 10_000 }, (res) => { - let data = ''; let totalBytes = 0; - res.on('data', (chunk: Buffer) => { - totalBytes += chunk.length; - if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); - }); + let data = ''; let total = 0; + res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); }); res.on('end', () => { - if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + if (total > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } }); }, @@ -680,20 +732,14 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const req = https.request( { hostname: WECOM_API_HOST, path, method: 
'POST', - headers: { - 'Content-Type': 'application/json', - 'Content-Length': Buffer.byteLength(body), - }, + headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) }, timeout: 10_000, }, (res) => { - let data = ''; let totalBytes = 0; - res.on('data', (chunk: Buffer) => { - totalBytes += chunk.length; - if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); - }); + let data = ''; let total = 0; + res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); }); res.on('end', () => { - if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + if (total > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } }); }, @@ -711,21 +757,16 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const body = JSON.stringify(payload); const req = http.request( { - hostname: parsed.hostname, - port: parseInt(parsed.port, 10), - path: parsed.pathname, - method: 'POST', + hostname: parsed.hostname, port: parseInt(parsed.port, 10), + path: parsed.pathname, method: 'POST', headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) }, timeout: timeoutMs, }, (res) => { - let data = ''; let totalBytes = 0; - res.on('data', (c: Buffer) => { - totalBytes += c.length; - if (totalBytes <= MAX_RESPONSE_BYTES) data += c.toString(); - }); + let data = ''; let total = 0; + res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); }); res.on('end', () => { - if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; } + if (total > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; } try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } }); },