diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index cdb438e..16cdebd 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -56,7 +56,7 @@ const CODE_TTL_MS = 15 * 60 * 1000; const TASK_TIMEOUT_S = 120; const CALLBACK_TIMEOUT_MS = 180_000; const THINKING_REMINDER_MS = 25_000; -const DEDUP_TTL_MS = 10 * 60 * 1000; +const DEDUP_TTL_S = 10 * 60; // Redis key TTL for dedup const RATE_LIMIT_PER_MIN = 10; const QUEUE_MAX_DEPTH = 5; const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; @@ -119,9 +119,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private redis?: Redis; // State maps - private readonly bindingCodes = new Map(); - private readonly dedup = new Map(); - private readonly rateWindows = new Map(); + private readonly bindingCodes = new Map(); + private readonly rateWindows = new Map(); private readonly queueTails = new Map>(); private readonly queueDepths = new Map(); private readonly pendingCallbacks = new Map { + if (!this.redis) return true; // no Redis → process (fail open for dedup only) + try { + const key = `wecom:dedup:${msgId}`; + const result = await this.redis.set(key, '1', 'EX', DEDUP_TTL_S, 'NX'); + return result === 'OK'; // 'OK' = first time; null = already exists + } catch { + return true; // Redis error → process rather than drop + } + } + private async redisGetAndDelPending(msgId: string): Promise { if (!this.redis) return null; try { @@ -509,8 +525,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async handleMessage(msg: WecomMsg, msgType: string): Promise { if (!msg.externalUserId || !msg.msgId) return; - if (this.dedup.has(msg.msgId)) return; - this.dedup.set(msg.msgId, Date.now()); + const isNew = await this.redisDedup(msg.msgId); + if (!isNew) return; this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`); @@ -779,8 +795,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private periodicCleanup(): void { const now = Date.now(); - for (const [id, ts] of this.dedup) - if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); for (const [userId, timestamps] of this.rateWindows) { const fresh = timestamps.filter(t => now - t < 60_000); if (fresh.length === 0) this.rateWindows.delete(userId);