fix(wecom): token mutex, leader lease, backoff, watchdog
Four additional robustness fixes: 1. **Token refresh mutex** — tokenRefreshPromise deduplicates concurrent refresh calls. All callers share one in-flight HTTP request instead of each firing their own, eliminating the race condition. 2. **Distributed leader lease** — service_state table used for a TTL-based leader election (LEADER_LEASE_TTL_S=90s). Only one agent-service instance polls at a time; others skip until the lease expires. Lease auto-released on graceful shutdown. 3. **Exponential backoff** — consecutive poll errors increment a counter; next delay = min(10s × 2^(n-1), 5min). Prevents log spam and reduces load during sustained WeCom API outages. Counter resets on any successful poll. 4. **Watchdog timer** — setInterval every 2min checks lastPollAt. If poll loop has been silent for >5min, clears the timer and reschedules immediately, recovering from any silent crash. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
9e466549c0
commit
0d5441f720
|
|
@ -14,19 +14,13 @@
|
||||||
* IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret)
|
* IT0_WECOM_KF_SECRET — Customer service app secret (客服应用的 Secret)
|
||||||
* IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid,以 wkf 开头)
|
* IT0_WECOM_KF_OPEN_KFID — Customer service account ID (open_kfid,以 wkf 开头)
|
||||||
*
|
*
|
||||||
* Binding flow (code):
|
|
||||||
* 1. Frontend calls POST /api/v1/agent/channels/wecom/bind/:instanceId
|
|
||||||
* 2. Backend returns { code: "A3K9F2", expiresAt, kfUrl }
|
|
||||||
* 3. User opens WeChat → opens 微信客服 link → sends "A3K9F2"
|
|
||||||
* 4. This service matches code → saves wecomExternalUserId → replies "✅ 绑定成功"
|
|
||||||
*
|
|
||||||
* Robustness guarantees:
|
* Robustness guarantees:
|
||||||
* - sync_msg cursor persisted to DB (survives restart — no duplicate on resume)
|
* - sync_msg cursor persisted to DB (survives restart)
|
||||||
* - Per-user serial queue prevents concurrent LLM calls for same user
|
* - Token refresh mutex — concurrent callers share one in-flight refresh
|
||||||
* - Dedup map prevents duplicate message processing within session
|
* - Distributed leader lease — only one agent-service instance polls at a time
|
||||||
* - Rate limit: 10 messages/minute per user
|
* - Exponential backoff on consecutive poll errors (up to 5 min)
|
||||||
* - send_msg retries once on error (2s delay)
|
* - Watchdog timer restarts the poll loop if it silently dies
|
||||||
* - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s)
|
* - Per-user serial queue, dedup, rate limit, send retry
|
||||||
* - Periodic cleanup of in-memory maps (5-min interval)
|
* - Periodic cleanup of in-memory maps (5-min interval)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
@ -54,8 +48,8 @@ interface BindingEntry {
|
||||||
|
|
||||||
// ── Constants ─────────────────────────────────────────────────────────────────
|
// ── Constants ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
const WECOM_MAX_CHARS = 2048; // stay below WeChat's limit
|
const WECOM_MAX_CHARS = 2048;
|
||||||
const CODE_TTL_MS = 15 * 60 * 1000; // 15 min
|
const CODE_TTL_MS = 15 * 60 * 1000;
|
||||||
const TASK_TIMEOUT_S = 120;
|
const TASK_TIMEOUT_S = 120;
|
||||||
const CALLBACK_TIMEOUT_MS = 180_000;
|
const CALLBACK_TIMEOUT_MS = 180_000;
|
||||||
const THINKING_REMINDER_MS = 25_000;
|
const THINKING_REMINDER_MS = 25_000;
|
||||||
|
|
@ -68,6 +62,21 @@ const POLL_INTERVAL_ACTIVE_MS = 2_000;
|
||||||
const POLL_INTERVAL_IDLE_MS = 10_000;
|
const POLL_INTERVAL_IDLE_MS = 10_000;
|
||||||
const IDLE_THRESHOLD_MS = 30_000;
|
const IDLE_THRESHOLD_MS = 30_000;
|
||||||
const SEND_RETRY_DELAY_MS = 2_000;
|
const SEND_RETRY_DELAY_MS = 2_000;
|
||||||
|
|
||||||
|
// Circuit breaker
|
||||||
|
const BACKOFF_BASE_MS = POLL_INTERVAL_IDLE_MS;
|
||||||
|
const BACKOFF_MAX_MS = 5 * 60 * 1000; // 5 min cap
|
||||||
|
const BACKOFF_MULTIPLIER = 2;
|
||||||
|
|
||||||
|
// Distributed leader lease
|
||||||
|
const LEADER_LEASE_KEY = 'wecom:poll-leader';
|
||||||
|
const LEADER_LEASE_TTL_S = 90; // another instance takes over after 90s
|
||||||
|
|
||||||
|
// Watchdog: restart poll loop if last success > threshold
|
||||||
|
const WATCHDOG_INTERVAL_MS = 2 * 60 * 1000;
|
||||||
|
const WATCHDOG_THRESHOLD_MS = 5 * 60 * 1000;
|
||||||
|
|
||||||
|
// DB cursor key
|
||||||
const STATE_KEY_CURSOR = 'wecom:sync_cursor';
|
const STATE_KEY_CURSOR = 'wecom:sync_cursor';
|
||||||
|
|
||||||
const WECOM_API_HOST = 'qyapi.weixin.qq.com';
|
const WECOM_API_HOST = 'qyapi.weixin.qq.com';
|
||||||
|
|
@ -83,18 +92,23 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
private readonly kfUrl: string;
|
private readonly kfUrl: string;
|
||||||
private readonly enabled: boolean;
|
private readonly enabled: boolean;
|
||||||
private readonly agentCallbackBaseUrl: string;
|
private readonly agentCallbackBaseUrl: string;
|
||||||
|
private readonly nodeId = crypto.randomUUID(); // unique per process instance
|
||||||
|
|
||||||
private stopping = false;
|
private stopping = false;
|
||||||
private pollTimer?: NodeJS.Timeout;
|
private pollTimer?: NodeJS.Timeout;
|
||||||
|
private watchdogTimer?: NodeJS.Timeout;
|
||||||
private cleanupTimer?: NodeJS.Timeout;
|
private cleanupTimer?: NodeJS.Timeout;
|
||||||
private syncCursor = '';
|
private syncCursor = '';
|
||||||
private lastMsgTime = 0;
|
private lastMsgTime = 0;
|
||||||
|
private lastPollAt = 0; // for watchdog
|
||||||
|
private consecutiveErrors = 0; // for exponential backoff
|
||||||
|
|
||||||
// Token cache
|
// Token cache + mutex
|
||||||
private tokenCache = '';
|
private tokenCache = '';
|
||||||
private tokenExpiresAt = 0;
|
private tokenExpiresAt = 0;
|
||||||
|
private tokenRefreshPromise?: Promise<string>;
|
||||||
|
|
||||||
// State
|
// State maps
|
||||||
private readonly bindingCodes = new Map<string, BindingEntry>();
|
private readonly bindingCodes = new Map<string, BindingEntry>();
|
||||||
private readonly dedup = new Map<string, number>();
|
private readonly dedup = new Map<string, number>();
|
||||||
private readonly rateWindows = new Map<string, number[]>();
|
private readonly rateWindows = new Map<string, number[]>();
|
||||||
|
|
@ -125,8 +139,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await this.initStateTable();
|
await this.initStateTable();
|
||||||
this.logger.log('WeCom router starting (sync_msg polling)...');
|
this.logger.log(`WeCom router starting (nodeId=${this.nodeId.slice(0, 8)}...)`)
|
||||||
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
|
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
|
||||||
|
this.scheduleWatchdog();
|
||||||
this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
|
this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
|
||||||
if (this.cleanupTimer.unref) this.cleanupTimer.unref();
|
if (this.cleanupTimer.unref) this.cleanupTimer.unref();
|
||||||
this.logger.log('WeCom router started');
|
this.logger.log('WeCom router started');
|
||||||
|
|
@ -135,7 +150,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
onModuleDestroy(): void {
|
onModuleDestroy(): void {
|
||||||
this.stopping = true;
|
this.stopping = true;
|
||||||
clearTimeout(this.pollTimer);
|
clearTimeout(this.pollTimer);
|
||||||
|
clearTimeout(this.watchdogTimer);
|
||||||
clearInterval(this.cleanupTimer);
|
clearInterval(this.cleanupTimer);
|
||||||
|
// Release leader lease asynchronously (best effort)
|
||||||
|
this.releaseLeaderLease().catch(() => {});
|
||||||
for (const [, cb] of this.pendingCallbacks) {
|
for (const [, cb] of this.pendingCallbacks) {
|
||||||
clearTimeout(cb.timer);
|
clearTimeout(cb.timer);
|
||||||
cb.reject(new Error('Service shutting down'));
|
cb.reject(new Error('Service shutting down'));
|
||||||
|
|
@ -143,20 +161,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
this.pendingCallbacks.clear();
|
this.pendingCallbacks.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
isEnabled(): boolean {
|
isEnabled(): boolean { return this.enabled; }
|
||||||
return this.enabled;
|
getKfUrl(): string { return this.kfUrl; }
|
||||||
}
|
|
||||||
|
|
||||||
getKfUrl(): string {
|
|
||||||
return this.kfUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── DB state table ─────────────────────────────────────────────────────────
|
// ── DB state table ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/**
|
|
||||||
* Ensure the service_state table exists and load persisted cursor.
|
|
||||||
* Uses a tiny generic KV table to avoid a dedicated migration file.
|
|
||||||
*/
|
|
||||||
private async initStateTable(): Promise<void> {
|
private async initStateTable(): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await this.dataSource.query(`
|
await this.dataSource.query(`
|
||||||
|
|
@ -172,20 +181,19 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
);
|
);
|
||||||
if (rows.length > 0 && rows[0].value) {
|
if (rows.length > 0 && rows[0].value) {
|
||||||
this.syncCursor = rows[0].value;
|
this.syncCursor = rows[0].value;
|
||||||
this.logger.log(`WeCom cursor restored from DB: ${this.syncCursor.slice(0, 12)}...`);
|
this.logger.log(`WeCom cursor restored: ${this.syncCursor.slice(0, 12)}...`);
|
||||||
} else {
|
} else {
|
||||||
this.logger.log('WeCom cursor: starting fresh (no persisted cursor)');
|
this.logger.log('WeCom cursor: starting fresh');
|
||||||
}
|
}
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
this.logger.warn(`WeCom state table init failed (will start without cursor): ${e.message}`);
|
this.logger.warn(`WeCom state table init failed (continuing without cursor): ${e.message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async persistCursor(cursor: string): Promise<void> {
|
private async persistCursor(cursor: string): Promise<void> {
|
||||||
try {
|
try {
|
||||||
await this.dataSource.query(
|
await this.dataSource.query(
|
||||||
`INSERT INTO public.service_state (key, value, updated_at)
|
`INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW())
|
||||||
VALUES ($1, $2, NOW())
|
|
||||||
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`,
|
ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = NOW()`,
|
||||||
[STATE_KEY_CURSOR, cursor],
|
[STATE_KEY_CURSOR, cursor],
|
||||||
);
|
);
|
||||||
|
|
@ -194,12 +202,80 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Distributed leader lease ───────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Uses service_state table for a TTL-based leader election.
|
||||||
|
// Only the leader polls; others skip until the lease expires (90s).
|
||||||
|
// Value is JSON: { nodeId, pid }
|
||||||
|
|
||||||
|
private leaseValue(): string {
|
||||||
|
return JSON.stringify({ nodeId: this.nodeId, pid: process.pid });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to claim or renew the leader lease.
|
||||||
|
* Returns true if this instance is now the leader.
|
||||||
|
*/
|
||||||
|
private async tryClaimLeaderLease(): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
// Upsert: claim the lease if:
|
||||||
|
// (a) no lease exists yet (INSERT path), or
|
||||||
|
// (b) we already hold it (nodeId matches), or
|
||||||
|
// (c) existing lease has expired (updated_at older than TTL)
|
||||||
|
const result = await this.dataSource.query(
|
||||||
|
`INSERT INTO public.service_state (key, value, updated_at) VALUES ($1, $2, NOW())
|
||||||
|
ON CONFLICT (key) DO UPDATE
|
||||||
|
SET value = EXCLUDED.value, updated_at = NOW()
|
||||||
|
WHERE
|
||||||
|
(service_state.value)::json->>'nodeId' = $3
|
||||||
|
OR service_state.updated_at < NOW() - ($4 || ' seconds')::INTERVAL
|
||||||
|
RETURNING key`,
|
||||||
|
[LEADER_LEASE_KEY, this.leaseValue(), this.nodeId, LEADER_LEASE_TTL_S],
|
||||||
|
);
|
||||||
|
return result.length > 0;
|
||||||
|
} catch (e: any) {
|
||||||
|
// On DB error, fall through and allow polling (fail open — better than stopping)
|
||||||
|
this.logger.warn(`WeCom leader lease check failed (continuing): ${e.message}`);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async releaseLeaderLease(): Promise<void> {
|
||||||
|
try {
|
||||||
|
await this.dataSource.query(
|
||||||
|
`DELETE FROM public.service_state
|
||||||
|
WHERE key = $1 AND (value)::json->>'nodeId' = $2`,
|
||||||
|
[LEADER_LEASE_KEY, this.nodeId],
|
||||||
|
);
|
||||||
|
} catch { /* ignore on shutdown */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Watchdog ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
private scheduleWatchdog(): void {
|
||||||
|
this.watchdogTimer = setInterval(() => this.watchdog(), WATCHDOG_INTERVAL_MS);
|
||||||
|
if (this.watchdogTimer.unref) this.watchdogTimer.unref();
|
||||||
|
}
|
||||||
|
|
||||||
|
private watchdog(): void {
|
||||||
|
if (this.stopping) return;
|
||||||
|
const staleSince = Date.now() - this.lastPollAt;
|
||||||
|
if (this.lastPollAt > 0 && staleSince > WATCHDOG_THRESHOLD_MS) {
|
||||||
|
this.logger.warn(
|
||||||
|
`WeCom watchdog: poll loop silent for ${Math.round(staleSince / 1000)}s — restarting`,
|
||||||
|
);
|
||||||
|
clearTimeout(this.pollTimer);
|
||||||
|
this.consecutiveErrors = 0;
|
||||||
|
this.schedulePoll(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Async bridge callback ──────────────────────────────────────────────────
|
// ── Async bridge callback ──────────────────────────────────────────────────
|
||||||
|
|
||||||
resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void {
|
resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void {
|
||||||
const cb = this.pendingCallbacks.get(msgId);
|
const cb = this.pendingCallbacks.get(msgId);
|
||||||
if (!cb) {
|
if (!cb) {
|
||||||
this.logger.warn(`WeCom: Received callback for unknown msgId=${msgId} (already resolved or timed out)`);
|
this.logger.warn(`WeCom: callback for unknown msgId=${msgId} (already resolved or timed out)`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.pendingCallbacks.delete(msgId);
|
this.pendingCallbacks.delete(msgId);
|
||||||
|
|
@ -232,16 +308,39 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
if (this.pollTimer.unref) this.pollTimer.unref();
|
if (this.pollTimer.unref) this.pollTimer.unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private computeBackoff(): number {
|
||||||
|
if (this.consecutiveErrors === 0) return 0; // caller will pick active/idle
|
||||||
|
const backoff = Math.min(
|
||||||
|
BACKOFF_BASE_MS * Math.pow(BACKOFF_MULTIPLIER, this.consecutiveErrors - 1),
|
||||||
|
BACKOFF_MAX_MS,
|
||||||
|
);
|
||||||
|
return backoff;
|
||||||
|
}
|
||||||
|
|
||||||
private async poll(): Promise<void> {
|
private async poll(): Promise<void> {
|
||||||
if (this.stopping) return;
|
if (this.stopping) return;
|
||||||
|
|
||||||
|
// Leader election: skip if another instance holds the lease
|
||||||
|
const isLeader = await this.tryClaimLeaderLease();
|
||||||
|
if (!isLeader) {
|
||||||
|
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.lastPollAt = Date.now();
|
||||||
try {
|
try {
|
||||||
const hasMore = await this.syncMessages();
|
const hasMore = await this.syncMessages();
|
||||||
|
this.consecutiveErrors = 0; // reset backoff on success
|
||||||
const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS;
|
const idle = Date.now() - this.lastMsgTime > IDLE_THRESHOLD_MS;
|
||||||
const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS);
|
const nextDelay = hasMore ? 0 : (idle ? POLL_INTERVAL_IDLE_MS : POLL_INTERVAL_ACTIVE_MS);
|
||||||
this.schedulePoll(nextDelay);
|
this.schedulePoll(nextDelay);
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
this.logger.warn(`WeCom poll error: ${e.message}`);
|
this.consecutiveErrors++;
|
||||||
this.schedulePoll(POLL_INTERVAL_IDLE_MS);
|
const backoff = this.computeBackoff();
|
||||||
|
this.logger.warn(
|
||||||
|
`WeCom poll error (attempt ${this.consecutiveErrors}), backoff=${backoff}ms: ${e.message}`,
|
||||||
|
);
|
||||||
|
this.schedulePoll(backoff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -270,21 +369,18 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
}>(path);
|
}>(path);
|
||||||
|
|
||||||
if (res.errcode !== 0) {
|
if (res.errcode !== 0) {
|
||||||
this.logger.warn(`WeCom sync_msg error: ${res.errcode} ${res.errmsg}`);
|
throw new Error(`sync_msg API error: ${res.errcode} ${res.errmsg}`);
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res.next_cursor && res.next_cursor !== this.syncCursor) {
|
if (res.next_cursor && res.next_cursor !== this.syncCursor) {
|
||||||
this.syncCursor = res.next_cursor;
|
this.syncCursor = res.next_cursor;
|
||||||
// Persist asynchronously — don't block the poll loop
|
|
||||||
this.persistCursor(this.syncCursor).catch(() => {});
|
this.persistCursor(this.syncCursor).catch(() => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
const msgList = res.msg_list ?? [];
|
const msgList = res.msg_list ?? [];
|
||||||
|
|
||||||
// Handle system events (enter_session etc.) — origin=0, msgtype="event"
|
// System events (enter_session etc.)
|
||||||
const systemEvents = msgList.filter(m => m.msgtype === 'event' && m.event);
|
for (const ev of msgList.filter(m => m.msgtype === 'event' && m.event)) {
|
||||||
for (const ev of systemEvents) {
|
|
||||||
if (ev.event?.event_type === 'enter_session') {
|
if (ev.event?.event_type === 'enter_session') {
|
||||||
this.handleEnterSession(ev.external_userid).catch((e: Error) => {
|
this.handleEnterSession(ev.external_userid).catch((e: Error) => {
|
||||||
this.logger.error('WeCom handleEnterSession error:', e.message);
|
this.logger.error('WeCom handleEnterSession error:', e.message);
|
||||||
|
|
@ -292,19 +388,13 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle user messages — origin=3
|
// User messages (origin=3)
|
||||||
const userMessages = msgList.filter(m => m.origin === 3);
|
for (const msg of msgList.filter(m => m.origin === 3)) {
|
||||||
for (const msg of userMessages) {
|
|
||||||
this.lastMsgTime = Date.now();
|
this.lastMsgTime = Date.now();
|
||||||
const wecomMsg: WecomMsg = {
|
this.handleMessage(
|
||||||
externalUserId: msg.external_userid,
|
{ externalUserId: msg.external_userid, msgId: msg.msgid, text: msg.text?.content ?? '', openKfId: msg.open_kfid },
|
||||||
msgId: msg.msgid,
|
msg.msgtype,
|
||||||
text: msg.text?.content ?? '',
|
).catch((e: Error) => this.logger.error('WeCom handleMessage error:', e.message));
|
||||||
openKfId: msg.open_kfid,
|
|
||||||
};
|
|
||||||
this.handleMessage(wecomMsg, msg.msgtype).catch((e: Error) => {
|
|
||||||
this.logger.error('WeCom handleMessage error:', e.message);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return res.has_more === 1;
|
return res.has_more === 1;
|
||||||
|
|
@ -315,7 +405,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
private async handleEnterSession(externalUserId: string): Promise<void> {
|
private async handleEnterSession(externalUserId: string): Promise<void> {
|
||||||
if (!externalUserId) return;
|
if (!externalUserId) return;
|
||||||
this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`);
|
this.logger.log(`WeCom enter_session: externalUserId=${externalUserId}`);
|
||||||
|
|
||||||
const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
|
const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
|
||||||
if (instance) {
|
if (instance) {
|
||||||
await this.sendMessage(
|
await this.sendMessage(
|
||||||
|
|
@ -339,14 +428,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
|
|
||||||
private async handleMessage(msg: WecomMsg, msgType: string): Promise<void> {
|
private async handleMessage(msg: WecomMsg, msgType: string): Promise<void> {
|
||||||
if (!msg.externalUserId || !msg.msgId) return;
|
if (!msg.externalUserId || !msg.msgId) return;
|
||||||
|
|
||||||
// Dedup
|
|
||||||
if (this.dedup.has(msg.msgId)) return;
|
if (this.dedup.has(msg.msgId)) return;
|
||||||
this.dedup.set(msg.msgId, Date.now());
|
this.dedup.set(msg.msgId, Date.now());
|
||||||
|
|
||||||
this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`);
|
this.logger.log(`WeCom message: externalUserId=${msg.externalUserId} type=${msgType} msgId=${msg.msgId}`);
|
||||||
|
|
||||||
// Non-text messages
|
|
||||||
if (msgType !== 'text' || !msg.text) {
|
if (msgType !== 'text' || !msg.text) {
|
||||||
await this.sendMessage(msg.externalUserId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。');
|
await this.sendMessage(msg.externalUserId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。');
|
||||||
return;
|
return;
|
||||||
|
|
@ -355,7 +441,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const text = msg.text.trim();
|
const text = msg.text.trim();
|
||||||
if (!text) return;
|
if (!text) return;
|
||||||
|
|
||||||
// Binding code check (6-char hex, case-insensitive)
|
// Binding code check (6-char hex)
|
||||||
const upperText = text.toUpperCase();
|
const upperText = text.toUpperCase();
|
||||||
const bindEntry = this.bindingCodes.get(upperText);
|
const bindEntry = this.bindingCodes.get(upperText);
|
||||||
if (bindEntry) {
|
if (bindEntry) {
|
||||||
|
|
@ -371,13 +457,11 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rate limit
|
|
||||||
if (!this.rateAllow(msg.externalUserId)) {
|
if (!this.rateAllow(msg.externalUserId)) {
|
||||||
await this.sendMessage(msg.externalUserId, '消息频率过高,请稍后再试(每分钟最多10条)。');
|
await this.sendMessage(msg.externalUserId, '消息频率过高,请稍后再试(每分钟最多10条)。');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Queue
|
|
||||||
const pendingDepth = this.queueDepths.get(msg.externalUserId) ?? 0;
|
const pendingDepth = this.queueDepths.get(msg.externalUserId) ?? 0;
|
||||||
const accepted = this.enqueue(msg.externalUserId, () => this.routeToAgent(msg.externalUserId, text, msg));
|
const accepted = this.enqueue(msg.externalUserId, () => this.routeToAgent(msg.externalUserId, text, msg));
|
||||||
if (!accepted) {
|
if (!accepted) {
|
||||||
|
|
@ -415,7 +499,6 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
|
const instance = await this.instanceRepo.findByWecomExternalUserId(externalUserId);
|
||||||
|
|
||||||
if (!instance) {
|
if (!instance) {
|
||||||
this.logger.warn(`No WeCom binding for externalUserId=${externalUserId}`);
|
|
||||||
await this.sendMessage(
|
await this.sendMessage(
|
||||||
externalUserId,
|
externalUserId,
|
||||||
'👋 你还没有绑定专属小龙虾。\n\n步骤:\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定微信」获取验证码\n4. 把验证码发给我就好了~',
|
'👋 你还没有绑定专属小龙虾。\n\n步骤:\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定微信」获取验证码\n4. 把验证码发给我就好了~',
|
||||||
|
|
@ -424,36 +507,30 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (instance.status !== 'running') {
|
if (instance.status !== 'running') {
|
||||||
const statusHint: Record<string, string> = {
|
const hints: Record<string, string> = {
|
||||||
stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`,
|
stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`,
|
||||||
starting: `⏳ 小龙虾「${instance.name}」还在启动中,请等待约1分钟后重试。`,
|
starting: `⏳ 小龙虾「${instance.name}」还在启动中,请等待约1分钟后重试。`,
|
||||||
error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`,
|
error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`,
|
||||||
};
|
};
|
||||||
await this.sendMessage(
|
await this.sendMessage(
|
||||||
externalUserId,
|
externalUserId,
|
||||||
statusHint[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`,
|
hints[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`,
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!instance.serverHost) {
|
if (!instance.serverHost) {
|
||||||
this.logger.error(`Instance ${instance.id} has no serverHost`);
|
|
||||||
await this.sendMessage(externalUserId, '小龙虾配置异常(缺少服务器地址),请联系管理员。');
|
await this.sendMessage(externalUserId, '小龙虾配置异常(缺少服务器地址),请联系管理员。');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`;
|
const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`;
|
||||||
const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/wecom/bridge-callback`;
|
const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/wecom/bridge-callback`;
|
||||||
this.logger.log(
|
this.logger.log(`WeCom routing msgId=${msg.msgId} → instance ${instance.id} @ ${asyncBridgeUrl}`);
|
||||||
`WeCom routing msgId=${msg.msgId} → instance ${instance.id} (${instance.name}) @ ${asyncBridgeUrl}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Immediate ack
|
|
||||||
await this.sendMessage(externalUserId, '🤔 小虾米正在思考,稍等~');
|
await this.sendMessage(externalUserId, '🤔 小虾米正在思考,稍等~');
|
||||||
|
|
||||||
// Progress reminder after 25s
|
let thinkingTimer: NodeJS.Timeout | undefined = setTimeout(() => {
|
||||||
let thinkingTimer: NodeJS.Timeout | undefined;
|
|
||||||
thinkingTimer = setTimeout(() => {
|
|
||||||
this.sendMessage(externalUserId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {});
|
this.sendMessage(externalUserId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {});
|
||||||
}, THINKING_REMINDER_MS);
|
}, THINKING_REMINDER_MS);
|
||||||
if (thinkingTimer.unref) thinkingTimer.unref();
|
if (thinkingTimer.unref) thinkingTimer.unref();
|
||||||
|
|
@ -475,12 +552,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>(
|
const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>(
|
||||||
asyncBridgeUrl,
|
asyncBridgeUrl,
|
||||||
{
|
{
|
||||||
prompt: text,
|
prompt: text, sessionKey: `agent:main:wx-${externalUserId}`,
|
||||||
sessionKey: `agent:main:wx-${externalUserId}`,
|
idempotencyKey: msg.msgId, timeoutSeconds: TASK_TIMEOUT_S,
|
||||||
idempotencyKey: msg.msgId,
|
callbackUrl, callbackData: { externalUserId, msgId: msg.msgId },
|
||||||
timeoutSeconds: TASK_TIMEOUT_S,
|
|
||||||
callbackUrl,
|
|
||||||
callbackData: { externalUserId, msgId: msg.msgId },
|
|
||||||
},
|
},
|
||||||
15_000,
|
15_000,
|
||||||
);
|
);
|
||||||
|
|
@ -488,11 +562,9 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
if (!ack.ok) {
|
if (!ack.ok) {
|
||||||
this.pendingCallbacks.delete(msg.msgId);
|
this.pendingCallbacks.delete(msg.msgId);
|
||||||
const bridgeError = ack.error ?? '';
|
const bridgeError = ack.error ?? '';
|
||||||
if (bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')) {
|
reply = bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')
|
||||||
reply = `🔄 小虾米正在重启,请等待约30秒后重试。`;
|
? `🔄 小虾米正在重启,请等待约30秒后重试。`
|
||||||
} else {
|
: `小虾米遇到了问题,请稍后重试。`;
|
||||||
reply = `小虾米遇到了问题,请稍后重试。`;
|
|
||||||
}
|
|
||||||
this.logger.warn(`WeCom bridge rejected task for instance ${instance.id}: ${bridgeError}`);
|
this.logger.warn(`WeCom bridge rejected task for instance ${instance.id}: ${bridgeError}`);
|
||||||
} else {
|
} else {
|
||||||
reply = await callbackPromise;
|
reply = await callbackPromise;
|
||||||
|
|
@ -504,6 +576,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout);
|
reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout);
|
||||||
} finally {
|
} finally {
|
||||||
clearTimeout(thinkingTimer);
|
clearTimeout(thinkingTimer);
|
||||||
|
thinkingTimer = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.sendMessage(externalUserId, reply);
|
await this.sendMessage(externalUserId, reply);
|
||||||
|
|
@ -516,9 +589,8 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
`建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说`
|
`建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if (error.includes('disconnected') || error.includes('not connected')) {
|
if (error.includes('disconnected') || error.includes('not connected'))
|
||||||
return `🔄 「${instanceName}」与服务的连接中断了,请等待约30秒后重试。`;
|
return `🔄 「${instanceName}」与服务的连接中断了,请等待约30秒后重试。`;
|
||||||
}
|
|
||||||
if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`;
|
if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`;
|
||||||
if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`;
|
if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`;
|
||||||
return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`;
|
return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`;
|
||||||
|
|
@ -526,18 +598,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
|
|
||||||
// ── Send message ───────────────────────────────────────────────────────────
|
// ── Send message ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/**
|
|
||||||
* Send a text message to a WeChat user via 微信客服 send_msg API.
|
|
||||||
* Chunked if over WECOM_MAX_CHARS. Retries once on error.
|
|
||||||
*/
|
|
||||||
private async sendMessage(externalUserId: string, content: string): Promise<void> {
|
private async sendMessage(externalUserId: string, content: string): Promise<void> {
|
||||||
const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)';
|
const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)';
|
||||||
const chunks: string[] = [];
|
|
||||||
for (let i = 0; i < safe.length; i += WECOM_MAX_CHARS) {
|
for (let i = 0; i < safe.length; i += WECOM_MAX_CHARS) {
|
||||||
chunks.push(safe.slice(i, i + WECOM_MAX_CHARS));
|
await this.sendChunkWithRetry(externalUserId, safe.slice(i, i + WECOM_MAX_CHARS));
|
||||||
}
|
|
||||||
for (const chunk of chunks) {
|
|
||||||
await this.sendChunkWithRetry(externalUserId, chunk);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -545,47 +609,40 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
for (let attempt = 0; attempt < 2; attempt++) {
|
for (let attempt = 0; attempt < 2; attempt++) {
|
||||||
try {
|
try {
|
||||||
const token = await this.getAccessToken();
|
const token = await this.getAccessToken();
|
||||||
const res = await this.wecomPost<{ errcode: number; errmsg: string; msgid?: string }>(
|
const res = await this.wecomPost<{ errcode: number; errmsg: string }>(
|
||||||
`/cgi-bin/kf/send_msg?access_token=${token}`,
|
`/cgi-bin/kf/send_msg?access_token=${token}`,
|
||||||
{
|
{ touser: externalUserId, open_kfid: this.openKfId, msgtype: 'text', text: { content: chunk } },
|
||||||
touser: externalUserId,
|
|
||||||
open_kfid: this.openKfId,
|
|
||||||
msgtype: 'text',
|
|
||||||
text: { content: chunk },
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
if (res.errcode === 0) return; // success
|
if (res.errcode === 0) return;
|
||||||
this.logger.warn(
|
this.logger.warn(
|
||||||
`WeCom send_msg attempt ${attempt + 1} failed: ${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`,
|
`WeCom send_msg attempt ${attempt + 1} errcode=${res.errcode} ${res.errmsg} externalUserId=${externalUserId}`,
|
||||||
);
|
);
|
||||||
if (attempt === 0) {
|
|
||||||
await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS));
|
|
||||||
}
|
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
this.logger.warn(
|
this.logger.warn(`WeCom sendChunk attempt ${attempt + 1} threw: ${e.message}`);
|
||||||
`WeCom sendMessage attempt ${attempt + 1} threw for externalUserId=${externalUserId}: ${e.message}`,
|
|
||||||
);
|
|
||||||
if (attempt === 0) {
|
|
||||||
await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if (attempt === 0) await new Promise(r => setTimeout(r, SEND_RETRY_DELAY_MS));
|
||||||
}
|
}
|
||||||
this.logger.error(`WeCom sendMessage failed after 2 attempts for externalUserId=${externalUserId}`);
|
this.logger.error(`WeCom sendMessage failed after 2 attempts for externalUserId=${externalUserId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Access token ───────────────────────────────────────────────────────────
|
// ── Access token (with refresh mutex) ─────────────────────────────────────
|
||||||
|
|
||||||
private async getAccessToken(): Promise<string> {
|
private async getAccessToken(): Promise<string> {
|
||||||
if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) {
|
if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) {
|
||||||
return this.tokenCache;
|
return this.tokenCache;
|
||||||
}
|
}
|
||||||
const res = await this.wecomGet<{
|
// Mutex: reuse in-flight refresh if one is already running
|
||||||
errcode: number;
|
if (this.tokenRefreshPromise) return this.tokenRefreshPromise;
|
||||||
errmsg: string;
|
this.tokenRefreshPromise = this.refreshToken().finally(() => {
|
||||||
access_token: string;
|
this.tokenRefreshPromise = undefined;
|
||||||
expires_in: number;
|
});
|
||||||
}>(`/cgi-bin/gettoken?corpid=${this.corpId}&corpsecret=${this.kfSecret}`);
|
return this.tokenRefreshPromise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async refreshToken(): Promise<string> {
|
||||||
|
const res = await this.wecomGet<{
|
||||||
|
errcode: number; errmsg: string; access_token: string; expires_in: number;
|
||||||
|
}>(`/cgi-bin/gettoken?corpid=${this.corpId}&corpsecret=${this.kfSecret}`);
|
||||||
if (res.errcode !== 0) throw new Error(`WeCom gettoken error: ${res.errcode} ${res.errmsg}`);
|
if (res.errcode !== 0) throw new Error(`WeCom gettoken error: ${res.errcode} ${res.errmsg}`);
|
||||||
this.tokenCache = res.access_token;
|
this.tokenCache = res.access_token;
|
||||||
this.tokenExpiresAt = Date.now() + res.expires_in * 1000;
|
this.tokenExpiresAt = Date.now() + res.expires_in * 1000;
|
||||||
|
|
@ -603,7 +660,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const next = tail
|
const next = tail
|
||||||
.then(task)
|
.then(task)
|
||||||
.catch((e: Error) => {
|
.catch((e: Error) => {
|
||||||
try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ }
|
try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* */ }
|
||||||
})
|
})
|
||||||
.finally(() => {
|
.finally(() => {
|
||||||
const remaining = (this.queueDepths.get(userId) ?? 1) - 1;
|
const remaining = (this.queueDepths.get(userId) ?? 1) - 1;
|
||||||
|
|
@ -623,7 +680,7 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
|
|
||||||
private rateAllow(userId: string): boolean {
|
private rateAllow(userId: string): boolean {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000);
|
const timestamps = (this.rateWindows.get(userId) ?? []).filter(t => now - t < 60_000);
|
||||||
if (timestamps.length >= RATE_LIMIT_PER_MIN) {
|
if (timestamps.length >= RATE_LIMIT_PER_MIN) {
|
||||||
this.rateWindows.set(userId, timestamps);
|
this.rateWindows.set(userId, timestamps);
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -637,18 +694,16 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
|
|
||||||
private periodicCleanup(): void {
|
private periodicCleanup(): void {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
for (const [id, ts] of this.dedup) {
|
for (const [id, ts] of this.dedup)
|
||||||
if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id);
|
if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id);
|
||||||
}
|
|
||||||
for (const [userId, timestamps] of this.rateWindows) {
|
for (const [userId, timestamps] of this.rateWindows) {
|
||||||
const fresh = timestamps.filter((t) => now - t < 60_000);
|
const fresh = timestamps.filter(t => now - t < 60_000);
|
||||||
if (fresh.length === 0) this.rateWindows.delete(userId);
|
if (fresh.length === 0) this.rateWindows.delete(userId);
|
||||||
else this.rateWindows.set(userId, fresh);
|
else this.rateWindows.set(userId, fresh);
|
||||||
}
|
}
|
||||||
for (const [code, entry] of this.bindingCodes) {
|
for (const [code, entry] of this.bindingCodes)
|
||||||
if (now > entry.expiresAt) this.bindingCodes.delete(code);
|
if (now > entry.expiresAt) this.bindingCodes.delete(code);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// ── HTTP helpers ───────────────────────────────────────────────────────────
|
// ── HTTP helpers ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
@ -657,13 +712,10 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const req = https.request(
|
const req = https.request(
|
||||||
{ hostname: WECOM_API_HOST, path, method: 'GET', timeout: 10_000 },
|
{ hostname: WECOM_API_HOST, path, method: 'GET', timeout: 10_000 },
|
||||||
(res) => {
|
(res) => {
|
||||||
let data = ''; let totalBytes = 0;
|
let data = ''; let total = 0;
|
||||||
res.on('data', (chunk: Buffer) => {
|
res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); });
|
||||||
totalBytes += chunk.length;
|
|
||||||
if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString();
|
|
||||||
});
|
|
||||||
res.on('end', () => {
|
res.on('end', () => {
|
||||||
if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; }
|
if (total > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; }
|
||||||
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
@ -680,20 +732,14 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const req = https.request(
|
const req = https.request(
|
||||||
{
|
{
|
||||||
hostname: WECOM_API_HOST, path, method: 'POST',
|
hostname: WECOM_API_HOST, path, method: 'POST',
|
||||||
headers: {
|
headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) },
|
||||||
'Content-Type': 'application/json',
|
|
||||||
'Content-Length': Buffer.byteLength(body),
|
|
||||||
},
|
|
||||||
timeout: 10_000,
|
timeout: 10_000,
|
||||||
},
|
},
|
||||||
(res) => {
|
(res) => {
|
||||||
let data = ''; let totalBytes = 0;
|
let data = ''; let total = 0;
|
||||||
res.on('data', (chunk: Buffer) => {
|
res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); });
|
||||||
totalBytes += chunk.length;
|
|
||||||
if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString();
|
|
||||||
});
|
|
||||||
res.on('end', () => {
|
res.on('end', () => {
|
||||||
if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; }
|
if (total > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; }
|
||||||
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
@ -711,21 +757,16 @@ export class WecomRouterService implements OnModuleInit, OnModuleDestroy {
|
||||||
const body = JSON.stringify(payload);
|
const body = JSON.stringify(payload);
|
||||||
const req = http.request(
|
const req = http.request(
|
||||||
{
|
{
|
||||||
hostname: parsed.hostname,
|
hostname: parsed.hostname, port: parseInt(parsed.port, 10),
|
||||||
port: parseInt(parsed.port, 10),
|
path: parsed.pathname, method: 'POST',
|
||||||
path: parsed.pathname,
|
|
||||||
method: 'POST',
|
|
||||||
headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) },
|
headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) },
|
||||||
timeout: timeoutMs,
|
timeout: timeoutMs,
|
||||||
},
|
},
|
||||||
(res) => {
|
(res) => {
|
||||||
let data = ''; let totalBytes = 0;
|
let data = ''; let total = 0;
|
||||||
res.on('data', (c: Buffer) => {
|
res.on('data', (c: Buffer) => { total += c.length; if (total <= MAX_RESPONSE_BYTES) data += c.toString(); });
|
||||||
totalBytes += c.length;
|
|
||||||
if (totalBytes <= MAX_RESPONSE_BYTES) data += c.toString();
|
|
||||||
});
|
|
||||||
res.on('end', () => {
|
res.on('end', () => {
|
||||||
if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; }
|
if (total > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; }
|
||||||
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); }
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue