diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index cb29826..cdb438e 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -3,25 +3,17 @@ * * IT0 unified WeCom Customer Service bot — one central polling loop for all agent instances. * - * Responsibilities: - * 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages. - * 2. Handle binding codes: user sends code → maps their external_userid to an instance. - * 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies. - * 4. Handle enter_session events: send welcome/onboarding message on first contact. - * - * Required env vars: - * IT0_WECOM_CORP_ID — Enterprise ID (from 企业微信管理后台 → 我的企业) - * IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret) - * IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid,以 wkf 开头) - * * Robustness guarantees: * - sync_msg cursor persisted to DB (survives restart) * - Token refresh mutex — concurrent callers share one in-flight refresh - * - Distributed leader lease — only one agent-service instance polls at a time + * - Distributed leader lease (fail-closed) — only one instance polls at a time; + * DB failure → skip poll rather than multi-master * - Exponential backoff on consecutive poll errors (up to 5 min) * - Watchdog timer restarts the poll loop if it silently dies - * - Per-user serial queue, dedup, rate limit, send retry - * - Periodic cleanup of in-memory maps (5-min interval) + * - Cross-instance callback recovery via Redis — if instance A crashes mid-task, + * instance B can recover the reply when bridge-callback arrives + * - Per-user serial queue, dedup, rate limit, send retry (1 retry) + * - Health status endpoint for observability */ 
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; @@ -30,6 +22,7 @@ import * as https from 'https'; import * as http from 'http'; import * as crypto from 'crypto'; import { DataSource } from 'typeorm'; +import Redis from 'ioredis'; import { AgentInstanceRepository } from '../repositories/agent-instance.repository'; // ── Types ───────────────────────────────────────────────────────────────────── @@ -46,6 +39,16 @@ interface BindingEntry { expiresAt: number; } +export interface WecomHealthStatus { + enabled: boolean; + isLeader: boolean; + lastPollAt: number; // unix ms, 0 = never + staleSinceMs: number; // ms since last successful poll + consecutiveErrors: number; + pendingCallbacks: number; + queuedUsers: number; +} + // ── Constants ───────────────────────────────────────────────────────────────── const WECOM_MAX_CHARS = 2048; @@ -65,17 +68,20 @@ const SEND_RETRY_DELAY_MS = 2_000; // Circuit breaker const BACKOFF_BASE_MS = POLL_INTERVAL_IDLE_MS; -const BACKOFF_MAX_MS = 5 * 60 * 1000; // 5 min cap +const BACKOFF_MAX_MS = 5 * 60 * 1000; const BACKOFF_MULTIPLIER = 2; // Distributed leader lease const LEADER_LEASE_KEY = 'wecom:poll-leader'; -const LEADER_LEASE_TTL_S = 90; // another instance takes over after 90s +const LEADER_LEASE_TTL_S = 90; -// Watchdog: restart poll loop if last success > threshold +// Watchdog const WATCHDOG_INTERVAL_MS = 2 * 60 * 1000; const WATCHDOG_THRESHOLD_MS = 5 * 60 * 1000; +// Redis pending task keys (cross-instance callback recovery) +const REDIS_PENDING_TTL_S = 200; // slightly longer than CALLBACK_TIMEOUT_MS/1000 + // DB cursor key const STATE_KEY_CURSOR = 'wecom:sync_cursor'; @@ -92,22 +98,26 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private readonly kfUrl: string; private readonly enabled: boolean; private readonly agentCallbackBaseUrl: string; - private readonly nodeId = crypto.randomUUID(); // unique per process instance + private readonly nodeId = 
crypto.randomUUID(); - private stopping = false; - private pollTimer?: NodeJS.Timeout; + private stopping = false; + private isLeader = false; + private pollTimer?: NodeJS.Timeout; private watchdogTimer?: NodeJS.Timeout; - private cleanupTimer?: NodeJS.Timeout; - private syncCursor = ''; - private lastMsgTime = 0; - private lastPollAt = 0; // for watchdog - private consecutiveErrors = 0; // for exponential backoff + private cleanupTimer?: NodeJS.Timeout; + private syncCursor = ''; + private lastMsgTime = 0; + private lastPollAt = 0; + private consecutiveErrors = 0; // Token cache + mutex private tokenCache = ''; private tokenExpiresAt = 0; private tokenRefreshPromise?: Promise<string>; + // Redis client for cross-instance callback recovery + private redis?: Redis; + // State maps private readonly bindingCodes = new Map(); private readonly dedup = new Map(); @@ -138,8 +148,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled'); return; } + this.initRedis(); await this.initStateTable(); - this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`) + this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`); this.schedulePoll(POLL_INTERVAL_IDLE_MS); this.scheduleWatchdog(); this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); @@ -147,23 +158,81 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { this.logger.log('WeCom router started'); } - onModuleDestroy(): void { + async onModuleDestroy(): Promise<void> { this.stopping = true; clearTimeout(this.pollTimer); clearTimeout(this.watchdogTimer); clearInterval(this.cleanupTimer); - // Release leader lease asynchronously (best effort) - this.releaseLeaderLease().catch(() => {}); - for (const [, cb] of this.pendingCallbacks) { + await this.releaseLeaderLease(); + + // Gracefully reject pending callbacks and clean up Redis 
keys + for (const [msgId, cb] of this.pendingCallbacks) { clearTimeout(cb.timer); cb.reject(new Error('Service shutting down')); + this.redisDel(this.redisPendingKey(msgId)); } this.pendingCallbacks.clear(); + + if (this.redis) { + await this.redis.quit().catch(() => {}); + } } isEnabled(): boolean { return this.enabled; } getKfUrl(): string { return this.kfUrl; } + getStatus(): WecomHealthStatus { + return { + enabled: this.enabled, + isLeader: this.isLeader, + lastPollAt: this.lastPollAt, + staleSinceMs: this.lastPollAt ? Date.now() - this.lastPollAt : 0, + consecutiveErrors: this.consecutiveErrors, + pendingCallbacks: this.pendingCallbacks.size, + queuedUsers: this.queueDepths.size, + }; + } + + // ── Redis ────────────────────────────────────────────────────────────────── + + private initRedis(): void { + const redisUrl = this.configService.get('REDIS_URL', 'redis://localhost:6379'); + try { + this.redis = new Redis(redisUrl, { lazyConnect: false, enableOfflineQueue: false }); + this.redis.on('error', (e: Error) => + this.logger.warn(`WeCom Redis error: ${e.message}`), + ); + this.logger.log('WeCom Redis client connected'); + } catch (e: any) { + this.logger.warn(`WeCom Redis init failed (cross-instance recovery disabled): ${e.message}`); + } + } + + private redisPendingKey(msgId: string): string { + return `wecom:pending:${msgId}`; + } + + private async redisSetPending(msgId: string, externalUserId: string): Promise<void> { + if (!this.redis) return; + try { + await this.redis.set(this.redisPendingKey(msgId), externalUserId, 'EX', REDIS_PENDING_TTL_S); + } catch { /* non-fatal */ } + } + + private async redisGetAndDelPending(msgId: string): Promise<string | null> { + if (!this.redis) return null; + try { + const val = await this.redis.get(this.redisPendingKey(msgId)); + if (val) await this.redis.del(this.redisPendingKey(msgId)); + return val; + } catch { return null; } + } + + private redisDel(key: string): void { + if (!this.redis) return; + this.redis.del(key).catch(() => {}); + 
} + // ── DB state table ───────────────────────────────────────────────────────── private async initStateTable(): Promise<void> { @@ -202,11 +271,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } } - // ── Distributed leader lease ─────────────────────────────────────────────── - // - // Uses service_state table for a TTL-based leader election. - // Only the leader polls; others skip until the lease expires (90s). - // Value is JSON: { nodeId, pid } + // ── Distributed leader lease (fail-closed) ──────────────────────────────── private leaseValue(): string { return JSON.stringify({ nodeId: this.nodeId, pid: process.pid }); @@ -214,14 +279,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } /** * Try to claim or renew the leader lease. - * Returns true if this instance is now the leader. + * Returns false on DB error (fail-closed) — prevents multi-master on DB outage. */ private async tryClaimLeaderLease(): Promise<boolean> { try { - // Upsert: claim the lease if: - // (a) no lease exists yet (INSERT path), or - // (b) we already hold it (nodeId matches), or - // (c) existing lease has expired (updated_at older than TTL) const result = await this.dataSource.query( `INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW()) ON CONFLICT (key) DO UPDATE @@ -232,21 +293,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { RETURNING key`, [LEADER_LEASE_KEY, this.leaseValue(), this.nodeId, LEADER_LEASE_TTL_S], ); - return result.length > 0; + this.isLeader = result.length > 0; + return this.isLeader; } catch (e: any) { - // On DB error, fall through and allow polling (fail open — better than stopping) - this.logger.warn(`WeCom leader lease check failed (continuing): ${e.message}`); - return true; + // Fail-closed: DB error → do NOT poll (avoid multi-master) + this.logger.warn(`WeCom leader lease DB error (skipping poll): ${e.message}`); + this.isLeader = false; + return false; 
} } private async releaseLeaderLease(): Promise<void> { try { await this.dataSource.query( - `DELETE FROM public.service_state - WHERE key = $1 AND (value)::json->>'nodeId' = $2`, + `DELETE FROM public.service_state WHERE key = $1 AND (value)::json->>'nodeId' = $2`, [LEADER_LEASE_KEY, this.nodeId], ); + this.isLeader = false; } catch { /* ignore on shutdown */ } } @@ -270,23 +333,45 @@ } } - // ── Async bridge callback ────────────────────────────────────────────────── + // ── Async bridge callback (with cross-instance recovery) ────────────────── - resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void { + async resolveCallbackReply( + msgId: string, ok: boolean, content: string, isTimeout?: boolean, + ): Promise<void> { + // Fast path: we hold the Promise locally const cb = this.pendingCallbacks.get(msgId); - if (!cb) { - this.logger.warn(`WeCom: callback for unknown msgId=${msgId} (already resolved or timed out)`); + if (cb) { + this.pendingCallbacks.delete(msgId); + clearTimeout(cb.timer); + this.redisDel(this.redisPendingKey(msgId)); + if (ok) { + cb.resolve(content); + } else { + const err: Error & { isTimeout?: boolean } = new Error(content); + err.isTimeout = isTimeout ?? false; + cb.reject(err); + } return; } - this.pendingCallbacks.delete(msgId); - clearTimeout(cb.timer); - if (ok) { - cb.resolve(content); - } else { - const err: Error & { isTimeout?: boolean } = new Error(content); - err.isTimeout = isTimeout ?? false; - cb.reject(err); + + // Recovery path: another instance started this task; send reply directly via WeChat API + const externalUserId = await this.redisGetAndDelPending(msgId); + if (externalUserId) { + this.logger.warn( + `WeCom cross-instance recovery: msgId=${msgId} externalUserId=${externalUserId}`, + ); + const reply = ok + ? content + : this.buildErrorReply(content, '小龙虾', isTimeout ?? 
false); + await this.sendMessage(externalUserId, reply).catch((e: Error) => + this.logger.error(`WeCom recovery sendMessage failed: ${e.message}`), + ); + return; } + + this.logger.warn( + `WeCom: callback for unknown msgId=${msgId} (already resolved, timed out, or cross-instance without Redis)`, + ); } // ── Binding code API ───────────────────────────────────────────────────── @@ -309,18 +394,16 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } private computeBackoff(): number { - if (this.consecutiveErrors === 0) return 0; // caller will pick active/idle - const backoff = Math.min( + if (this.consecutiveErrors === 0) return 0; + return Math.min( BACKOFF_BASE_MS * Math.pow(BACKOFF_MULTIPLIER, this.consecutiveErrors - 1), BACKOFF_MAX_MS, ); - return backoff; } private async poll(): Promise<void> { if (this.stopping) return; - // Leader election: skip if another instance holds the lease const isLeader = await this.tryClaimLeaderLease(); if (!isLeader) { this.schedulePoll(POLL_INTERVAL_IDLE_MS); @@ -330,7 +413,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } this.lastPollAt = Date.now(); try { const hasMore = await this.syncMessages(); - this.consecutiveErrors = 0; // reset backoff on success + this.consecutiveErrors = 0; const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS; const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS); this.schedulePoll(nextDelay); @@ -361,7 +444,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { open_kfid: string; external_userid: string; send_time: number; - origin: number; // 0=system/event, 3=user, 4=bot, 5=human agent + origin: number; msgtype: string; text?: { content: string }; event?: { event_type: string }; @@ -379,16 +462,14 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const msgList = res.msg_list ?? []; - // System events (enter_session etc.) 
for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) { if (ev.event?.event_type === 'enter_session') { - this.handleEnterSession(ev.external_userid).catch((e: Error) => { - this.logger.error('WeCom handleEnterSession error:', e.message); - }); + this.handleEnterSession(ev.external_userid).catch((e: Error) => + this.logger.error('WeCom handleEnterSession error:', e.message), + ); } } - // User messages (origin=3) for (const msg of msgList.filter(m => m.origin === 3)) { this.lastMsgTime = Date.now(); this.handleMessage( @@ -441,7 +522,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const text = msg.text.trim(); if (!text) return; - // Binding code check (6-char hex) const upperText = text.toUpperCase(); const bindEntry = this.bindingCodes.get(upperText); if (bindEntry) { @@ -540,6 +620,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { const callbackPromise = new Promise((resolve, reject) => { const timer = setTimeout(() => { this.pendingCallbacks.delete(msg.msgId); + this.redisDel(this.redisPendingKey(msg.msgId)); const err: Error & { isTimeout?: boolean } = new Error( `Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`, ); @@ -549,6 +630,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { this.pendingCallbacks.set(msg.msgId, { resolve, reject, timer }); }); + // Store in Redis for cross-instance recovery in case this instance crashes + await this.redisSetPending(msg.msgId, externalUserId); + const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>( asyncBridgeUrl, { @@ -561,6 +645,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { if (!ack.ok) { this.pendingCallbacks.delete(msg.msgId); + this.redisDel(this.redisPendingKey(msg.msgId)); const bridgeError = ack.error ?? ''; reply = bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected') ? 
`🔄 小虾米正在重启,请等待约30秒后重试。` @@ -572,6 +657,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { } } catch (e: any) { this.pendingCallbacks.delete(msg.msgId); + this.redisDel(this.redisPendingKey(msg.msgId)); this.logger.error(`WeCom async bridge failed for instance ${instance.id}:`, e.message); reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout); } finally { @@ -615,7 +701,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { ); if (res.errcode === 0) return; this.logger.warn( - `WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`, + `WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} for ${externalUserId}`, ); } catch (e: any) { this.logger.warn(`WeCom sendChunk attempt ${attempt + 1} threw: ${e.message}`); @@ -631,7 +717,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) { return this.tokenCache; } - // Mutex: reuse in-flight refresh if one is already running if (this.tokenRefreshPromise) return this.tokenRefreshPromise; this.tokenRefreshPromise = this.refreshToken().finally(() => { this.tokenRefreshPromise = undefined; diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts index 7335e6f..588fb75 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts @@ -258,8 +258,12 @@ export class AgentChannelController { * WeCom bridge async-task callback — called by OpenClaw bridge. * Unauthenticated; internal service only. 
*/ + /** + * resolveCallbackReply is async — on cross-instance recovery it sends + * the reply directly to the WeChat user via WeChat API. + */ @Post('wecom/bridge-callback') - handleWecomBridgeCallback( + async handleWecomBridgeCallback( @Body() body: { ok: boolean; result?: string; @@ -274,12 +278,18 @@ export class AgentChannelController { `WeCom bridge callback: ok=${ok} msgId=${msgId} externalUserId=${externalUserId} ` + `${ok ? `replyLen=${result?.length ?? 0}` : `error=${error} isTimeout=${isTimeout}`}`, ); - this.wecomRouter.resolveCallbackReply( + await this.wecomRouter.resolveCallbackReply( msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。'), isTimeout, ); return { received: true }; } + /** WeCom poll-loop health — isLeader, lastPollAt, pendingCallbacks, consecutiveErrors */ + @Get('wecom/health') + getWecomHealth() { + return this.wecomRouter.getStatus(); + } + private htmlPage(title: string, message: string, success: boolean): string { const color = success ? '#22C55E' : '#EF4444'; const icon = success ? '✅' : '❌';