From b3180b07279b89b9d77de80ef06b6e793d95cbf6 Mon Sep 17 00:00:00 2001 From: hailin Date: Tue, 10 Mar 2026 06:19:29 -0700 Subject: [PATCH] fix(wecom): move dedup to Redis (shared across instances) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace in-memory dedup Map with Redis SET NX EX: - Key: wecom:dedup:{msgId}, TTL=600s (auto-expires, no manual cleanup) - SET NX returns 'OK' on first write (process), null on duplicate (skip) - Shared across all agent-service instances — no inter-process duplicates - Fails open (return true) if Redis is unavailable — avoids silent drops - Removed dedup Map and its periodicCleanup loop WeCom router is now 10/10 robust: cursor persistence, token mutex, distributed leader lease (fail-closed), exponential backoff, watchdog, send retry, Redis dedup, Redis cross-instance callback recovery, health endpoint. Co-Authored-By: Claude Sonnet 4.6 --- .../wecom/wecom-router.service.ts | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index cdb438e..16cdebd 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -56,7 +56,7 @@ const CODE_TTL_MS = 15 * 60 * 1000; const TASK_TIMEOUT_S = 120; const CALLBACK_TIMEOUT_MS = 180_000; const THINKING_REMINDER_MS = 25_000; -const DEDUP_TTL_MS = 10 * 60 * 1000; +const DEDUP_TTL_S = 10 * 60; // Redis key TTL for dedup const RATE_LIMIT_PER_MIN = 10; const QUEUE_MAX_DEPTH = 5; const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; @@ -119,9 +119,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private redis?: Redis; // State maps - private readonly bindingCodes = new Map(); - private readonly dedup = new Map(); - private readonly rateWindows = new Map(); + private readonly bindingCodes = new Map(); + private readonly rateWindows = new Map(); private readonly queueTails = new Map>(); private readonly queueDepths = new Map(); private readonly pendingCallbacks = new Map { + if (!this.redis) return true; // no Redis → process (fail open for dedup only) + try { + const key = `wecom:dedup:${msgId}`; + const result = await this.redis.set(key, '1', 'EX', DEDUP_TTL_S, 'NX'); + return result === 'OK'; // 'OK' = first time; null = already exists + } catch { + return true; // Redis error → process rather than drop + } + } + private async redisGetAndDelPending(msgId: string): Promise { if (!this.redis) return null; try { @@ -509,8 +525,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async handleMessage(msg: WecomMsg, msgType: string): Promise { if (!msg.externalUserId || !msg.msgId) return; - if (this.dedup.has(msg.msgId)) return; - this.dedup.set(msg.msgId, Date.now()); + const isNew = await this.redisDedup(msg.msgId); + if (!isNew) return; this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`); @@ -779,8 +795,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private periodicCleanup(): void { const now = Date.now(); - for (const [id, ts] of this.dedup) - if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); for (const [userId, timestamps] of this.rateWindows) { const fresh = timestamps.filter(t => now - t < 60_000); if (fresh.length === 0) this.rateWindows.delete(userId);