fix(wecom): 4 bugs — watchdog follower, atomic getdel, Redis bindcodes, enter_session dedup

Bug 1 — Watchdog doesn't track followers:
  lastPollAt = Date.now() moved before leader check. All poll()
  invocations update the timestamp, so if a follower's loop dies
  the watchdog fires after WATCHDOG_THRESHOLD_MS and restarts it.

Bug 2 — Non-atomic GetDel for cross-instance recovery:
  Replaced GET + DEL with atomic GETDEL (Redis 6.2+, ioredis v5).
  Two instances can no longer both recover the same callback reply.

Bug 3 — Binding codes stored in per-process memory:
  generateBindingCode() now async; stores in Redis:
    wecom:bindcode:{CODE}        → instanceId  (TTL 15min)
    wecom:bindcode:inst:{instId} → CODE        (reverse lookup)
  resolveBindCode() uses GETDEL atomically, then deletes reverse key.
  Falls back to in-memory Map when Redis is unavailable.
  Old code for same instance is revoked on regenerate.
  handleMessage updated: resolveBindCode() replaces Map.get();
  6-char hex pattern with no match now returns expired-code hint.
  Controller wecomGenerateBindCode now awaits generateBindingCode().

Bug 4 — enter_session events not deduplicated:
  handleEnterSession now receives msgId from the event.
  redisDedup(msgId) called before sending welcome message — prevents
  duplicate welcomes on WeCom retransmission or cursor reset.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-10 06:46:17 -07:00
parent b3180b0727
commit 61b2778ff0
2 changed files with 82 additions and 22 deletions

View File

