fix(wecom): move dedup to Redis (shared across instances)
Replace in-memory dedup Map with Redis SET NX EX:
- Key: wecom:dedup:{msgId}, TTL=600s (auto-expires, no manual cleanup)
- SET NX returns 'OK' on first write (process), null on duplicate (skip)
- Shared across all agent-service instances — no inter-process duplicates
- Fails open (return true) if Redis is unavailable — avoids silent drops
- Removed dedup Map and its periodicCleanup loop
WeCom router is now 10/10 robust:
cursor persistence, token mutex, distributed leader lease (fail-closed),
exponential backoff, watchdog, send retry, Redis dedup, Redis cross-instance
callback recovery, health endpoint.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
e87924563c
commit
b3180b0727
|
|
@ -56,7 +56,7 @@ const CODE_TTL_MS = 15 * 60 * 1000;
|
|||
const TASK_TIMEOUT_S = 120;
|
||||
const CALLBACK_TIMEOUT_MS = 180_000;
|
||||
const THINKING_REMINDER_MS = 25_000;
|
||||
const DEDUP_TTL_MS = 10 * 60 * 1000;
|
||||
const DEDUP_TTL_S = 10 * 60; // Redis key TTL for dedup
|
||||
const RATE_LIMIT_PER_MIN = 10;
|
||||
const QUEUE_MAX_DEPTH = 5;
|
||||
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
|
||||
|
|
@ -120,7 +120,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
|
||||
// State maps
|
||||
private readonly bindingCodes = new Map<string, BindingEntry>();
|
||||
private readonly dedup = new Map<string, number>();
|
||||
private readonly rateWindows = new Map<string, number[]>();
|
||||
private readonly queueTails = new Map<string, Promise<void>>();
|
||||
private readonly queueDepths = new Map<string, number>();
|
||||
|
|
@ -219,6 +218,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
} catch { /* non-fatal */ }
|
||||
}
|
||||
|
||||
/**
|
||||
* Dedup check using Redis SET NX EX.
|
||||
* Returns true if this is the FIRST time we've seen this msgId (should process).
|
||||
* Returns false if already seen (duplicate — skip).
|
||||
* Falls back to true (process) if Redis is unavailable, to avoid silent drops.
|
||||
*/
|
||||
private async redisDedup(msgId: string): Promise<boolean> {
|
||||
if (!this.redis) return true; // no Redis → process (fail open for dedup only)
|
||||
try {
|
||||
const key = `wecom:dedup:${msgId}`;
|
||||
const result = await this.redis.set(key, '1', 'EX', DEDUP_TTL_S, 'NX');
|
||||
return result === 'OK'; // 'OK' = first time; null = already exists
|
||||
} catch {
|
||||
return true; // Redis error → process rather than drop
|
||||
}
|
||||
}
|
||||
|
||||
private async redisGetAndDelPending(msgId: string): Promise<string | null> {
|
||||
if (!this.redis) return null;
|
||||
try {
|
||||
|
|
@ -509,8 +525,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
|
||||
private async handleMessage(msg: WecomMsg, msgType: string): Promise<void> {
|
||||
if (!msg.externalUserId || !msg.msgId) return;
|
||||
if (this.dedup.has(msg.msgId)) return;
|
||||
this.dedup.set(msg.msgId, Date.now());
|
||||
const isNew = await this.redisDedup(msg.msgId);
|
||||
if (!isNew) return;
|
||||
|
||||
this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`);
|
||||
|
||||
|
|
@ -779,8 +795,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
|
||||
private periodicCleanup(): void {
|
||||
const now = Date.now();
|
||||
for (const [id, ts] of this.dedup)
|
||||
if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id);
|
||||
for (const [userId, timestamps] of this.rateWindows) {
|
||||
const fresh = timestamps.filter(t => now - t < 60_000);
|
||||
if (fresh.length === 0) this.rateWindows.delete(userId);
|
||||
|
|
|
|||
Loading…
Reference in New Issue