diff --git a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts index 0c451ec..9fb345f 100644 --- a/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts +++ b/packages/services/agent-service/src/infrastructure/wecom/wecom-router.service.ts @@ -7,6 +7,7 @@ * 1. Poll WeCom sync_msg API every 2s (active) / 10s (idle) for incoming messages. * 2. Handle binding codes: user sends code → maps their external_userid to an instance. * 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies. + * 4. Handle enter_session events: send welcome/onboarding message on first contact. * * Required env vars: * IT0_WECOM_CORP_ID — Enterprise ID (from 企业微信管理后台 → 我的企业) @@ -20,10 +21,11 @@ * 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功" * * Robustness guarantees: - * - sync_msg cursor persisted in memory (resets on restart — WeCom deduplicates by msgid) + * - sync_msg cursor persisted to DB (survives restart — no duplicate on resume) * - Per-user serial queue prevents concurrent LLM calls for same user - * - Dedup map prevents duplicate message processing + * - Dedup map prevents duplicate message processing within session * - Rate limit: 10 messages/minute per user + * - send_msg retries once on error (2s delay) * - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s) * - Periodic cleanup of in-memory maps (5-min interval) */ @@ -33,6 +35,7 @@ import { ConfigService } from '@nestjs/config'; import * as https from 'https'; import * as http from 'http'; import * as crypto from 'crypto'; +import { DataSource } from 'typeorm'; import { AgentInstanceRepository } from '../repositories/agent-instance.repository'; // ── Types ───────────────────────────────────────────────────────────────────── @@ -64,6 +67,8 @@ const MAX_RESPONSE_BYTES = 256 * 1024; const POLL_INTERVAL_ACTIVE_MS = 2_000; const POLL_INTERVAL_IDLE_MS = 10_000; const IDLE_THRESHOLD_MS = 30_000; +const SEND_RETRY_DELAY_MS = 2_000; +const STATE_KEY_CURSOR = 'wecom:sync_cursor'; const WECOM_API_HOST = 'qyapi.weixin.qq.com'; @@ -104,6 +109,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { constructor( private readonly configService: ConfigService, private readonly instanceRepo: AgentInstanceRepository, + private readonly dataSource: DataSource, ) { this.corpId = this.configService.get('IT0_WECOM_CORP_ID', ''); this.kfSecret = this.configService.get('IT0_WECOM_KF_SECRET', ''); @@ -113,11 +119,12 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { this.agentCallbackBaseUrl = this.configService.get('AGENT_SERVICE_PUBLIC_URL', ''); } - onModuleInit(): void { + async onModuleInit(): Promise { if (!this.enabled) { this.logger.warn('IT0_WECOM_CORP_ID/KF_SECRET/KF_OPEN_KFID not set — WeCom router disabled'); return; } + await this.initStateTable(); this.logger.log('WeCom router starting (sync_msg polling)...'); this.schedulePoll(POLL_INTERVAL_IDLE_MS); this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); @@ -144,6 +151,49 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { return this.kfUrl; } + // ── DB state table ───────────────────────────────────────────────────────── + + /** + * Ensure the service_state table exists and load persisted cursor. + * Uses a tiny generic KV table to avoid a dedicated migration file. + */ + private async initStateTable(): Promise { + try { + await this.dataSource.query(` + CREATE TABLE IF NOT EXISTS public.service_state ( + key VARCHAR(100) PRIMARY KEY, + value TEXT NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `); + const rows = await this.dataSource.query( + `SELECT value FROM public.service_state WHERE key = $1`, + [STATE_KEY_CURSOR], + ); + if (rows.length > 0 && rows[0].value) { + this.syncCursor = rows[0].value; + this.logger.log(`WeCom cursor restored from DB: ${this.syncCursor.slice(0, 12)}...`); + } else { + this.logger.log('WeCom cursor: starting fresh (no persisted cursor)'); + } + } catch (e: any) { + this.logger.warn(`WeCom state table init failed (will start without cursor): ${e.message}`); + } + } + + private async persistCursor(cursor: string): Promise { + try { + await this.dataSource.query( + `INSERT INTO public.service_state (key, value, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`, + [STATE_KEY_CURSOR, cursor], + ); + } catch (e: any) { + this.logger.warn(`WeCom cursor persist failed: ${e.message}`); + } + } + // ── Async bridge callback ────────────────────────────────────────────────── resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void { @@ -212,9 +262,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { open_kfid: string; external_userid: string; send_time: number; - origin: number; // 3=user, 4=bot, 5=human agent + origin: number; // 0=system/event, 3=user, 4=bot, 5=human agent msgtype: string; text?: { content: string }; + event?: { event_type: string }; }>; }>(path); @@ -223,12 +274,26 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { return false; } - if (res.next_cursor) { + if (res.next_cursor && res.next_cursor !== this.syncCursor) { this.syncCursor = res.next_cursor; + // Persist asynchronously — don't block the poll loop + this.persistCursor(this.syncCursor).catch(() => {}); } - const userMessages = (res.msg_list ?? []).filter(m => m.origin === 3); + const msgList = res.msg_list ?? []; + // Handle system events (enter_session etc.) — origin=0, msgtype="event" + const systemEvents = msgList.filter(m => m.msgtype === 'event' && m.event); + for (const ev of systemEvents) { + if (ev.event?.event_type === 'enter_session') { + this.handleEnterSession(ev.external_userid).catch((e: Error) => { + this.logger.error('WeCom handleEnterSession error:', e.message); + }); + } + } + + // Handle user messages — origin=3 + const userMessages = msgList.filter(m => m.origin === 3); for (const msg of userMessages) { this.lastMsgTime = Date.now(); const wecomMsg: WecomMsg = { @@ -245,6 +310,31 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { return res.has_more === 1; } + // ── Enter-session welcome ────────────────────────────────────────────────── + + private async handleEnterSession(externalUserId: string): Promise { + if (!externalUserId) return; + this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`); + + const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId); + if (instance) { + await this.sendMessage( + externalUserId, + `👋 欢迎回来!\n\n你的小龙虾「${instance.name}」已就绪,直接发消息给我,我会帮你转达~`, + ); + } else { + await this.sendMessage( + externalUserId, + `👋 你好!我是 IT0 的小龙虾助手~\n\n` + + `还没绑定专属小龙虾?按以下步骤操作:\n` + + `1. 打开 IT0 App\n` + + `2. 创建或选择一只小龙虾\n` + + `3. 点击「绑定微信」获取验证码\n` + + `4. 把验证码发给我即可完成绑定 🎉`, + ); + } + } + // ── Message handling ─────────────────────────────────────────────────────── private async handleMessage(msg: WecomMsg, msgType: string): Promise { @@ -438,7 +528,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { /** * Send a text message to a WeChat user via 微信客服 send_msg API. - * Chunked if over WECOM_MAX_CHARS. + * Chunked if over WECOM_MAX_CHARS. Retries once on error. */ private async sendMessage(externalUserId: string, content: string): Promise { const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)'; @@ -447,6 +537,12 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { chunks.push(safe.slice(i, i + WECOM_MAX_CHARS)); } for (const chunk of chunks) { + await this.sendChunkWithRetry(externalUserId, chunk); + } + } + + private async sendChunkWithRetry(externalUserId: string, chunk: string): Promise { + for (let attempt = 0; attempt < 2; attempt++) { try { const token = await this.getAccessToken(); const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>( @@ -458,13 +554,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy { text: { content: chunk }, }, ); - if (res.errcode !== 0) { - this.logger.error(`WeCom send_msg error: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`); + if (res.errcode === 0) return; // success + this.logger.warn( + `WeCom send_msg attempt ${attempt + 1} failed: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`, + ); + if (attempt === 0) { + await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS)); } } catch (e: any) { - this.logger.error(`WeCom sendMessage failed for externalUserId=${externalUserId}:`, e.message); + this.logger.warn( + `WeCom sendMessage attempt ${attempt + 1} threw for externalUserId=${externalUserId}: ${e.message}`, + ); + if (attempt === 0) { + await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS)); + } } } + this.logger.error(`WeCom sendMessage failed after 2 attempts for externalUserId=${externalUserId}`); } // ── Access token ───────────────────────────────────────────────────────────