fix(wecom): cursor persistence, send retry, enter_session welcome
Three robustness fixes for the WeCom Customer Service router: 1. **Cursor persistence** — sync_msg cursor now stored in public.service_state (auto-created via CREATE TABLE IF NOT EXISTS). Survives service restarts; no more duplicate message processing. 2. **send_msg retry** — sendChunkWithRetry() retries once after 2s on any API error (non-zero errcode or network failure). Lost replies due to transient WeChat API errors are now recovered. 3. **enter_session welcome** — WeCom fires an enter_session event (origin=0, msgtype=event) when a user opens the chat for the first time. Now handled: bound users get a welcome-back message, unbound users get step-by-step onboarding instructions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
978c534a7e
commit
9e466549c0
|
|
@ -7,6 +7,7 @@
|
|||
* 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages.
|
||||
* 2. Handle binding codes: user sends code → maps their external_userid to an instance.
|
||||
* 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies.
|
||||
* 4. Handle enter_session events: send welcome/onboarding message on first contact.
|
||||
*
|
||||
* Required env vars:
|
||||
* IT0_WECOM_CORP_ID — Enterprise ID (from 企业微信管理后台 → 我的企业)
|
||||
|
|
@ -20,10 +21,11 @@
|
|||
* 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功"
|
||||
*
|
||||
* Robustness guarantees:
|
||||
* - sync_msg cursor persisted in memory (resets on restart — WeCom deduplicates by msgid)
|
||||
* - sync_msg cursor persisted to DB (survives restart — no duplicate on resume)
|
||||
* - Per-user serial queue prevents concurrent LLM calls for same user
|
||||
* - Dedup map prevents duplicate message processing
|
||||
* - Dedup map prevents duplicate message processing within session
|
||||
* - Rate limit: 10 messages/minute per user
|
||||
* - send_msg retries once on error (2s delay)
|
||||
* - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s)
|
||||
* - Periodic cleanup of in-memory maps (5-min interval)
|
||||
*/
|
||||
|
|
@ -33,6 +35,7 @@ import { ConfigService } from '@nestjs/config';
|
|||
import * as https from 'https';
|
||||
import * as http from 'http';
|
||||
import * as crypto from 'crypto';
|
||||
import { DataSource } from 'typeorm';
|
||||
import { AgentInstanceRepository } from '../repositories/agent-instance.repository';
|
||||
|
||||
// ── Types ─────────────────────────────────────────────────────────────────────
|
||||
|
|
@ -64,6 +67,8 @@ const MAX_RESPONSE_BYTES = 256 * 1024;
|
|||
const POLL_INTERVAL_ACTIVE_MS = 2_000;
|
||||
const POLL_INTERVAL_IDLE_MS = 10_000;
|
||||
const IDLE_THRESHOLD_MS = 30_000;
|
||||
const SEND_RETRY_DELAY_MS = 2_000;
|
||||
const STATE_KEY_CURSOR = 'wecom:sync_cursor';
|
||||
|
||||
const WECOM_API_HOST = 'qyapi.weixin.qq.com';
|
||||
|
||||
|
|
@ -104,6 +109,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
constructor(
|
||||
private readonly configService: ConfigService,
|
||||
private readonly instanceRepo: AgentInstanceRepository,
|
||||
private readonly dataSource: DataSource,
|
||||
) {
|
||||
this.corpId = this.configService.get<string>('IT0_WECOM_CORP_ID', '');
|
||||
this.kfSecret = this.configService.get<string>('IT0_WECOM_KF_SECRET', '');
|
||||
|
|
@ -113,11 +119,12 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
this.agentCallbackBaseUrl = this.configService.get<string>('AGENT_SERVICE_PUBLIC_URL', '');
|
||||
}
|
||||
|
||||
onModuleInit(): void {
|
||||
async onModuleInit(): Promise<void> {
|
||||
if (!this.enabled) {
|
||||
this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled');
|
||||
return;
|
||||
}
|
||||
await this.initStateTable();
|
||||
this.logger.log('WeCom router starting (sync_msg polling)...');
|
||||
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
|
||||
this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
|
||||
|
|
@ -144,6 +151,49 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
return this.kfUrl;
|
||||
}
|
||||
|
||||
// ── DB state table ─────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * Ensure the service_state table exists and load the persisted sync cursor.
 *
 * Uses a tiny generic key/value table (created on demand with
 * CREATE TABLE IF NOT EXISTS) to avoid a dedicated migration file.
 *
 * Best-effort: any error raised by the queries is caught and logged as a
 * warning, and `this.syncCursor` is left untouched so the service starts
 * without a restored cursor.
 */
private async initStateTable(): Promise<void> {
  try {
    await this.dataSource.query(`
      CREATE TABLE IF NOT EXISTS public.service_state (
        key VARCHAR(100) PRIMARY KEY,
        value TEXT NOT NULL,
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
      )
    `);

    const rows: Array<{ value?: string | null }> = await this.dataSource.query(
      `SELECT value FROM public.service_state WHERE key = $1`,
      [STATE_KEY_CURSOR],
    );

    // An absent row and an empty value both mean "no cursor persisted yet".
    const persisted = rows[0]?.value;
    if (persisted) {
      this.syncCursor = persisted;
      // Only a short prefix of the cursor is logged.
      this.logger.log(`WeCom cursor restored from DB: ${this.syncCursor.slice(0, 12)}...`);
    } else {
      this.logger.log('WeCom cursor: starting fresh (no persisted cursor)');
    }
  } catch (e: unknown) {
    // Narrow before reading `.message`; a non-Error throwable would otherwise log "undefined".
    const reason = e instanceof Error ? e.message : String(e);
    this.logger.warn(`WeCom state table init failed (will start without cursor): ${reason}`);
  }
}
|
||||
|
||||
/**
 * Upsert the given sync cursor into public.service_state under STATE_KEY_CURSOR.
 *
 * Errors raised by the query are caught and logged as a warning rather than
 * rethrown, so a failed write does not reject the returned promise.
 *
 * @param cursor Cursor value to store.
 */
private async persistCursor(cursor: string): Promise<void> {
  try {
    await this.dataSource.query(
      `INSERT INTO public.service_state (key, value, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`,
      [STATE_KEY_CURSOR, cursor],
    );
  } catch (e: unknown) {
    // Narrow before reading `.message`; a non-Error throwable would otherwise log "undefined".
    const reason = e instanceof Error ? e.message : String(e);
    this.logger.warn(`WeCom cursor persist failed: ${reason}`);
  }
}
|
||||
|
||||
// ── Async bridge callback ──────────────────────────────────────────────────
|
||||
|
||||
resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void {
|
||||
|
|
@ -212,9 +262,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
open_kfid: string;
|
||||
external_userid: string;
|
||||
send_time: number;
|
||||
origin: number; // 3=user, 4=bot, 5=human agent
|
||||
origin: number; // 0=system/event, 3=user, 4=bot, 5=human agent
|
||||
msgtype: string;
|
||||
text?: { content: string };
|
||||
event?: { event_type: string };
|
||||
}>;
|
||||
}>(path);
|
||||
|
||||
|
|
@ -223,12 +274,26 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (res.next_cursor) {
|
||||
if (res.next_cursor && res.next_cursor !== this.syncCursor) {
|
||||
this.syncCursor = res.next_cursor;
|
||||
// Persist asynchronously — don't block the poll loop
|
||||
this.persistCursor(this.syncCursor).catch(() => {});
|
||||
}
|
||||
|
||||
const userMessages = (res.msg_list ?? []).filter(m => m.origin === 3);
|
||||
const msgList = res.msg_list ?? [];
|
||||
|
||||
// Handle system events (enter_session etc.) — origin=0, msgtype="event"
|
||||
const systemEvents = msgList.filter(m => m.msgtype === 'event' && m.event);
|
||||
for (const ev of systemEvents) {
|
||||
if (ev.event?.event_type === 'enter_session') {
|
||||
this.handleEnterSession(ev.external_userid).catch((e: Error) => {
|
||||
this.logger.error('WeCom handleEnterSession error:', e.message);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Handle user messages — origin=3
|
||||
const userMessages = msgList.filter(m => m.origin === 3);
|
||||
for (const msg of userMessages) {
|
||||
this.lastMsgTime = Date.now();
|
||||
const wecomMsg: WecomMsg = {
|
||||
|
|
@ -245,6 +310,31 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
return res.has_more === 1;
|
||||
}
|
||||
|
||||
// ── Enter-session welcome ──────────────────────────────────────────────────
|
||||
|
||||
/**
 * Reply to an enter_session event with a greeting.
 *
 * Looks up the instance bound to the external user id. When one is found, a
 * welcome-back message naming that instance is sent; otherwise step-by-step
 * binding instructions are sent. An empty id is ignored.
 *
 * NOTE(review): WeCom's enter_session event is documented to carry a
 * welcome_code intended for the send_msg_on_event API — confirm that the
 * regular send path used here is accepted before the user has sent a message.
 *
 * @param externalUserId WeCom external_userid taken from the event.
 */
private async handleEnterSession(externalUserId: string): Promise<void> {
  if (!externalUserId) {
    return;
  }
  this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`);

  const boundInstance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);

  if (!boundInstance) {
    // No binding yet: walk the user through the binding flow.
    const onboardingLines = [
      `👋 你好!我是 IT0 的小龙虾助手~\n\n`,
      `还没绑定专属小龙虾?按以下步骤操作:\n`,
      `1. 打开 IT0 App\n`,
      `2. 创建或选择一只小龙虾\n`,
      `3. 点击「绑定微信」获取验证码\n`,
      `4. 把验证码发给我即可完成绑定 🎉`,
    ];
    await this.sendMessage(externalUserId, onboardingLines.join(''));
    return;
  }

  // Already bound: greet the user and name their instance.
  const welcomeBack = `👋 欢迎回来!\n\n你的小龙虾「${boundInstance.name}」已就绪,直接发消息给我,我会帮你转达~`;
  await this.sendMessage(externalUserId, welcomeBack);
}
|
||||
|
||||
// ── Message handling ───────────────────────────────────────────────────────
|
||||
|
||||
private async handleMessage(msg: WecomMsg, msgType: string): Promise<void> {
|
||||
|
|
@ -438,7 +528,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
|
||||
/**
|
||||
* Send a text message to a WeChat user via 微信客服 send_msg API.
|
||||
* Chunked if over WECOM_MAX_CHARS.
|
||||
* Chunked if over WECOM_MAX_CHARS. Retries once on error.
|
||||
*/
|
||||
private async sendMessage(externalUserId: string, content: string): Promise<void> {
|
||||
const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)';
|
||||
|
|
@ -447,6 +537,12 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
chunks.push(safe.slice(i, i + WECOM_MAX_CHARS));
|
||||
}
|
||||
for (const chunk of chunks) {
|
||||
await this.sendChunkWithRetry(externalUserId, chunk);
|
||||
}
|
||||
}
|
||||
|
||||
private async sendChunkWithRetry(externalUserId: string, chunk: string): Promise<void> {
|
||||
for (let attempt = 0; attempt < 2; attempt++) {
|
||||
try {
|
||||
const token = await this.getAccessToken();
|
||||
const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>(
|
||||
|
|
@ -458,14 +554,24 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
|||
text: { content: chunk },
|
||||
},
|
||||
);
|
||||
if (res.errcode !== 0) {
|
||||
this.logger.error(`WeCom send_msg error: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`);
|
||||
if (res.errcode === 0) return; // success
|
||||
this.logger.warn(
|
||||
`WeCom send_msg attempt ${attempt + 1} failed: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`,
|
||||
);
|
||||
if (attempt === 0) {
|
||||
await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS));
|
||||
}
|
||||
} catch (e: any) {
|
||||
this.logger.error(`WeCom sendMessage failed for externalUserId=${externalUserId}:`, e.message);
|
||||
this.logger.warn(
|
||||
`WeCom sendMessage attempt ${attempt + 1} threw for externalUserId=${externalUserId}: ${e.message}`,
|
||||
);
|
||||
if (attempt === 0) {
|
||||
await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS));
|
||||
}
|
||||
}
|
||||
}
|
||||
this.logger.error(`WeCom sendMessage failed after 2 attempts for externalUserId=${externalUserId}`);
|
||||
}
|
||||
|
||||
// ── Access token ───────────────────────────────────────────────────────────
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue