diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index 16cdebd..611f024 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -57,6 +57,7 @@ const TASK_TIMEOUT_S = 120; const CALLBACK_TIMEOUT_MS = 180_000; const THINKING_REMINDER_MS = 25_000; const DEDUP_TTL_S = 10 * 60; // Redis key TTL for dedup +const CODE_TTL_S = 15 * 60; // Redis key TTL for binding codes (= CODE_TTL_MS/1000) const RATE_LIMIT_PER_MIN = 10; const QUEUE_MAX_DEPTH = 5; const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; @@ -238,9 +239,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async redisGetAndDelPending(msgId: string): Promise { if (!this.redis) return null; try { - const val = await this.redis.get(this.redisPendingKey(msgId)); - if (val) await this.redis.del(this.redisPendingKey(msgId)); - return val; + // GETDEL is atomic (Redis 6.2+) — prevents two instances both recovering the same callback + return await (this.redis as any).getdel(this.redisPendingKey(msgId)); } catch { return null; } } @@ -392,16 +392,73 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { // ── Binding code API ─────────────────────────────────────────────────────── - generateBindingCode(instanceId: string): { code: string; expiresAt: number; kfUrl: string } { - for (const [code, entry] of this.bindingCodes) { - if (entry.instanceId === instanceId) this.bindingCodes.delete(code); - } + /** + * Generate a 6-char hex binding code. + * Stored in Redis (shared across instances) with in-memory fallback. + * Redis keys: + * wecom:bindcode:{CODE} → instanceId (TTL=15min) + * wecom:bindcode:inst:{instId} → CODE (reverse lookup to revoke old code) + */ + async generateBindingCode(instanceId: string): Promise<{ code: string; expiresAt: number; kfUrl: string }> { const code = crypto.randomBytes(3).toString('hex').toUpperCase(); const expiresAt = Date.now() + CODE_TTL_MS; - this.bindingCodes.set(code, { instanceId, expiresAt }); + + if (this.redis) { + try { + // Revoke any previous code for this instance + const oldCode = await this.redis.get(`wecom:bindcode:inst:${instanceId}`); + if (oldCode) await this.redis.del(`wecom:bindcode:${oldCode}`); + // Store new code + await this.redis.set(`wecom:bindcode:${code}`, instanceId, 'EX', CODE_TTL_S); + await this.redis.set(`wecom:bindcode:inst:${instanceId}`, code, 'EX', CODE_TTL_S); + this.logger.log(`WeCom bindcode ${code} → instance ${instanceId} (Redis)`); + } catch (e: any) { + this.logger.warn(`WeCom Redis bindcode store failed, using in-memory: ${e.message}`); + this.storeBindCodeMemory(instanceId, code, expiresAt); + } + } else { + this.storeBindCodeMemory(instanceId, code, expiresAt); + } + return { code, expiresAt, kfUrl: this.kfUrl }; } + private storeBindCodeMemory(instanceId: string, code: string, expiresAt: number): void { + for (const [c, entry] of this.bindingCodes) { + if (entry.instanceId === instanceId) this.bindingCodes.delete(c); + } + this.bindingCodes.set(code, { instanceId, expiresAt }); + } + + /** + * Look up a binding code. Checks Redis first, falls back to in-memory Map. + * Returns the instanceId if code is valid and not expired, null otherwise. + */ + private async resolveBindCode(code: string): Promise { + if (this.redis) { + try { + const instanceId = await (this.redis as any).getdel(`wecom:bindcode:${code}`); + if (instanceId) { + // Clean up reverse-lookup key too (best effort) + this.redis.del(`wecom:bindcode:inst:${instanceId}`).catch(() => {}); + return instanceId as string; + } + return null; // not found in Redis — don't fall through to memory (avoid stale match) + } catch (e: any) { + this.logger.warn(`WeCom Redis bindcode lookup failed, trying in-memory: ${e.message}`); + } + } + // Fallback: in-memory (single-instance or Redis unavailable) + const entry = this.bindingCodes.get(code); + if (!entry) return null; + if (Date.now() > entry.expiresAt) { + this.bindingCodes.delete(code); + return null; + } + this.bindingCodes.delete(code); + return entry.instanceId; + } + // ── Polling ──────────────────────────────────────────────────────────────── private schedulePoll(delayMs: number): void { @@ -420,13 +477,13 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { private async poll(): Promise { if (this.stopping) return; + this.lastPollAt = Date.now(); // track all poll attempts so watchdog works for followers too + const isLeader = await this.tryClaimLeaderLease(); if (!isLeader) { this.schedulePoll(POLL_INTERVAL_IDLE_MS); return; } - - this.lastPollAt = Date.now(); try { const hasMore = await this.syncMessages(); this.consecutiveErrors = 0; @@ -480,7 +537,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) { if (ev.event?.event_type === 'enter_session') { - this.handleEnterSession(ev.external_userid).catch((e: Error) => + this.handleEnterSession(ev.external_userid, ev.msgid).catch((e: Error) => this.logger.error('WeCom handleEnterSession error:', e.message), ); } @@ -499,8 +556,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { // ── Enter-session welcome ────────────────────────────────────────────────── - private async handleEnterSession(externalUserId: string): Promise { + private async handleEnterSession(externalUserId: string, msgId: string): Promise { if (!externalUserId) return; + // Dedup: WeCom may retransmit the same enter_session event after cursor reset + const isNew = await this.redisDedup(msgId || `es:${externalUserId}:${Date.now()}`); + if (!isNew) return; this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`); const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId); if (instance) { @@ -539,19 +599,19 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { if (!text) return; const upperText = text.toUpperCase(); - const bindEntry = this.bindingCodes.get(upperText); - if (bindEntry) { - if (Date.now() > bindEntry.expiresAt) { - this.bindingCodes.delete(upperText); - await this.sendMessage(msg.externalUserId, '验证码已过期,请重新在 IT0 App 中生成新的验证码。'); - return; - } - this.bindingCodes.delete(upperText); - this.completeBinding(bindEntry.instanceId, msg.externalUserId).catch((e: Error) => + // resolveBindCode returns instanceId (and atomically removes the code), or null if not found/expired + const boundInstanceId = await this.resolveBindCode(upperText); + if (boundInstanceId !== null) { + this.completeBinding(boundInstanceId, msg.externalUserId).catch((e: Error) => this.logger.error('WeCom completeBinding error:', e.message), ); return; } + // 6-char hex pattern but not a valid code → tell user it's expired + if (/^[0-9A-F]{6}$/.test(upperText)) { + await this.sendMessage(msg.externalUserId, '验证码无效或已过期,请重新在 IT0 App 中生成新的验证码。'); + return; + } if (!this.rateAllow(msg.externalUserId)) { await this.sendMessage(msg.externalUserId, '消息频率过高,请稍后再试(每分钟最多10条)。'); diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts index 588fb75..60c0a7b 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts @@ -232,7 +232,7 @@ export class AgentChannelController { } const inst = await this.instanceRepo.findById(instanceId); if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); - const { code, expiresAt, kfUrl } = this.wecomRouter.generateBindingCode(instanceId); + const { code, expiresAt, kfUrl } = await this.wecomRouter.generateBindingCode(instanceId); return { code, expiresAt, kfUrl }; }