it0/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts

634 lines
26 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* WeCom Customer Service Router (微信客服 / 企业微信客服)
*
* IT0 unified WeCom Customer Service bot — one central polling loop for all agent instances.
*
* Responsibilities:
* 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages.
* 2. Handle binding codes: user sends code → maps their external_userid to an instance.
* 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → waits for the bridge's async callback → replies.
*
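* Required env vars:
* IT0_WECOM_CORP_ID — Enterprise ID (from 企业微信管理后台 → 我的企业)
* IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret)
* IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid以 wkf 开头)
*
* Other env vars read:
* IT0_WECOM_KF_URL — 微信客服 link returned to the frontend together with each binding code
* AGENT_SERVICE_PUBLIC_URL — public base URL of this service; bridges deliver results to
*   <base>/api/v1/agent/channels/wecom/bridge-callback, so replies cannot arrive without it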
*
* Binding flow (code):
* 1. Frontend calls POST /api/v1/agent/channels/wecom/bind/:instanceId
* 2. Backend returns { code: "A3K9F2", expiresAt, kfUrl }
* 3. User opens WeChat → opens 微信客服 link → sends "A3K9F2"
* 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功"
*
* Robustness guarantees:
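* - sync_msg cursor is held in memory only and resets on restart. WeCom does NOT deduplicate
*   for us: with an empty cursor it returns up to 3 days of history, and the in-memory dedup
*   map is also empty after a restart, so dedup alone cannot prevent replaying old messages.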
* - Per-user serial queue prevents concurrent LLM calls for same user
* - Dedup map prevents duplicate message processing
* - Rate limit: 10 messages/minute per user
* - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s)
* - Periodic cleanup of in-memory maps (5-min interval)
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as https from 'https';
import * as http from 'http';
import * as crypto from 'crypto';
import { AgentInstanceRepository } from '../repositories/agent-instance.repository';
// ── Types ─────────────────────────────────────────────────────────────────────
/** One normalized inbound message pulled from WeCom sync_msg. */
interface WecomMsg {
  /** WeCom msgid — dedup key, bridge idempotency key and callback correlation id. */
  msgId: string;
  /** The sender's external_userid. */
  externalUserId: string;
  /** open_kfid of the customer-service account that received the message. */
  openKfId: string;
  /** Text content; '' when the message carried no text payload. */
  text: string;
}

/** A pending binding code's target instance and its expiry (epoch milliseconds). */
interface BindingEntry {
  expiresAt: number;
  instanceId: string;
}
// ── Constants ─────────────────────────────────────────────────────────────────
/** Chunk-size budget for outgoing text (WeCom kf/send_msg caps text.content at 2048 bytes). */
const WECOM_MAX_CHARS = 2048;
/** Lifetime of a binding code: 15 minutes. */
const CODE_TTL_MS = 900_000;
/** Task timeout, in seconds, forwarded to the bridge's /task-async endpoint. */
const TASK_TIMEOUT_S = 120;
/** Stop waiting for the bridge callback after 3 minutes. */
const CALLBACK_TIMEOUT_MS = 3 * 60 * 1000;
/** Delay before the "still thinking" reminder is sent. */
const THINKING_REMINDER_MS = 25 * 1000;
/** How long a processed msgid is remembered for dedup: 10 minutes. */
const DEDUP_TTL_MS = 600_000;
/** Messages accepted per user within a rolling 60-second window. */
const RATE_LIMIT_PER_MIN = 10;
/** Maximum queued-plus-running tasks per user. */
const QUEUE_MAX_DEPTH = 5;
/** Interval between sweeps of the in-memory maps: 5 minutes. */
const CLEANUP_INTERVAL_MS = 300_000;
/** Largest HTTP response body accepted from WeCom or a bridge (256 KiB). */
const MAX_RESPONSE_BYTES = 262_144;
/** Poll delay while users are actively messaging. */
const POLL_INTERVAL_ACTIVE_MS = 2 * 1000;
/** Poll delay when idle, and after a poll error. */
const POLL_INTERVAL_IDLE_MS = 10 * 1000;
/** Without user messages for this long, polling falls back to the idle interval. */
const IDLE_THRESHOLD_MS = 30 * 1000;
/** WeCom open-API host. */
const WECOM_API_HOST = 'qyapi.weixin.qq.com';
// ── Service ───────────────────────────────────────────────────────────────────
@Injectable()
export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
  /**
   * Messages whose send_time is older than (process start − this grace) are skipped.
   * After a restart the sync_msg cursor is empty, and WeCom then returns up to 3 days of
   * history. The in-memory dedup map is empty at that point too, so without this cut-off
   * every restart would re-answer old conversations.
   */
  private static readonly STARTUP_REPLAY_GRACE_MS = 60_000;

  /** WeCom errcodes meaning the cached access_token is invalid (40014) or expired (42001). */
  private static readonly TOKEN_INVALID_ERRCODES: ReadonlySet<number> = new Set([40014, 42001]);

  /** Refresh the access token this long before WeCom's stated expiry. */
  private static readonly TOKEN_REFRESH_MARGIN_MS = 300_000;

  private readonly logger = new Logger(WecomRouterService.name);
  private readonly corpId: string;
  private readonly kfSecret: string;
  private readonly openKfId: string;
  private readonly kfUrl: string;
  private readonly enabled: boolean;
  private readonly agentCallbackBaseUrl: string;
  /** Wall-clock creation time; used to drop history replayed after a restart. */
  private readonly startedAtMs = Date.now();
  private stopping = false;
  private pollTimer?: NodeJS.Timeout;
  private cleanupTimer?: NodeJS.Timeout;
  /** sync_msg cursor — held in memory only, so it is '' again after a restart. */
  private syncCursor = '';
  /** Time the last user message was seen; selects the active vs idle poll interval. */
  private lastMsgTime = 0;

  // Token cache
  private tokenCache = '';
  private tokenExpiresAt = 0;
  /** In-flight gettoken request shared by concurrent callers (avoids a refresh stampede). */
  private tokenRefresh?: Promise<string>;

  // State
  private readonly bindingCodes = new Map<string, BindingEntry>();
  private readonly dedup = new Map<string, number>();
  private readonly rateWindows = new Map<string, number[]>();
  private readonly queueTails = new Map<string, Promise<void>>();
  private readonly queueDepths = new Map<string, number>();
  private readonly pendingCallbacks = new Map<string, {
    resolve: (reply: string) => void;
    reject: (e: Error) => void;
    timer: NodeJS.Timeout;
  }>();

  constructor(
    private readonly configService: ConfigService,
    private readonly instanceRepo: AgentInstanceRepository,
  ) {
    this.corpId = this.configService.get<string>('IT0_WECOM_CORP_ID', '');
    this.kfSecret = this.configService.get<string>('IT0_WECOM_KF_SECRET', '');
    this.openKfId = this.configService.get<string>('IT0_WECOM_KF_OPEN_KFID', '');
    this.kfUrl = this.configService.get<string>('IT0_WECOM_KF_URL', '');
    this.enabled = !!(this.corpId && this.kfSecret && this.openKfId);
    this.agentCallbackBaseUrl = this.configService.get<string>('AGENT_SERVICE_PUBLIC_URL', '');
  }

  /** Start the polling loop and the periodic cleanup timer (no-op when not configured). */
  onModuleInit(): void {
    if (!this.enabled) {
      this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled');
      return;
    }
    if (!this.agentCallbackBaseUrl) {
      // The callback URL would be relative, so bridges could never deliver a result.
      this.logger.warn('AGENT_SERVICE_PUBLIC_URL not set — bridge callbacks cannot reach this service; WeCom replies will time out');
    }
    this.logger.log('WeCom router starting (sync_msg polling)...');
    this.schedulePoll(POLL_INTERVAL_IDLE_MS);
    this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
    if (this.cleanupTimer.unref) this.cleanupTimer.unref();
    this.logger.log('WeCom router started');
  }

  /** Stop polling/cleanup and fail every in-flight bridge wait with "Service shutting down". */
  onModuleDestroy(): void {
    this.stopping = true;
    clearTimeout(this.pollTimer);
    clearInterval(this.cleanupTimer);
    for (const [, cb] of this.pendingCallbacks) {
      clearTimeout(cb.timer);
      cb.reject(new Error('Service shutting down'));
    }
    this.pendingCallbacks.clear();
  }

  isEnabled(): boolean {
    return this.enabled;
  }

  getKfUrl(): string {
    return this.kfUrl;
  }

  // ── Async bridge callback ──────────────────────────────────────────────────

  /**
   * Deliver a bridge's async result to the routeToAgent call waiting on `msgId`.
   *
   * @param msgId - WeCom msgid the task was submitted under.
   * @param ok - true → `content` is the reply; false → `content` is an error message.
   * @param content - Reply text or error text.
   * @param isTimeout - Marks a failure as a timeout so the user gets the timeout hint.
   */
  resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void {
    const cb = this.pendingCallbacks.get(msgId);
    if (!cb) {
      this.logger.warn(`WeCom: Received callback for unknown msgId=${msgId} (already resolved or timed out)`);
      return;
    }
    this.pendingCallbacks.delete(msgId);
    clearTimeout(cb.timer);
    if (ok) {
      cb.resolve(content);
    } else {
      const err: Error & { isTimeout?: boolean } = new Error(content);
      err.isTimeout = isTimeout ?? false;
      cb.reject(err);
    }
  }

  // ── Binding code API ───────────────────────────────────────────────────────

  /**
   * Issue a fresh 6-hex-char binding code for `instanceId`, revoking any earlier code
   * for the same instance. Codes expire after CODE_TTL_MS.
   */
  generateBindingCode(instanceId: string): { code: string; expiresAt: number; kfUrl: string } {
    for (const [code, entry] of this.bindingCodes) {
      if (entry.instanceId === instanceId) this.bindingCodes.delete(code);
    }
    // Re-draw on collision so a live code for another instance is never overwritten.
    let code: string;
    do {
      code = crypto.randomBytes(3).toString('hex').toUpperCase();
    } while (this.bindingCodes.has(code));
    const expiresAt = Date.now() + CODE_TTL_MS;
    this.bindingCodes.set(code, { instanceId, expiresAt });
    return { code, expiresAt, kfUrl: this.kfUrl };
  }

  // ── Polling ────────────────────────────────────────────────────────────────

  private schedulePoll(delayMs: number): void {
    if (this.stopping) return;
    this.pollTimer = setTimeout(() => void this.poll(), delayMs);
    if (this.pollTimer.unref) this.pollTimer.unref();
  }

  /** One poll iteration; always reschedules itself (immediately while has_more is set). */
  private async poll(): Promise<void> {
    if (this.stopping) return;
    try {
      const hasMore = await this.syncMessages();
      const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS;
      const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS);
      this.schedulePoll(nextDelay);
    } catch (e: unknown) {
      this.logger.warn(`WeCom poll error: ${WecomRouterService.errorMessage(e)}`);
      this.schedulePoll(POLL_INTERVAL_IDLE_MS);
    }
  }

  /**
   * Pull one page from kf/sync_msg and dispatch customer-sent messages.
   *
   * kf/sync_msg is a POST endpoint whose parameters travel in the JSON body. No callback
   * `token` is sent, so WeCom applies its stricter rate limit to these calls.
   *
   * @returns true when WeCom reports more pages (has_more === 1).
   */
  private async syncMessages(): Promise<boolean> {
    const token = await this.getAccessToken();
    const body: Record<string, unknown> = { open_kfid: this.openKfId, limit: 100 };
    if (this.syncCursor) body.cursor = this.syncCursor;
    const res = await this.wecomPost<{
      errcode: number;
      errmsg: string;
      next_cursor: string;
      has_more: number;
      msg_list: Array<{
        msgid: string;
        open_kfid: string;
        external_userid: string;
        send_time: number; // unix seconds
        origin: number; // 3=WeChat customer, 4=system event, 5=human servicer
        msgtype: string;
        text?: { content: string };
      }>;
    }>(`/cgi-bin/kf/sync_msg?access_token=${encodeURIComponent(token)}`, body);
    if (res.errcode !== 0) {
      this.invalidateTokenOn(res.errcode);
      this.logger.warn(`WeCom sync_msg error: ${res.errcode} ${res.errmsg}`);
      return false;
    }
    if (res.next_cursor) {
      this.syncCursor = res.next_cursor;
    }
    const replayCutoffMs = this.startedAtMs - WecomRouterService.STARTUP_REPLAY_GRACE_MS;
    let skippedStale = 0;
    for (const msg of res.msg_list ?? []) {
      if (msg.origin !== 3) continue;
      if (msg.send_time * 1000 < replayCutoffMs) {
        skippedStale++;
        continue;
      }
      this.lastMsgTime = Date.now();
      const wecomMsg: WecomMsg = {
        externalUserId: msg.external_userid,
        msgId: msg.msgid,
        text: msg.text?.content ?? '',
        openKfId: msg.open_kfid,
      };
      this.handleMessage(wecomMsg, msg.msgtype).catch((e: unknown) => {
        this.logger.error(`WeCom handleMessage error: ${WecomRouterService.errorMessage(e)}`);
      });
    }
    if (skippedStale > 0) {
      this.logger.debug(`WeCom sync_msg: skipped ${skippedStale} message(s) sent before startup`);
    }
    return res.has_more === 1;
  }

  // ── Message handling ───────────────────────────────────────────────────────

  /**
   * Dedup, then handle one message, in order: non-text reply, binding code, rate limit,
   * per-user queue. Everything up to enqueue runs synchronously, so arrival order is kept.
   */
  private async handleMessage(msg: WecomMsg, msgType: string): Promise<void> {
    if (!msg.externalUserId || !msg.msgId) return;

    // Dedup
    if (this.dedup.has(msg.msgId)) return;
    this.dedup.set(msg.msgId, Date.now());

    this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`);

    // Non-text messages
    if (msgType !== 'text' || !msg.text) {
      await this.sendMessage(msg.externalUserId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。');
      return;
    }

    const text = msg.text.trim();
    if (!text) return;

    // Binding code check (6-char hex, case-insensitive)
    const upperText = text.toUpperCase();
    const bindEntry = this.bindingCodes.get(upperText);
    if (bindEntry) {
      this.bindingCodes.delete(upperText);
      if (Date.now() > bindEntry.expiresAt) {
        await this.sendMessage(msg.externalUserId, '验证码已过期,请重新在 IT0 App 中生成新的验证码。');
        return;
      }
      this.completeBinding(bindEntry.instanceId, msg.externalUserId).catch((e: unknown) =>
        this.logger.error(`WeCom completeBinding error: ${WecomRouterService.errorMessage(e)}`),
      );
      return;
    }

    // Rate limit
    if (!this.rateAllow(msg.externalUserId)) {
      await this.sendMessage(msg.externalUserId, '消息频率过高请稍后再试每分钟最多10条。');
      return;
    }

    // Queue
    const pendingDepth = this.queueDepths.get(msg.externalUserId) ?? 0;
    const accepted = this.enqueue(msg.externalUserId, () => this.routeToAgent(msg.externalUserId, text, msg));
    if (!accepted) {
      await this.sendMessage(msg.externalUserId, '消息太多了请稍后再说当前排队已满最多5条');
    } else if (pendingDepth > 0) {
      await this.sendMessage(msg.externalUserId, `📋 消息已收到,前面还有 ${pendingDepth} 条在处理,请稍候~`);
    }
  }

  // ── Binding completion ─────────────────────────────────────────────────────

  /** Persist externalUserId on the instance and confirm to the user. Never throws. */
  private async completeBinding(instanceId: string, externalUserId: string): Promise<void> {
    try {
      const instance = await this.instanceRepo.findById(instanceId);
      if (!instance) {
        await this.sendMessage(externalUserId, '绑定失败:智能体实例不存在,请重新操作。');
        return;
      }
      instance.wecomExternalUserId = externalUserId;
      await this.instanceRepo.save(instance);
      this.logger.log(`WeCom code-binding: instance ${instanceId} → externalUserId=${externalUserId}`);
      await this.sendMessage(
        externalUserId,
        `✅ 绑定成功!\n\n你的小龙虾「${instance.name}」已与微信客服绑定。\n\n现在直接发消息给我我会帮你转达给它`,
      );
    } catch (e: unknown) {
      this.logger.error(`WeCom completeBinding error: ${WecomRouterService.errorMessage(e)}`);
      await this.sendMessage(externalUserId, '绑定时出现错误,请稍后重试。');
    }
  }

  // ── Message routing ────────────────────────────────────────────────────────

  /**
   * Forward one user message to the bound instance's bridge (/task-async) and relay the
   * reply delivered later via resolveCallbackReply. Always sends the user some reply.
   */
  private async routeToAgent(externalUserId: string, text: string, msg: WecomMsg): Promise<void> {
    const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
    if (!instance) {
      this.logger.warn(`No WeCom binding for externalUserId=${externalUserId}`);
      await this.sendMessage(
        externalUserId,
        '👋 你还没有绑定专属小龙虾。\n\n步骤\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定微信」获取验证码\n4. 把验证码发给我就好了~',
      );
      return;
    }

    if (instance.status !== 'running') {
      const statusHint: Record<string, string> = {
        stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`,
        starting: `⏳ 小龙虾「${instance.name}」还在启动中请等待约1分钟后重试。`,
        error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`,
      };
      await this.sendMessage(
        externalUserId,
        statusHint[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`,
      );
      return;
    }

    if (!instance.serverHost) {
      this.logger.error(`Instance ${instance.id} has no serverHost`);
      await this.sendMessage(externalUserId, '小龙虾配置异常(缺少服务器地址),请联系管理员。');
      return;
    }

    const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`;
    const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/wecom/bridge-callback`;

    this.logger.log(
      `WeCom routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${asyncBridgeUrl}`,
    );

    // Immediate ack
    await this.sendMessage(externalUserId, '🤔 小虾米正在思考,稍等~');

    // Progress reminder after THINKING_REMINDER_MS
    const thinkingTimer = setTimeout(() => {
      this.sendMessage(externalUserId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {});
    }, THINKING_REMINDER_MS);
    if (thinkingTimer.unref) thinkingTimer.unref();

    // Register before POSTing: a fast bridge may call back before the ack returns.
    const callbackPromise = this.registerPendingCallback(msg.msgId);
    let reply = '';
    try {
      const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>(
        asyncBridgeUrl,
        {
          prompt: text,
          sessionKey: `agent:main:wx-${externalUserId}`,
          idempotencyKey: msg.msgId,
          timeoutSeconds: TASK_TIMEOUT_S,
          callbackUrl,
          callbackData: { externalUserId, msgId: msg.msgId },
        },
        15_000,
      );

      if (!ack.ok) {
        // Also cancels the timeout timer, which would otherwise fire 180s later for nothing.
        this.cancelPendingCallback(msg.msgId);
        const bridgeError = ack.error ?? '';
        if (bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')) {
          reply = `🔄 小虾米正在重启请等待约30秒后重试。`;
        } else {
          reply = `小虾米遇到了问题,请稍后重试。`;
        }
        this.logger.warn(`WeCom bridge rejected task for instance ${instance.id}: ${bridgeError}`);
      } else {
        reply = await callbackPromise;
        this.logger.log(`WeCom bridge callback received, replyLen=${reply.length}`);
      }
    } catch (e: unknown) {
      this.cancelPendingCallback(msg.msgId);
      const message = WecomRouterService.errorMessage(e);
      this.logger.error(`WeCom async bridge failed for instance ${instance.id}: ${message}`);
      reply = this.buildErrorReply(message, instance.name, WecomRouterService.isTimeoutError(e));
    } finally {
      clearTimeout(thinkingTimer);
    }

    await this.sendMessage(externalUserId, reply);
  }

  /**
   * Register a waiter for the bridge callback of `msgId`. It rejects with an
   * `isTimeout`-tagged error after CALLBACK_TIMEOUT_MS.
   */
  private registerPendingCallback(msgId: string): Promise<string> {
    const promise = new Promise<string>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pendingCallbacks.delete(msgId);
        const err: Error & { isTimeout?: boolean } = new Error(
          `Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`,
        );
        err.isTimeout = true;
        reject(err);
      }, CALLBACK_TIMEOUT_MS);
      this.pendingCallbacks.set(msgId, { resolve, reject, timer });
    });
    // Mark as handled: on paths where nobody awaits it (e.g. shutdown racing an early
    // exit) a rejection must not surface as an unhandledRejection. Awaiters still see it.
    promise.catch(() => undefined);
    return promise;
  }

  /** Drop the waiter for `msgId` and stop its timeout timer (no-op if already settled). */
  private cancelPendingCallback(msgId: string): void {
    const pending = this.pendingCallbacks.get(msgId);
    if (!pending) return;
    clearTimeout(pending.timer);
    this.pendingCallbacks.delete(msgId);
  }

  /** Map a bridge/transport error to a user-facing message without leaking internals. */
  private buildErrorReply(error: string, instanceName: string, isTimeout: boolean): string {
    if (isTimeout) {
      return (
        `⏱️ 这个任务花的时间太长了,小虾米超时了。\n\n` +
        `建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说`
      );
    }
    if (error.includes('disconnected') || error.includes('not connected')) {
      return `🔄 「${instanceName}」与服务的连接中断了请等待约30秒后重试。`;
    }
    if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`;
    if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`;
    return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`;
  }

  // ── Send message ───────────────────────────────────────────────────────────

  /**
   * Send a text message to a WeChat user via 微信客服 send_msg API.
   * Chunked to at most WECOM_MAX_CHARS UTF-8 bytes per message, because the send_msg
   * text limit is counted in bytes and Chinese characters take 3 bytes each.
   * Failures are logged, never thrown.
   */
  private async sendMessage(externalUserId: string, content: string): Promise<void> {
    // Strip stack-trace-like fragments (" at file:12:34") so internals don't reach users.
    const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)';
    for (const chunk of WecomRouterService.splitByUtf8Bytes(safe, WECOM_MAX_CHARS)) {
      try {
        const token = await this.getAccessToken();
        const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>(
          `/cgi-bin/kf/send_msg?access_token=${encodeURIComponent(token)}`,
          {
            touser: externalUserId,
            open_kfid: this.openKfId,
            msgtype: 'text',
            text: { content: chunk },
          },
        );
        if (res.errcode !== 0) {
          this.invalidateTokenOn(res.errcode);
          this.logger.error(`WeCom send_msg error: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`);
        }
      } catch (e: unknown) {
        this.logger.error(
          `WeCom sendMessage failed for externalUserId=${externalUserId}: ${WecomRouterService.errorMessage(e)}`,
        );
      }
    }
  }

  /**
   * Split `text` into pieces of at most `maxBytes` UTF-8 bytes without breaking a code
   * point (for-of iterates by code point, so surrogate pairs stay together). A single
   * code point larger than the budget is emitted on its own.
   */
  private static splitByUtf8Bytes(text: string, maxBytes: number): string[] {
    const pieces: string[] = [];
    let current = '';
    let currentBytes = 0;
    for (const ch of text) {
      const chBytes = Buffer.byteLength(ch, 'utf8');
      if (current && currentBytes + chBytes > maxBytes) {
        pieces.push(current);
        current = '';
        currentBytes = 0;
      }
      current += ch;
      currentBytes += chBytes;
    }
    if (current) pieces.push(current);
    return pieces;
  }

  // ── Access token ───────────────────────────────────────────────────────────

  /**
   * Return a cached access token, refreshing it TOKEN_REFRESH_MARGIN_MS before expiry.
   * Concurrent callers share a single in-flight gettoken request.
   */
  private async getAccessToken(): Promise<string> {
    if (this.tokenCache && Date.now() < this.tokenExpiresAt - WecomRouterService.TOKEN_REFRESH_MARGIN_MS) {
      return this.tokenCache;
    }
    if (!this.tokenRefresh) {
      this.tokenRefresh = this.fetchAccessToken().finally(() => {
        this.tokenRefresh = undefined;
      });
    }
    return this.tokenRefresh;
  }

  /** Call gettoken and update the cache. @throws Error on a non-zero errcode. */
  private async fetchAccessToken(): Promise<string> {
    const res = await this.wecomGet<{
      errcode: number;
      errmsg: string;
      access_token: string;
      expires_in: number;
    }>(`/cgi-bin/gettoken?corpid=${encodeURIComponent(this.corpId)}&corpsecret=${encodeURIComponent(this.kfSecret)}`);
    if (res.errcode !== 0) throw new Error(`WeCom gettoken error: ${res.errcode} ${res.errmsg}`);
    this.tokenCache = res.access_token;
    this.tokenExpiresAt = Date.now() + res.expires_in * 1000;
    this.logger.log('WeCom access token refreshed');
    return this.tokenCache;
  }

  /** Drop the cached token when WeCom reports it invalid/expired, so the next call refreshes. */
  private invalidateTokenOn(errcode: number): void {
    if (!WecomRouterService.TOKEN_INVALID_ERRCODES.has(errcode)) return;
    this.tokenCache = '';
    this.tokenExpiresAt = 0;
    this.logger.warn(`WeCom access token rejected (errcode ${errcode}) — will refresh on next call`);
  }

  // ── Per-user serial queue ──────────────────────────────────────────────────

  /**
   * Chain `task` after the user's previous task so one user's messages run strictly in
   * order. `queueDepths` counts queued + running tasks.
   *
   * @returns false (task not queued) when QUEUE_MAX_DEPTH is already reached.
   */
  private enqueue(userId: string, task: () => Promise<void>): boolean {
    const depth = this.queueDepths.get(userId) ?? 0;
    if (depth >= QUEUE_MAX_DEPTH) return false;
    this.queueDepths.set(userId, depth + 1);

    const tail = this.queueTails.get(userId) ?? Promise.resolve();
    const next = tail
      .then(task)
      .catch((e: unknown) => {
        try {
          this.logger.error(`Queue task error (${userId}): ${WecomRouterService.errorMessage(e)}`);
        } catch { /* ignore */ }
      })
      .finally(() => {
        const remaining = (this.queueDepths.get(userId) ?? 1) - 1;
        if (remaining <= 0) {
          this.queueDepths.delete(userId);
          this.queueTails.delete(userId);
        } else {
          this.queueDepths.set(userId, remaining);
        }
      })
      .catch(() => {});

    this.queueTails.set(userId, next);
    return true;
  }

  // ── Rate limiter ───────────────────────────────────────────────────────────

  /** Sliding 60-second window: allow at most RATE_LIMIT_PER_MIN messages per user. */
  private rateAllow(userId: string): boolean {
    const now = Date.now();
    const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000);
    if (timestamps.length >= RATE_LIMIT_PER_MIN) {
      this.rateWindows.set(userId, timestamps);
      return false;
    }
    timestamps.push(now);
    this.rateWindows.set(userId, timestamps);
    return true;
  }

  // ── Periodic cleanup ───────────────────────────────────────────────────────

  /** Prune expired dedup entries, empty rate windows and expired binding codes. */
  private periodicCleanup(): void {
    const now = Date.now();
    for (const [id, ts] of this.dedup) {
      if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id);
    }
    for (const [userId, timestamps] of this.rateWindows) {
      const fresh = timestamps.filter((t) => now - t < 60_000);
      if (fresh.length === 0) this.rateWindows.delete(userId);
      else this.rateWindows.set(userId, fresh);
    }
    for (const [code, entry] of this.bindingCodes) {
      if (now > entry.expiresAt) this.bindingCodes.delete(code);
    }
  }

  // ── HTTP helpers ───────────────────────────────────────────────────────────

  private wecomGet<T>(path: string): Promise<T> {
    return this.requestJson<T>(
      (onResponse) => https.request({ hostname: WECOM_API_HOST, path, method: 'GET', timeout: 10_000 }, onResponse),
      undefined,
      'WeCom API GET timeout',
      'Response too large',
    );
  }

  private wecomPost<T>(path: string, payload: object): Promise<T> {
    const body = JSON.stringify(payload);
    return this.requestJson<T>(
      (onResponse) =>
        https.request(
          {
            hostname: WECOM_API_HOST,
            path,
            method: 'POST',
            headers: {
              'Content-Type': 'application/json',
              'Content-Length': Buffer.byteLength(body),
            },
            timeout: 10_000,
          },
          onResponse,
        ),
      body,
      'WeCom API POST timeout',
      'Response too large',
    );
  }

  /** POST JSON to a plain-HTTP bridge URL; an invalid URL surfaces as a rejection. */
  private async httpPostJson<T>(url: string, payload: object, timeoutMs = 35_000): Promise<T> {
    const parsed = new URL(url);
    const body = JSON.stringify(payload);
    return this.requestJson<T>(
      (onResponse) =>
        http.request(
          {
            hostname: parsed.hostname,
            // An empty port means the scheme default; parseInt('') would give NaN.
            port: parsed.port ? Number(parsed.port) : undefined,
            path: `${parsed.pathname}${parsed.search}`,
            method: 'POST',
            headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) },
            timeout: timeoutMs,
          },
          onResponse,
        ),
      body,
      'Bridge request timeout',
      'Bridge response too large',
    );
  }

  /**
   * Shared request/response plumbing: optionally writes `body`, collects the response as
   * raw bytes (decoded as UTF-8 only once complete, so multi-byte characters split across
   * chunks survive) and parses it as JSON. The promise always settles: it rejects on
   * request/response errors, a prematurely closed response, a socket timeout, or a body
   * over MAX_RESPONSE_BYTES (the request is aborted immediately in that case).
   */
  private requestJson<T>(
    startRequest: (onResponse: (res: http.IncomingMessage) => void) => http.ClientRequest,
    body: string | undefined,
    timeoutMessage: string,
    tooLargeMessage: string,
  ): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      let settled = false;
      const fail = (err: unknown): void => {
        if (settled) return;
        settled = true;
        reject(err);
      };

      const req = startRequest((res) => {
        const chunks: Buffer[] = [];
        let totalBytes = 0;
        res.on('data', (chunk: Buffer) => {
          if (settled) return;
          totalBytes += chunk.length;
          if (totalBytes > MAX_RESPONSE_BYTES) {
            fail(new Error(tooLargeMessage));
            req.destroy();
            return;
          }
          chunks.push(chunk);
        });
        res.on('end', () => {
          if (settled) return;
          settled = true;
          try {
            resolve(JSON.parse(Buffer.concat(chunks).toString('utf8')) as T);
          } catch (e) {
            reject(e);
          }
        });
        res.on('error', fail);
        res.on('close', () => {
          if (!res.complete) fail(new Error('Response closed before completion (aborted)'));
        });
      });

      req.on('timeout', () => {
        fail(new Error(timeoutMessage));
        req.destroy();
      });
      req.on('error', fail);
      if (body !== undefined) req.write(body);
      req.end();
    });
  }

  // ── Error helpers ──────────────────────────────────────────────────────────

  /** Human-readable message for any thrown value. */
  private static errorMessage(e: unknown): string {
    return e instanceof Error ? e.message : String(e);
  }

  /** True when the thrown value carries a truthy `isTimeout` tag. */
  private static isTimeoutError(e: unknown): boolean {
    return typeof e === 'object' && e !== null && !!(e as { isTimeout?: unknown }).isTimeout;
  }
}