fix(wecom): health endpoint, fail-closed lease, Redis cross-instance recovery

Fix 1 — Observability (health endpoint):
  WecomRouterService.getStatus() returns { enabled, isLeader, lastPollAt,
  staleSinceMs, consecutiveErrors, pendingCallbacks, queuedUsers }.
  GET /api/v1/agent/channels/wecom/health exposes it.

Fix 2 — Leader lease fail-closed:
  tryClaimLeaderLease() catch now returns false instead of true.
  DB failure → skip poll, preventing multi-master on DB outage.
  isLeader flag tracked for health status.

Fix 3 — Cross-instance callback recovery via Redis:
  routeToAgent() stores wecom:pending:{msgId} → externalUserId in Redis
  with 200s TTL before waiting for the bridge callback.
  resolveCallbackReply() is now async:
    Fast path  — local pendingCallbacks (same instance, 99% case)
    Recovery   — Redis GET → send reply directly to WeChat user
  onModuleDestroy() cleans up Redis keys on graceful shutdown.
  wecom/bridge-callback handler updated to await resolveCallbackReply.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-10 06:07:58 -07:00
parent 0d5441f720
commit e87924563c
2 changed files with 169 additions and 74 deletions

View File

@ -3,25 +3,17 @@
*
* IT0 unified WeCom Customer Service bot — one central polling loop for all agent instances.
*
* Responsibilities:
* 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages.
* 2. Handle binding codes: user sends a code → maps their external_userid to an instance.
* 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies.
* 4. Handle enter_session events: send welcome/onboarding message on first contact.
*
* Required env vars:
* IT0_WECOM_CORP_ID      Enterprise ID (corp_id)
* IT0_WECOM_KF_SECRET    Customer service app secret
* IT0_WECOM_KF_OPEN_KFID Customer service account ID (open_kfid)
*
* Robustness guarantees:
* - sync_msg cursor persisted to DB (survives restart)
* - Token refresh mutex — concurrent callers share one in-flight refresh
* - Distributed leader lease only one agent-service instance polls at a time
* - Distributed leader lease (fail-closed) — only one instance polls at a time;
*   DB failure → skip poll rather than risk multi-master
* - Exponential backoff on consecutive poll errors (up to 5 min)
* - Watchdog timer restarts the poll loop if it silently dies
* - Per-user serial queue, dedup, rate limit, send retry
* - Periodic cleanup of in-memory maps (5-min interval)
* - Cross-instance callback recovery via Redis — if instance A crashes mid-task,
*   instance B can recover the reply when the bridge-callback arrives
* - Per-user serial queue, dedup, rate limit, send retry (1 retry)
* - Health status endpoint for observability
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
@ -30,6 +22,7 @@ import * as https from 'https';
import * as http from 'http';
import * as crypto from 'crypto';
import { DataSource } from 'typeorm';
import Redis from 'ioredis';
import { AgentInstanceRepository } from '../repositories/agent-instance.repository';
// ── Types ─────────────────────────────────────────────────────────────────────
@ -46,6 +39,16 @@ interface BindingEntry {
expiresAt: number;
}
/** Point-in-time health snapshot returned by getStatus() / GET wecom/health. */
export interface WecomHealthStatus {
enabled: boolean; // false when required IT0_WECOM_* env vars are missing
isLeader: boolean; // true if this instance currently holds the poll-leader lease
lastPollAt: number; // unix ms, 0 = never
staleSinceMs: number; // ms since last successful poll
consecutiveErrors: number; // consecutive poll failures (drives exponential backoff)
pendingCallbacks: number; // in-flight async bridge tasks awaiting their callback
queuedUsers: number; // entries in the per-user queue-depth map
// ── Constants ─────────────────────────────────────────────────────────────────
const WECOM_MAX_CHARS = 2048;
@ -65,17 +68,20 @@ const SEND_RETRY_DELAY_MS = 2_000;
// Circuit breaker
const BACKOFF_BASE_MS = POLL_INTERVAL_IDLE_MS;
const BACKOFF_MAX_MS = 5 * 60 * 1000; // 5 min cap
const BACKOFF_MAX_MS = 5 * 60 * 1000;
const BACKOFF_MULTIPLIER = 2;
// Distributed leader lease
const LEADER_LEASE_KEY = 'wecom:poll-leader';
const LEADER_LEASE_TTL_S = 90; // another instance takes over after 90s
const LEADER_LEASE_TTL_S = 90;
// Watchdog: restart poll loop if last success > threshold
// Watchdog
const WATCHDOG_INTERVAL_MS = 2 * 60 * 1000;
const WATCHDOG_THRESHOLD_MS = 5 * 60 * 1000;
// Redis pending task keys (cross-instance callback recovery)
const REDIS_PENDING_TTL_S = 200; // slightly longer than CALLBACK_TIMEOUT_MS/1000
// DB cursor key
const STATE_KEY_CURSOR = 'wecom:sync_cursor';
@ -92,22 +98,26 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
private readonly kfUrl: string;
private readonly enabled: boolean;
private readonly agentCallbackBaseUrl: string;
private readonly nodeId = crypto.randomUUID(); // unique per process instance
private readonly nodeId = crypto.randomUUID();
private stopping = false;
private pollTimer?: NodeJS.Timeout;
private stopping = false;
private isLeader = false;
private pollTimer?: NodeJS.Timeout;
private watchdogTimer?: NodeJS.Timeout;
private cleanupTimer?: NodeJS.Timeout;
private syncCursor = '';
private lastMsgTime = 0;
private lastPollAt = 0; // for watchdog
private consecutiveErrors = 0; // for exponential backoff
private cleanupTimer?: NodeJS.Timeout;
private syncCursor = '';
private lastMsgTime = 0;
private lastPollAt = 0;
private consecutiveErrors = 0;
// Token cache + mutex
private tokenCache = '';
private tokenExpiresAt = 0;
private tokenRefreshPromise?: Promise<string>;
// Redis client for cross-instance callback recovery
private redis?: Redis;
// State maps
private readonly bindingCodes = new Map<string, BindingEntry>();
private readonly dedup = new Map<string, number>();
@ -138,8 +148,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled');
return;
}
this.initRedis();
await this.initStateTable();
this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`)
this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`);
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
this.scheduleWatchdog();
this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
@ -147,23 +158,81 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
this.logger.log('WeCom router started');
}
onModuleDestroy(): void {
async onModuleDestroy(): Promise<void> {
this.stopping = true;
clearTimeout(this.pollTimer);
clearTimeout(this.watchdogTimer);
clearInterval(this.cleanupTimer);
// Release leader lease asynchronously (best effort)
this.releaseLeaderLease().catch(() => {});
for (const [, cb] of this.pendingCallbacks) {
await this.releaseLeaderLease();
// Gracefully reject pending callbacks and clean up Redis keys
for (const [msgId, cb] of this.pendingCallbacks) {
clearTimeout(cb.timer);
cb.reject(new Error('Service shutting down'));
this.redisDel(this.redisPendingKey(msgId));
}
this.pendingCallbacks.clear();
if (this.redis) {
await this.redis.quit().catch(() => {});
}
}
/** True when all required WeCom env vars were present at startup. */
isEnabled(): boolean { return this.enabled; }
/** The configured WeCom KF URL, as read from config at construction. */
getKfUrl(): string { return this.kfUrl; }
/**
 * Build a point-in-time health snapshot for the /wecom/health endpoint.
 * Pure read of instance state — no side effects.
 */
getStatus(): WecomHealthStatus {
  const now = Date.now();
  const snapshot: WecomHealthStatus = {
    enabled: this.enabled,
    isLeader: this.isLeader,
    lastPollAt: this.lastPollAt,
    // staleSinceMs is 0 until the first successful poll (lastPollAt === 0)
    staleSinceMs: this.lastPollAt === 0 ? 0 : now - this.lastPollAt,
    consecutiveErrors: this.consecutiveErrors,
    pendingCallbacks: this.pendingCallbacks.size,
    queuedUsers: this.queueDepths.size,
  };
  return snapshot;
}
// ── Redis ──────────────────────────────────────────────────────────────────

/**
 * Create the Redis client used for cross-instance callback recovery.
 * Best-effort: on failure the service still runs — only recovery is disabled.
 */
private initRedis(): void {
  const redisUrl = this.configService.get<string>('REDIS_URL', 'redis://localhost:6379');
  try {
    // enableOfflineQueue:false → commands fail fast instead of queueing
    // while the connection is down.
    this.redis = new Redis(redisUrl, { lazyConnect: false, enableOfflineQueue: false });
    this.redis.on('error', (e: Error) =>
      this.logger.warn(`WeCom Redis error: ${e.message}`),
    );
    // FIX: ioredis connects asynchronously — the constructor returning does NOT
    // mean the connection is up. Log "connected" on the actual 'ready' event
    // instead of unconditionally (the old log fired even when Redis was down).
    this.redis.on('ready', () => this.logger.log('WeCom Redis client connected'));
  } catch (e: any) {
    // e.g. a malformed REDIS_URL throws synchronously
    this.logger.warn(`WeCom Redis init failed (cross-instance recovery disabled): ${e.message}`);
  }
}
/** Redis key under which a pending task's external user id is stored. */
private redisPendingKey(msgId: string): string {
  return 'wecom:pending:' + msgId;
}
/**
 * Record msgId → externalUserId in Redis (TTL = REDIS_PENDING_TTL_S) so a
 * surviving instance can deliver the reply if this one dies mid-task.
 * Errors are swallowed — recovery is strictly best-effort.
 */
private async redisSetPending(msgId: string, externalUserId: string): Promise<void> {
  const client = this.redis;
  if (client === undefined) return;
  try {
    await client.set(this.redisPendingKey(msgId), externalUserId, 'EX', REDIS_PENDING_TTL_S);
  } catch {
    /* non-fatal */
  }
}
/**
 * Atomically fetch-and-delete the pending externalUserId for msgId.
 *
 * FIX: the previous GET-then-DEL was two separate round trips — two instances
 * racing on the same bridge callback could both read the value and each send a
 * recovery reply to the user. A MULTI/EXEC transaction executes GET+DEL
 * atomically on the server, so exactly one caller observes the value.
 *
 * @returns the stored externalUserId, or null when missing, on Redis error,
 *          or when Redis is disabled.
 */
private async redisGetAndDelPending(msgId: string): Promise<string | null> {
  if (!this.redis) return null;
  try {
    const key = this.redisPendingKey(msgId);
    // exec() → [[err, getReply], [err, delReply]], or null if the MULTI aborted
    const results = await this.redis.multi().get(key).del(key).exec();
    const val = results?.[0]?.[1];
    return typeof val === 'string' ? val : null;
  } catch { return null; }
}
/** Fire-and-forget delete of a Redis key; failures are ignored. */
private redisDel(key: string): void {
  if (this.redis === undefined) return;
  void this.redis.del(key).catch(() => {});
}
// ── DB state table ─────────────────────────────────────────────────────────
private async initStateTable(): Promise<void> {
@ -202,11 +271,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
}
}
// ── Distributed leader lease ───────────────────────────────────────────────
//
// Uses service_state table for a TTL-based leader election.
// Only the leader polls; others skip until the lease expires (90s).
// Value is JSON: { nodeId, pid }
// ── Distributed leader lease (fail-closed) ────────────────────────────────
/** JSON payload stored in the lease row — identifies the holder by nodeId + pid. */
private leaseValue(): string {
  const holder = { nodeId: this.nodeId, pid: process.pid };
  return JSON.stringify(holder);
}
/**
* Try to claim or renew the leader lease.
* Returns true if this instance is now the leader.
* Returns false on DB error (fail-closed) prevents multi-master on DB outage.
*/
private async tryClaimLeaderLease(): Promise<boolean> {
try {
// Upsert: claim the lease if:
// (a) no lease exists yet (INSERT path), or
// (b) we already hold it (nodeId matches), or
// (c) existing lease has expired (updated_at older than TTL)
const result = await this.dataSource.query(
`INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW())
ON CONFLICT (key) DO UPDATE
@ -232,21 +293,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
RETURNING key`,
[LEADER_LEASE_KEY, this.leaseValue(), this.nodeId, LEADER_LEASE_TTL_S],
);
return result.length > 0;
this.isLeader = result.length > 0;
return this.isLeader;
} catch (e: any) {
// On DB error, fall through and allow polling (fail open — better than stopping)
this.logger.warn(`WeCom leader lease check failed (continuing): ${e.message}`);
return true;
// Fail-closed: DB error → do NOT poll (avoid multi-master)
this.logger.warn(`WeCom leader lease DB error (skipping poll): ${e.message}`);
this.isLeader = false;
return false;
}
}
private async releaseLeaderLease(): Promise<void> {
try {
await this.dataSource.query(
`DELETE FROM public.service_state
WHERE key = $1 AND (value)::json->>'nodeId' = $2`,
`DELETE FROM public.service_state WHERE key = $1 AND (value)::json->>'nodeId' = $2`,
[LEADER_LEASE_KEY, this.nodeId],
);
this.isLeader = false;
} catch { /* ignore on shutdown */ }
}
@ -270,23 +333,45 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
}
}
// ── Async bridge callback ──────────────────────────────────────────────────
// ── Async bridge callback (with cross-instance recovery) ──────────────────
resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void {
async resolveCallbackReply(
msgId: string, ok: boolean, content: string, isTimeout?: boolean,
): Promise<void> {
// Fast path: we hold the Promise locally
const cb = this.pendingCallbacks.get(msgId);
if (!cb) {
this.logger.warn(`WeCom: callback for unknown msgId=${msgId} (already resolved or timed out)`);
if (cb) {
this.pendingCallbacks.delete(msgId);
clearTimeout(cb.timer);
this.redisDel(this.redisPendingKey(msgId));
if (ok) {
cb.resolve(content);
} else {
const err: Error & { isTimeout?: boolean } = new Error(content);
err.isTimeout = isTimeout ?? false;
cb.reject(err);
}
return;
}
this.pendingCallbacks.delete(msgId);
clearTimeout(cb.timer);
if (ok) {
cb.resolve(content);
} else {
const err: Error & { isTimeout?: boolean } = new Error(content);
err.isTimeout = isTimeout ?? false;
cb.reject(err);
// Recovery path: another instance started this task; send reply directly via WeChat API
const externalUserId = await this.redisGetAndDelPending(msgId);
if (externalUserId) {
this.logger.warn(
`WeCom cross-instance recovery: msgId=${msgId} externalUserId=${externalUserId}`,
);
const reply = ok
? content
: this.buildErrorReply(content, '小龙虾', isTimeout ?? false);
await this.sendMessage(externalUserId, reply).catch((e: Error) =>
this.logger.error(`WeCom recovery sendMessage failed: ${e.message}`),
);
return;
}
this.logger.warn(
`WeCom: callback for unknown msgId=${msgId} (already resolved, timed out, or cross-instance without Redis)`,
);
}
// ── Binding code API ───────────────────────────────────────────────────────
@ -309,18 +394,16 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
}
private computeBackoff(): number {
if (this.consecutiveErrors === 0) return 0; // caller will pick active/idle
const backoff = Math.min(
if (this.consecutiveErrors === 0) return 0;
return Math.min(
BACKOFF_BASE_MS * Math.pow(BACKOFF_MULTIPLIER, this.consecutiveErrors - 1),
BACKOFF_MAX_MS,
);
return backoff;
}
private async poll(): Promise<void> {
if (this.stopping) return;
// Leader election: skip if another instance holds the lease
const isLeader = await this.tryClaimLeaderLease();
if (!isLeader) {
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
@ -330,7 +413,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
this.lastPollAt = Date.now();
try {
const hasMore = await this.syncMessages();
this.consecutiveErrors = 0; // reset backoff on success
this.consecutiveErrors = 0;
const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS;
const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS);
this.schedulePoll(nextDelay);
@ -361,7 +444,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
open_kfid: string;
external_userid: string;
send_time: number;
origin: number; // 0=system/event, 3=user, 4=bot, 5=human agent
origin: number;
msgtype: string;
text?: { content: string };
event?: { event_type: string };
@ -379,16 +462,14 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
const msgList = res.msg_list ?? [];
// System events (enter_session etc.)
for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) {
if (ev.event?.event_type === 'enter_session') {
this.handleEnterSession(ev.external_userid).catch((e: Error) => {
this.logger.error('WeCom handleEnterSession error:', e.message);
});
this.handleEnterSession(ev.external_userid).catch((e: Error) =>
this.logger.error('WeCom handleEnterSession error:', e.message),
);
}
}
// User messages (origin=3)
for (const msg of msgList.filter(m => m.origin === 3)) {
this.lastMsgTime = Date.now();
this.handleMessage(
@ -441,7 +522,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
const text = msg.text.trim();
if (!text) return;
// Binding code check (6-char hex)
const upperText = text.toUpperCase();
const bindEntry = this.bindingCodes.get(upperText);
if (bindEntry) {
@ -540,6 +620,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
const callbackPromise = new Promise<string>((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingCallbacks.delete(msg.msgId);
this.redisDel(this.redisPendingKey(msg.msgId));
const err: Error & { isTimeout?: boolean } = new Error(
`Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`,
);
@ -549,6 +630,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
this.pendingCallbacks.set(msg.msgId, { resolve, reject, timer });
});
// Store in Redis for cross-instance recovery in case this instance crashes
await this.redisSetPending(msg.msgId, externalUserId);
const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>(
asyncBridgeUrl,
{
@ -561,6 +645,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
if (!ack.ok) {
this.pendingCallbacks.delete(msg.msgId);
this.redisDel(this.redisPendingKey(msg.msgId));
const bridgeError = ack.error ?? '';
reply = bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')
? `🔄 小虾米正在重启请等待约30秒后重试。`
@ -572,6 +657,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
}
} catch (e: any) {
this.pendingCallbacks.delete(msg.msgId);
this.redisDel(this.redisPendingKey(msg.msgId));
this.logger.error(`WeCom async bridge failed for instance ${instance.id}:`, e.message);
reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout);
} finally {
@ -615,7 +701,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
);
if (res.errcode === 0) return;
this.logger.warn(
`WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`,
`WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} for ${externalUserId}`,
);
} catch (e: any) {
this.logger.warn(`WeCom sendChunk attempt ${attempt + 1} threw: ${e.message}`);
@ -631,7 +717,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) {
return this.tokenCache;
}
// Mutex: reuse in-flight refresh if one is already running
if (this.tokenRefreshPromise) return this.tokenRefreshPromise;
this.tokenRefreshPromise = this.refreshToken().finally(() => {
this.tokenRefreshPromise = undefined;

View File

@ -258,8 +258,12 @@ export class AgentChannelController {
* WeCom bridge async-task callback called by OpenClaw bridge.
* Unauthenticated; internal service only.
*/
/**
* resolveCallbackReply is async on cross-instance recovery it sends
* the reply directly to the WeChat user via WeChat API.
*/
@Post('wecom/bridge-callback')
handleWecomBridgeCallback(
async handleWecomBridgeCallback(
@Body() body: {
ok: boolean;
result?: string;
@ -274,12 +278,18 @@ export class AgentChannelController {
`WeCom bridge callback: ok=${ok} msgId=${msgId} externalUserId=${externalUserId} ` +
`${ok ? `replyLen=${result?.length ?? 0}` : `error=${error} isTimeout=${isTimeout}`}`,
);
this.wecomRouter.resolveCallbackReply(
await this.wecomRouter.resolveCallbackReply(
msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。'), isTimeout,
);
return { received: true };
}
/**
 * WeCom poll-loop health for observability dashboards and probes.
 * Delegates to WecomRouterService.getStatus(): enabled, isLeader, lastPollAt,
 * staleSinceMs, consecutiveErrors, pendingCallbacks, queuedUsers.
 * Exposes only counters/timestamps, no user data.
 * NOTE(review): no route-level guard visible here — confirm whether a
 * controller-level guard applies, or whether this is intentionally open.
 */
@Get('wecom/health')
getWecomHealth() {
return this.wecomRouter.getStatus();
}
private htmlPage(title: string, message: string, success: boolean): string {
const color = success ? '#22C55E' : '#EF4444';
const icon = success ? '✅' : '❌';