@ -57,6 +57,7 @@ const TASK_TIMEOUT_S = 120;
const CALLBACK_TIMEOUT_MS = 180_000;
const THINKING_REMINDER_MS = 25_000;
const DEDUP_TTL_S = 10 * 60; // Redis key TTL for dedup
const CODE_TTL_S = 15 * 60; // Redis key TTL for binding codes (= CODE_TTL_MS/1000)
const RATE_LIMIT_PER_MIN = 10;
const QUEUE_MAX_DEPTH = 5;
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000;
@ -238,9 +239,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
private async redisGetAndDelPending(msgId: string): Promise<string | null> {
if (!this.redis) return null;
try {
const val = await this.redis.get(this.redisPendingKey(msgId));
if (val) await this.redis.del(this.redisPendingKey(msgId));
return val;
// GETDEL is atomic (Redis 6.2+) — prevents two instances both recovering the same callback
return await (this.redis as any).getdel(this.redisPendingKey(msgId));
} catch { return null; }
}
@ -392,16 +392,73 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
// ── Binding code API ───────────────────────────────────────────────────────
generateBindingCode(instanceId: string): { code: string; expiresAt: number; kfUrl: string } {
for (const [code, entry] of this.bindingCodes) {
if (entry.instanceId === instanceId) this.bindingCodes.delete(code);
}
/**
* Generate a 6-char hex binding code.
* Stored in Redis (shared across instances) with in-memory fallback.
* Redis keys:
* wecom:bindcode:{CODE} instanceId (TTL=15min)
* wecom:bindcode:inst:{instId} CODE (reverse lookup to revoke old code)
*/
async generateBindingCode(instanceId: string): Promise<{ code: string; expiresAt: number; kfUrl: string }> {
const code = crypto.randomBytes(3).toString('hex').toUpperCase();
const expiresAt = Date.now() + CODE_TTL_MS;
this.bindingCodes.set(code, { instanceId, expiresAt });
if (this.redis) {
try {
// Revoke any previous code for this instance
const oldCode = await this.redis.get(`wecom:bindcode:inst:${instanceId}`);
if (oldCode) await this.redis.del(`wecom:bindcode:${oldCode}`);
// Store new code
await this.redis.set(`wecom:bindcode:${code}`, instanceId, 'EX', CODE_TTL_S);
await this.redis.set(`wecom:bindcode:inst:${instanceId}`, code, 'EX', CODE_TTL_S);
this.logger.log(`WeCom bindcode ${code} → instance ${instanceId} (Redis)`);
} catch (e: any) {
this.logger.warn(`WeCom Redis bindcode store failed, using in-memory: ${e.message}`);
this.storeBindCodeMemory(instanceId, code, expiresAt);
}
} else {
this.storeBindCodeMemory(instanceId, code, expiresAt);
}
return { code, expiresAt, kfUrl: this.kfUrl };
}
private storeBindCodeMemory(instanceId: string, code: string, expiresAt: number): void {
for (const [c, entry] of this.bindingCodes) {
if (entry.instanceId === instanceId) this.bindingCodes.delete(c);
}
this.bindingCodes.set(code, { instanceId, expiresAt });
}
/**
* Look up a binding code. Checks Redis first, falls back to in-memory Map.
* Returns the instanceId if code is valid and not expired, null otherwise.
*/
private async resolveBindCode(code: string): Promise<string | null> {
if (this.redis) {
try {
const instanceId = await (this.redis as any).getdel(`wecom:bindcode:${code}`);
if (instanceId) {
// Clean up reverse-lookup key too (best effort)
this.redis.del(`wecom:bindcode:inst:${instanceId}`).catch(() => {});
return instanceId as string;
}
return null; // not found in Redis — don't fall through to memory (avoid stale match)
} catch (e: any) {
this.logger.warn(`WeCom Redis bindcode lookup failed, trying in-memory: ${e.message}`);
}
}
// Fallback: in-memory (single-instance or Redis unavailable)
const entry = this.bindingCodes.get(code);
if (!entry) return null;
if (Date.now() > entry.expiresAt) {
this.bindingCodes.delete(code);
return null;
}
this.bindingCodes.delete(code);
return entry.instanceId;
}
// ── Polling ────────────────────────────────────────────────────────────────
private schedulePoll(delayMs: number): void {
@ -420,13 +477,13 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
private async poll(): Promise<void> {
if (this.stopping) return;
this.lastPollAt = Date.now(); // track all poll attempts so watchdog works for followers too
const isLeader = await this.tryClaimLeaderLease();
if (!isLeader) {
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
return;
}
this.lastPollAt = Date.now();
try {
const hasMore = await this.syncMessages();
this.consecutiveErrors = 0;
@ -480,7 +537,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) {
if (ev.event?.event_type === 'enter_session') {
this.handleEnterSession(ev.external_userid).catch((e: Error) =>
this.handleEnterSession(ev.external_userid, ev.msgid).catch((e: Error) =>
this.logger.error('WeCom handleEnterSession error:', e.message),
);
}
@ -499,8 +556,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
// ── Enter-session welcome ──────────────────────────────────────────────────
private async handleEnterSession(externalUserId: string): Promise<void> {
private async handleEnterSession(externalUserId: string, msgId: string): Promise<void> {
if (!externalUserId) return;
// Dedup: WeCom may retransmit the same enter_session event after cursor reset
const isNew = await this.redisDedup(msgId || `es:${externalUserId}:${Date.now()}`);
if (!isNew) return;
this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`);
const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
if (instance) {
@ -539,19 +599,19 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
if (!text) return;
const upperText = text.toUpperCase();
const bindEntry = this.bindingCodes.get(upperText);
if (bindEntry) {
if (Date.now() > bindEntry.expiresAt) {
this.bindingCodes.delete(upperText);
await this.sendMessage(msg.externalUserId, '验证码已过期,请重新在 IT0 App 中生成新的验证码。');
return;
}
this.bindingCodes.delete(upperText);
this.completeBinding(bindEntry.instanceId, msg.externalUserId).catch((e: Error) =>
// resolveBindCode returns instanceId (and atomically removes the code), or null if not found/expired
const boundInstanceId = await this.resolveBindCode(upperText);
if (boundInstanceId !== null) {
this.completeBinding(boundInstanceId, msg.externalUserId).catch((e: Error) =>
this.logger.error('WeCom completeBinding error:', e.message),
);
return;
}
// 6-char hex pattern but not a valid code → tell user it's expired
if (/^[0-9A-F]{6}$/.test(upperText)) {
await this.sendMessage(msg.externalUserId, '验证码无效或已过期,请重新在 IT0 App 中生成新的验证码。');
return;
}
if (!this.rateAllow(msg.externalUserId)) {
await this.sendMessage(msg.externalUserId, '消息频率过高请稍后再试每分钟最多10条。');

View File

@ -232,7 +232,7 @@ export class AgentChannelController {
}
const inst = await this.instanceRepo.findById(instanceId);
if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`);
const { code, expiresAt, kfUrl } = this.wecomRouter.generateBindingCode(instanceId);
const { code, expiresAt, kfUrl } = await this.wecomRouter.generateBindingCode(instanceId);
return { code, expiresAt, kfUrl };
}