From 647df6e42fa83e1828339d4b7d2e4be13cdfcfcd Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 9 Mar 2026 10:54:36 -0700 Subject: [PATCH] =?UTF-8?q?feat(wecom):=20add=20WeChat=20Customer=20Servic?= =?UTF-8?q?e=20channel=20=E2=80=94=20sync=5Fmsg=20polling=20+=20code=20bin?= =?UTF-8?q?ding=20+=20bridge=20callback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/gateway/config/kong.yml | 9 + .../agent-service/src/agent.module.ts | 2 + .../domain/entities/agent-instance.entity.ts | 3 + .../repositories/agent-instance.repository.ts | 4 + .../wecom/wecom-router.service.ts | 633 ++++++++++++++++++ .../controllers/agent-channel.controller.ts | 60 ++ 6 files changed, 711 insertions(+) create mode 100644 packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts diff --git a/packages/gateway/config/kong.yml b/packages/gateway/config/kong.yml index d48a3cb..16509b5 100644 --- a/packages/gateway/config/kong.yml +++ b/packages/gateway/config/kong.yml @@ -39,6 +39,15 @@ services: - /api/v1/agent/channels/feishu/oauth/callback strip_path: false + # Public WeCom bridge callback — no JWT (internal bridge POSTs here after LLM completes) + - name: wecom-public + url: http://agent-service:3002 + routes: + - name: wecom-bridge-callback + paths: + - /api/v1/agent/channels/wecom/bridge-callback + strip_path: false + - name: agent-service url: http://agent-service:3002 routes: diff --git a/packages/services/agent-service/src/agent.module.ts b/packages/services/agent-service/src/agent.module.ts index f8520af..8c9c9aa 100644 --- a/packages/services/agent-service/src/agent.module.ts +++ b/packages/services/agent-service/src/agent.module.ts @@ -54,6 +54,7 @@ import { AgentInstanceController } from './interfaces/rest/controllers/agent-ins import { AgentChannelController } from './interfaces/rest/controllers/agent-channel.controller'; import { DingTalkRouterService } from './infrastructure/dingtalk/dingtalk-router.service'; import { FeishuRouterService } from './infrastructure/feishu/feishu-router.service'; +import { WecomRouterService } from './infrastructure/wecom/wecom-router.service'; import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/system-prompt-builder'; @Module({ @@ -103,6 +104,7 @@ import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/sy AgentInstanceDeployService, DingTalkRouterService, FeishuRouterService, + WecomRouterService, SystemPromptBuilder, ], }) diff --git a/packages/services/agent-service/src/domain/entities/agent-instance.entity.ts b/packages/services/agent-service/src/domain/entities/agent-instance.entity.ts index 9dcc636..df8d282 100644 --- a/packages/services/agent-service/src/domain/entities/agent-instance.entity.ts +++ b/packages/services/agent-service/src/domain/entities/agent-instance.entity.ts @@ -50,6 +50,9 @@ export class AgentInstance { @Column({ type: 'varchar', length: 100, name: 'feishu_user_id', nullable: true }) feishuUserId?: string; + @Column({ type: 'varchar', length: 100, name: 'wecom_external_user_id', nullable: true }) + wecomExternalUserId?: string; + @Column({ type: 'jsonb', default: {} }) config!: Record; diff --git a/packages/services/agent-service/src/infrastructure/repositories/agent-instance.repository.ts b/packages/services/agent-service/src/infrastructure/repositories/agent-instance.repository.ts index 6598c92..e4a35ad 100644 --- a/packages/services/agent-service/src/infrastructure/repositories/agent-instance.repository.ts +++ b/packages/services/agent-service/src/infrastructure/repositories/agent-instance.repository.ts @@ -35,6 +35,10 @@ export class AgentInstanceRepository { return this.repo.findOne({ where: { feishuUserId, status: Not('removed') } as any }); } + findByWecomExternalUserId(wecomExternalUserId: string): Promise { + return this.repo.findOne({ where: { wecomExternalUserId, status: Not('removed') } as any }); + } + save(instance: AgentInstance): Promise { return this.repo.save(instance); } diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts new file mode 100644 index 0000000..0c451ec --- /dev/null +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -0,0 +1,633 @@ +/** + * WeCom Customer Service Router (微信客服 / 企业微信客服) + * + * IT0 unified WeCom Customer Service bot — one central polling loop for all agent instances. + * + * Responsibilities: + * 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages. + * 2. Handle binding codes: user sends code → maps their external_userid to an instance. + * 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies. + * + * Required env vars: + * IT0_WECOM_CORP_ID — Enterprise ID (from 企业微信管理后台 → 我的企业) + * IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret) + * IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid,以 wkf 开头) + * + * Binding flow (code): + * 1. Frontend calls POST /api/v1/agent/channels/wecom/bind/:instanceId + * 2. Backend returns { code: "A3K9F2", expiresAt, kfUrl } + * 3. User opens WeChat → opens 微信客服 link → sends "A3K9F2" + * 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功" + * + * Robustness guarantees: + * - sync_msg cursor persisted in memory (resets on restart — WeCom deduplicates by msgid) + * - Per-user serial queue prevents concurrent LLM calls for same user + * - Dedup map prevents duplicate message processing + * - Rate limit: 10 messages/minute per user + * - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s) + * - Periodic cleanup of in-memory maps (5-min interval) + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import * as https from 'https'; +import * as http from 'http'; +import * as crypto from 'crypto'; +import { AgentInstanceRepository } from '../repositories/agent-instance.repository'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +interface WecomMsg { + externalUserId: string; + msgId: string; + text: string; + openKfId: string; +} + +interface BindingEntry { + instanceId: string; + expiresAt: number; +} + +// ── Constants ───────────────────────────────────────────────────────────────── + +const WECOM_MAX_CHARS = 2048; // stay below WeChat's limit +const CODE_TTL_MS = 15 * 60 * 1000; // 15 min +const TASK_TIMEOUT_S = 120; +const CALLBACK_TIMEOUT_MS = 180_000; +const THINKING_REMINDER_MS = 25_000; +const DEDUP_TTL_MS = 10 * 60 * 1000; +const RATE_LIMIT_PER_MIN = 10; +const QUEUE_MAX_DEPTH = 5; +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; +const MAX_RESPONSE_BYTES = 256 * 1024; +const POLL_INTERVAL_ACTIVE_MS = 2_000; +const POLL_INTERVAL_IDLE_MS = 10_000; +const IDLE_THRESHOLD_MS = 30_000; + +const WECOM_API_HOST = 'qyapi.weixin.qq.com'; + +// ── Service ─────────────────────────────────────────────────────────────────── + +@Injectable() +export class WecomRouterService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(WecomRouterService.name); + private readonly corpId: string; + private readonly kfSecret: string; + private readonly openKfId: string; + private readonly kfUrl: string; + private readonly enabled: boolean; + private readonly agentCallbackBaseUrl: string; + + private stopping = false; + private pollTimer?: NodeJS.Timeout; + private cleanupTimer?: NodeJS.Timeout; + private syncCursor = ''; + private lastMsgTime = 0; + + // Token cache + private tokenCache = ''; + private tokenExpiresAt = 0; + + // State + private readonly bindingCodes = new Map(); + private readonly dedup = new Map(); + private readonly rateWindows = new Map(); + private readonly queueTails = new Map>(); + private readonly queueDepths = new Map(); + private readonly pendingCallbacks = new Map void; + reject: (e: Error) => void; + timer: NodeJS.Timeout; + }>(); + + constructor( + private readonly configService: ConfigService, + private readonly instanceRepo: AgentInstanceRepository, + ) { + this.corpId = this.configService.get('IT0_WECOM_CORP_ID', ''); + this.kfSecret = this.configService.get('IT0_WECOM_KF_SECRET', ''); + this.openKfId = this.configService.get('IT0_WECOM_KF_OPEN_KFID', ''); + this.kfUrl = this.configService.get('IT0_WECOM_KF_URL', ''); + this.enabled = !!(this.corpId && this.kfSecret && this.openKfId); + this.agentCallbackBaseUrl = this.configService.get('AGENT_SERVICE_PUBLIC_URL', ''); + } + + onModuleInit(): void { + if (!this.enabled) { + this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled'); + return; + } + this.logger.log('WeCom router starting (sync_msg polling)...'); + this.schedulePoll(POLL_INTERVAL_IDLE_MS); + this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); + if (this.cleanupTimer.unref) this.cleanupTimer.unref(); + this.logger.log('WeCom router started'); + } + + onModuleDestroy(): void { + this.stopping = true; + clearTimeout(this.pollTimer); + clearInterval(this.cleanupTimer); + for (const [, cb] of this.pendingCallbacks) { + clearTimeout(cb.timer); + cb.reject(new Error('Service shutting down')); + } + this.pendingCallbacks.clear(); + } + + isEnabled(): boolean { + return this.enabled; + } + + getKfUrl(): string { + return this.kfUrl; + } + + // ── Async bridge callback ────────────────────────────────────────────────── + + resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void { + const cb = this.pendingCallbacks.get(msgId); + if (!cb) { + this.logger.warn(`WeCom: Received callback for unknown msgId=${msgId} (already resolved or timed out)`); + return; + } + this.pendingCallbacks.delete(msgId); + clearTimeout(cb.timer); + if (ok) { + cb.resolve(content); + } else { + const err: Error & { isTimeout?: boolean } = new Error(content); + err.isTimeout = isTimeout ?? false; + cb.reject(err); + } + } + + // ── Binding code API ─────────────────────────────────────────────────────── + + generateBindingCode(instanceId: string): { code: string; expiresAt: number; kfUrl: string } { + for (const [code, entry] of this.bindingCodes) { + if (entry.instanceId === instanceId) this.bindingCodes.delete(code); + } + const code = crypto.randomBytes(3).toString('hex').toUpperCase(); + const expiresAt = Date.now() + CODE_TTL_MS; + this.bindingCodes.set(code, { instanceId, expiresAt }); + return { code, expiresAt, kfUrl: this.kfUrl }; + } + + // ── Polling ──────────────────────────────────────────────────────────────── + + private schedulePoll(delayMs: number): void { + this.pollTimer = setTimeout(() => this.poll(), delayMs); + if (this.pollTimer.unref) this.pollTimer.unref(); + } + + private async poll(): Promise { + if (this.stopping) return; + try { + const hasMore = await this.syncMessages(); + const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS; + const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS); + this.schedulePoll(nextDelay); + } catch (e: any) { + this.logger.warn(`WeCom poll error: ${e.message}`); + this.schedulePoll(POLL_INTERVAL_IDLE_MS); + } + } + + private async syncMessages(): Promise { + const token = await this.getAccessToken(); + const path = `/cgi-bin/kf/sync_msg?access_token=${token}` + + `&cursor=${this.syncCursor}` + + `&open_kfid=${this.openKfId}` + + `&limit=100`; + + const res = await this.wecomGet<{ + errcode: number; + errmsg: string; + next_cursor: string; + has_more: number; + msg_list: Array<{ + msgid: string; + open_kfid: string; + external_userid: string; + send_time: number; + origin: number; // 3=user, 4=bot, 5=human agent + msgtype: string; + text?: { content: string }; + }>; + }>(path); + + if (res.errcode !== 0) { + this.logger.warn(`WeCom sync_msg error: ${res.errcode} ${res.errmsg}`); + return false; + } + + if (res.next_cursor) { + this.syncCursor = res.next_cursor; + } + + const userMessages = (res.msg_list ?? []).filter(m => m.origin === 3); + + for (const msg of userMessages) { + this.lastMsgTime = Date.now(); + const wecomMsg: WecomMsg = { + externalUserId: msg.external_userid, + msgId: msg.msgid, + text: msg.text?.content ?? '', + openKfId: msg.open_kfid, + }; + this.handleMessage(wecomMsg, msg.msgtype).catch((e: Error) => { + this.logger.error('WeCom handleMessage error:', e.message); + }); + } + + return res.has_more === 1; + } + + // ── Message handling ─────────────────────────────────────────────────────── + + private async handleMessage(msg: WecomMsg, msgType: string): Promise { + if (!msg.externalUserId || !msg.msgId) return; + + // Dedup + if (this.dedup.has(msg.msgId)) return; + this.dedup.set(msg.msgId, Date.now()); + + this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`); + + // Non-text messages + if (msgType !== 'text' || !msg.text) { + await this.sendMessage(msg.externalUserId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。'); + return; + } + + const text = msg.text.trim(); + if (!text) return; + + // Binding code check (6-char hex, case-insensitive) + const upperText = text.toUpperCase(); + const bindEntry = this.bindingCodes.get(upperText); + if (bindEntry) { + if (Date.now() > bindEntry.expiresAt) { + this.bindingCodes.delete(upperText); + await this.sendMessage(msg.externalUserId, '验证码已过期,请重新在 IT0 App 中生成新的验证码。'); + return; + } + this.bindingCodes.delete(upperText); + this.completeBinding(bindEntry.instanceId, msg.externalUserId).catch((e: Error) => + this.logger.error('WeCom completeBinding error:', e.message), + ); + return; + } + + // Rate limit + if (!this.rateAllow(msg.externalUserId)) { + await this.sendMessage(msg.externalUserId, '消息频率过高,请稍后再试(每分钟最多10条)。'); + return; + } + + // Queue + const pendingDepth = this.queueDepths.get(msg.externalUserId) ?? 0; + const accepted = this.enqueue(msg.externalUserId, () => this.routeToAgent(msg.externalUserId, text, msg)); + if (!accepted) { + await this.sendMessage(msg.externalUserId, '消息太多了,请稍后再说~(当前排队已满,最多5条)'); + } else if (pendingDepth > 0) { + await this.sendMessage(msg.externalUserId, `📋 消息已收到,前面还有 ${pendingDepth} 条在处理,请稍候~`); + } + } + + // ── Binding completion ───────────────────────────────────────────────────── + + private async completeBinding(instanceId: string, externalUserId: string): Promise { + try { + const instance = await this.instanceRepo.findById(instanceId); + if (!instance) { + await this.sendMessage(externalUserId, '绑定失败:智能体实例不存在,请重新操作。'); + return; + } + instance.wecomExternalUserId = externalUserId; + await this.instanceRepo.save(instance); + this.logger.log(`WeCom code-binding: instance ${instanceId} → externalUserId=${externalUserId}`); + await this.sendMessage( + externalUserId, + `✅ 绑定成功!\n\n你的小龙虾「${instance.name}」已与微信客服绑定。\n\n现在直接发消息给我,我会帮你转达给它!`, + ); + } catch (e: any) { + this.logger.error('WeCom completeBinding error:', e.message); + await this.sendMessage(externalUserId, '绑定时出现错误,请稍后重试。'); + } + } + + // ── Message routing ──────────────────────────────────────────────────────── + + private async routeToAgent(externalUserId: string, text: string, msg: WecomMsg): Promise { + const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId); + + if (!instance) { + this.logger.warn(`No WeCom binding for externalUserId=${externalUserId}`); + await this.sendMessage( + externalUserId, + '👋 你还没有绑定专属小龙虾。\n\n步骤:\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定微信」获取验证码\n4. 把验证码发给我就好了~', + ); + return; + } + + if (instance.status !== 'running') { + const statusHint: Record = { + stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`, + starting: `⏳ 小龙虾「${instance.name}」还在启动中,请等待约1分钟后重试。`, + error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`, + }; + await this.sendMessage( + externalUserId, + statusHint[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`, + ); + return; + } + + if (!instance.serverHost) { + this.logger.error(`Instance ${instance.id} has no serverHost`); + await this.sendMessage(externalUserId, '小龙虾配置异常(缺少服务器地址),请联系管理员。'); + return; + } + + const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`; + const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/wecom/bridge-callback`; + this.logger.log( + `WeCom routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${asyncBridgeUrl}`, + ); + + // Immediate ack + await this.sendMessage(externalUserId, '🤔 小虾米正在思考,稍等~'); + + // Progress reminder after 25s + let thinkingTimer: NodeJS.Timeout | undefined; + thinkingTimer = setTimeout(() => { + this.sendMessage(externalUserId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {}); + }, THINKING_REMINDER_MS); + if (thinkingTimer.unref) thinkingTimer.unref(); + + let reply = ''; + try { + const callbackPromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pendingCallbacks.delete(msg.msgId); + const err: Error & { isTimeout?: boolean } = new Error( + `Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`, + ); + err.isTimeout = true; + reject(err); + }, CALLBACK_TIMEOUT_MS); + this.pendingCallbacks.set(msg.msgId, { resolve, reject, timer }); + }); + + const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>( + asyncBridgeUrl, + { + prompt: text, + sessionKey: `agent:main:wx-${externalUserId}`, + idempotencyKey: msg.msgId, + timeoutSeconds: TASK_TIMEOUT_S, + callbackUrl, + callbackData: { externalUserId, msgId: msg.msgId }, + }, + 15_000, + ); + + if (!ack.ok) { + this.pendingCallbacks.delete(msg.msgId); + const bridgeError = ack.error ?? ''; + if (bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')) { + reply = `🔄 小虾米正在重启,请等待约30秒后重试。`; + } else { + reply = `小虾米遇到了问题,请稍后重试。`; + } + this.logger.warn(`WeCom bridge rejected task for instance ${instance.id}: ${bridgeError}`); + } else { + reply = await callbackPromise; + this.logger.log(`WeCom bridge callback received, replyLen=${reply.length}`); + } + } catch (e: any) { + this.pendingCallbacks.delete(msg.msgId); + this.logger.error(`WeCom async bridge failed for instance ${instance.id}:`, e.message); + reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout); + } finally { + clearTimeout(thinkingTimer); + } + + await this.sendMessage(externalUserId, reply); + } + + private buildErrorReply(error: string, instanceName: string, isTimeout: boolean): string { + if (isTimeout) { + return ( + `⏱️ 这个任务花的时间太长了,小虾米超时了。\n\n` + + `建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说` + ); + } + if (error.includes('disconnected') || error.includes('not connected')) { + return `🔄 「${instanceName}」与服务的连接中断了,请等待约30秒后重试。`; + } + if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`; + if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`; + return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`; + } + + // ── Send message ─────────────────────────────────────────────────────────── + + /** + * Send a text message to a WeChat user via 微信客服 send_msg API. + * Chunked if over WECOM_MAX_CHARS. + */ + private async sendMessage(externalUserId: string, content: string): Promise { + const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)'; + const chunks: string[] = []; + for (let i = 0; i < safe.length; i += WECOM_MAX_CHARS) { + chunks.push(safe.slice(i, i + WECOM_MAX_CHARS)); + } + for (const chunk of chunks) { + try { + const token = await this.getAccessToken(); + const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>( + `/cgi-bin/kf/send_msg?access_token=${token}`, + { + touser: externalUserId, + open_kfid: this.openKfId, + msgtype: 'text', + text: { content: chunk }, + }, + ); + if (res.errcode !== 0) { + this.logger.error(`WeCom send_msg error: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`); + } + } catch (e: any) { + this.logger.error(`WeCom sendMessage failed for externalUserId=${externalUserId}:`, e.message); + } + } + } + + // ── Access token ─────────────────────────────────────────────────────────── + + private async getAccessToken(): Promise { + if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) { + return this.tokenCache; + } + const res = await this.wecomGet<{ + errcode: number; + errmsg: string; + access_token: string; + expires_in: number; + }>(`/cgi-bin/gettoken?corpid=${this.corpId}&corpsecret=${this.kfSecret}`); + + if (res.errcode !== 0) throw new Error(`WeCom gettoken error: ${res.errcode} ${res.errmsg}`); + this.tokenCache = res.access_token; + this.tokenExpiresAt = Date.now() + res.expires_in * 1000; + this.logger.log('WeCom access token refreshed'); + return this.tokenCache; + } + + // ── Per-user serial queue ────────────────────────────────────────────────── + + private enqueue(userId: string, task: () => Promise): boolean { + const depth = this.queueDepths.get(userId) ?? 0; + if (depth >= QUEUE_MAX_DEPTH) return false; + this.queueDepths.set(userId, depth + 1); + const tail = this.queueTails.get(userId) ?? Promise.resolve(); + const next = tail + .then(task) + .catch((e: Error) => { + try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ } + }) + .finally(() => { + const remaining = (this.queueDepths.get(userId) ?? 1) - 1; + if (remaining <= 0) { + this.queueDepths.delete(userId); + this.queueTails.delete(userId); + } else { + this.queueDepths.set(userId, remaining); + } + }) + .catch(() => {}); + this.queueTails.set(userId, next); + return true; + } + + // ── Rate limiter ─────────────────────────────────────────────────────────── + + private rateAllow(userId: string): boolean { + const now = Date.now(); + const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000); + if (timestamps.length >= RATE_LIMIT_PER_MIN) { + this.rateWindows.set(userId, timestamps); + return false; + } + timestamps.push(now); + this.rateWindows.set(userId, timestamps); + return true; + } + + // ── Periodic cleanup ─────────────────────────────────────────────────────── + + private periodicCleanup(): void { + const now = Date.now(); + for (const [id, ts] of this.dedup) { + if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); + } + for (const [userId, timestamps] of this.rateWindows) { + const fresh = timestamps.filter((t) => now - t < 60_000); + if (fresh.length === 0) this.rateWindows.delete(userId); + else this.rateWindows.set(userId, fresh); + } + for (const [code, entry] of this.bindingCodes) { + if (now > entry.expiresAt) this.bindingCodes.delete(code); + } + } + + // ── HTTP helpers ─────────────────────────────────────────────────────────── + + private wecomGet(path: string): Promise { + return new Promise((resolve, reject) => { + const req = https.request( + { hostname: WECOM_API_HOST, path, method: 'GET', timeout: 10_000 }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (chunk: Buffer) => { + totalBytes += chunk.length; + if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); + }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('WeCom API GET timeout')); }); + req.on('error', reject); + req.end(); + }); + } + + private wecomPost(path: string, payload: object): Promise { + return new Promise((resolve, reject) => { + const body = JSON.stringify(payload); + const req = https.request( + { + hostname: WECOM_API_HOST, path, method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }, + timeout: 10_000, + }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (chunk: Buffer) => { + totalBytes += chunk.length; + if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); + }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('WeCom API POST timeout')); }); + req.on('error', reject); + req.write(body); + req.end(); + }); + } + + private httpPostJson(url: string, payload: object, timeoutMs = 35_000): Promise { + return new Promise((resolve, reject) => { + const parsed = new URL(url); + const body = JSON.stringify(payload); + const req = http.request( + { + hostname: parsed.hostname, + port: parseInt(parsed.port, 10), + path: parsed.pathname, + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) }, + timeout: timeoutMs, + }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (c: Buffer) => { + totalBytes += c.length; + if (totalBytes <= MAX_RESPONSE_BYTES) data += c.toString(); + }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('Bridge request timeout')); }); + req.on('error', reject); + req.write(body); + req.end(); + }); + } +} diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts index 93de40e..7335e6f 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts @@ -14,6 +14,7 @@ import { import { Response } from 'express'; import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service'; import { FeishuRouterService } from '../../../infrastructure/feishu/feishu-router.service'; +import { WecomRouterService } from '../../../infrastructure/wecom/wecom-router.service'; import { AgentInstanceRepository } from '../../../infrastructure/repositories/agent-instance.repository'; @Controller('api/v1/agent/channels') @@ -23,6 +24,7 @@ export class AgentChannelController { constructor( private readonly dingTalkRouter: DingTalkRouterService, private readonly feishuRouter: FeishuRouterService, + private readonly wecomRouter: WecomRouterService, private readonly instanceRepo: AgentInstanceRepository, ) {} @@ -220,6 +222,64 @@ export class AgentChannelController { return { received: true }; } + // ── WeCom (微信客服) endpoints ───────────────────────────────────────────── + + /** Generate a 6-char binding code for WeChat Customer Service */ + @Post('wecom/bind/:instanceId') + async wecomGenerateBindCode(@Param('instanceId') instanceId: string) { + if (!this.wecomRouter.isEnabled()) { + throw new ServiceUnavailableException('WeCom integration not configured on this server'); + } + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + const { code, expiresAt, kfUrl } = this.wecomRouter.generateBindingCode(instanceId); + return { code, expiresAt, kfUrl }; + } + + /** Check if the given instance is already bound to a WeChat user */ + @Get('wecom/status/:instanceId') + async wecomGetBindStatus(@Param('instanceId') instanceId: string) { + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + return { bound: !!inst.wecomExternalUserId }; + } + + /** Unbind the WeChat user from the given agent instance */ + @Post('wecom/unbind/:instanceId') + async wecomUnbind(@Param('instanceId') instanceId: string) { + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + inst.wecomExternalUserId = undefined; + await this.instanceRepo.save(inst); + return { unbound: true }; + } + + /** + * WeCom bridge async-task callback — called by OpenClaw bridge. + * Unauthenticated; internal service only. + */ + @Post('wecom/bridge-callback') + handleWecomBridgeCallback( + @Body() body: { + ok: boolean; + result?: string; + error?: string; + isTimeout?: boolean; + callbackData: { externalUserId: string; msgId: string }; + }, + ) { + const { ok, result, error, isTimeout, callbackData } = body; + const { externalUserId, msgId } = callbackData ?? {}; + this.logger.log( + `WeCom bridge callback: ok=${ok} msgId=${msgId} externalUserId=${externalUserId} ` + + `${ok ? `replyLen=${result?.length ?? 0}` : `error=${error} isTimeout=${isTimeout}`}`, + ); + this.wecomRouter.resolveCallbackReply( + msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。'), isTimeout, + ); + return { received: true }; + } + private htmlPage(title: string, message: string, success: boolean): string { const color = success ? '#22C55E' : '#EF4444'; const icon = success ? '✅' : '❌';