diff --git a/deploy/docker/docker-compose.yml b/deploy/docker/docker-compose.yml index 2e7046d..b5ca95a 100644 --- a/deploy/docker/docker-compose.yml +++ b/deploy/docker/docker-compose.yml @@ -152,6 +152,8 @@ services: - AGENT_SERVICE_PUBLIC_URL=${AGENT_SERVICE_PUBLIC_URL} - IT0_DINGTALK_CLIENT_ID=${IT0_DINGTALK_CLIENT_ID:-} - IT0_DINGTALK_CLIENT_SECRET=${IT0_DINGTALK_CLIENT_SECRET:-} + - IT0_FEISHU_APP_ID=${IT0_FEISHU_APP_ID:-} + - IT0_FEISHU_APP_SECRET=${IT0_FEISHU_APP_SECRET:-} - IT0_BASE_URL=${IT0_BASE_URL:-https://it0api.szaiai.com} - OPENCLAW_LLM_GATEWAY_URL=${OPENCLAW_LLM_GATEWAY_URL:-http://154.84.135.121:3008} - OPENCLAW_LLM_GATEWAY_KEY=${OPENCLAW_LLM_GATEWAY_KEY:-} diff --git a/packages/gateway/config/kong.yml b/packages/gateway/config/kong.yml index d8ef4a0..d48a3cb 100644 --- a/packages/gateway/config/kong.yml +++ b/packages/gateway/config/kong.yml @@ -30,6 +30,15 @@ services: - /api/v1/agent/channels/dingtalk/oauth/callback strip_path: false + # Public Feishu OAuth callback — no JWT (Feishu redirects here after user taps Authorize) + - name: feishu-oauth-public + url: http://agent-service:3002 + routes: + - name: feishu-oauth-callback + paths: + - /api/v1/agent/channels/feishu/oauth/callback + strip_path: false + - name: agent-service url: http://agent-service:3002 routes: diff --git a/packages/services/agent-service/src/agent.module.ts b/packages/services/agent-service/src/agent.module.ts index 1c02dde..f8520af 100644 --- a/packages/services/agent-service/src/agent.module.ts +++ b/packages/services/agent-service/src/agent.module.ts @@ -53,6 +53,7 @@ import { AgentInstanceDeployService } from './infrastructure/services/agent-inst import { AgentInstanceController } from './interfaces/rest/controllers/agent-instance.controller'; import { AgentChannelController } from './interfaces/rest/controllers/agent-channel.controller'; import { DingTalkRouterService } from './infrastructure/dingtalk/dingtalk-router.service'; +import { FeishuRouterService } from './infrastructure/feishu/feishu-router.service'; import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/system-prompt-builder'; @Module({ @@ -101,6 +102,7 @@ import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/sy AgentInstanceRepository, AgentInstanceDeployService, DingTalkRouterService, + FeishuRouterService, SystemPromptBuilder, ], }) diff --git a/packages/services/agent-service/src/infrastructure/feishu/feishu-router.service.ts b/packages/services/agent-service/src/infrastructure/feishu/feishu-router.service.ts new file mode 100644 index 0000000..818f981 --- /dev/null +++ b/packages/services/agent-service/src/infrastructure/feishu/feishu-router.service.ts @@ -0,0 +1,679 @@ +/** + * Feishu Router Service + * + * IT0 unified Feishu bot — one central long-connection for all agent instances. + * + * Responsibilities: + * 1. Maintain a Feishu WebSocket long-connection using @larksuiteoapi/node-sdk. + * 2. Handle binding codes: user sends code → maps their Feishu open_id to an instance. + * 3. Route regular messages: looks up bound instance → POSTs to bridge /task-async → replies. + * + * Required env vars: + * IT0_FEISHU_APP_ID — Feishu app_id (cli_xxx) + * IT0_FEISHU_APP_SECRET — Feishu app_secret + * + * Binding flow (code): + * 1. Frontend calls POST /api/v1/agent/channels/feishu/bind/:instanceId + * 2. Backend returns { code: "A3K9F2", expiresAt } + * 3. User opens Feishu → IT0 bot → sends "A3K9F2" + * 4. This service matches code → saves feishuUserId → replies "✅ 绑定成功" + * + * Binding flow (OAuth): + * 1. Frontend calls GET /api/v1/agent/channels/feishu/oauth/init?instanceId=xxx + * 2. Backend returns { oauthUrl } + * 3. User opens oauthUrl → taps Authorize → Feishu redirects to our callback + * 4. Callback exchanges code → gets open_id → saves binding + * + * Robustness guarantees: + * - SDK handles WS reconnection automatically (exponential backoff, token refresh) + * - Per-user serial queue prevents concurrent LLM calls for same user + * - Dedup map prevents duplicate message processing on reconnect + * - Rate limit: 10 messages/minute per user + * - CALLBACK_TIMEOUT_MS safety valve if bridge crashes (180s) + * - Periodic cleanup of in-memory maps (5-min interval) + */ + +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import * as https from 'https'; +import * as http from 'http'; +import * as crypto from 'crypto'; +import * as lark from '@larksuiteoapi/node-sdk'; +import { AgentInstanceRepository } from '../repositories/agent-instance.repository'; + +// ── Types ───────────────────────────────────────────────────────────────────── + +interface FeishuMsg { + openId: string; // sender.sender_id.open_id + messageId: string; // message.message_id + text: string; // parsed text content + chatId: string; // message.chat_id +} + +interface BindingEntry { + instanceId: string; + expiresAt: number; +} + +// ── Constants ───────────────────────────────────────────────────────────────── + +const FEISHU_MAX_CHARS = 4000; // stay below Feishu's 4096-char limit +const CODE_TTL_MS = 15 * 60 * 1000; // 15 min +const OAUTH_STATE_TTL_MS = 10 * 60 * 1000; // 10 min +const TASK_TIMEOUT_S = 120; +const CALLBACK_TIMEOUT_MS = 180_000; +const THINKING_REMINDER_MS = 25_000; +const DEDUP_TTL_MS = 10 * 60 * 1000; +const RATE_LIMIT_PER_MIN = 10; +const QUEUE_MAX_DEPTH = 5; +const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; +const MAX_RESPONSE_BYTES = 256 * 1024; + +const FEISHU_API_HOST = 'open.feishu.cn'; + +// ── Service ─────────────────────────────────────────────────────────────────── + +@Injectable() +export class FeishuRouterService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(FeishuRouterService.name); + private readonly appId: string; + private readonly appSecret: string; + private readonly enabled: boolean; + private readonly agentCallbackBaseUrl: string; + + // Feishu SDK clients + private httpClient!: lark.Client; + private wsClient!: lark.WSClient; + private stopping = false; + private cleanupTimer?: NodeJS.Timeout; + + // State + private readonly bindingCodes = new Map(); + private readonly oauthStates = new Map(); + private readonly dedup = new Map(); + private readonly rateWindows = new Map(); + private readonly queueTails = new Map>(); + private readonly queueDepths = new Map(); + private readonly pendingCallbacks = new Map void; + reject: (e: Error) => void; + timer: NodeJS.Timeout; + }>(); + + constructor( + private readonly configService: ConfigService, + private readonly instanceRepo: AgentInstanceRepository, + ) { + this.appId = this.configService.get('IT0_FEISHU_APP_ID', ''); + this.appSecret = this.configService.get('IT0_FEISHU_APP_SECRET', ''); + this.enabled = !!(this.appId && this.appSecret); + this.agentCallbackBaseUrl = this.configService.get('AGENT_SERVICE_PUBLIC_URL', ''); + } + + onModuleInit(): void { + if (!this.enabled) { + this.logger.warn('IT0_FEISHU_APP_ID/SECRET not set — Feishu router disabled'); + return; + } + this.logger.log('Feishu router starting...'); + + // HTTP client for sending messages + this.httpClient = new lark.Client({ + appId: this.appId, + appSecret: this.appSecret, + appType: lark.AppType.SelfBuild, + loggerLevel: lark.LoggerLevel.warn, + }); + + // WebSocket long-connection client — SDK handles auth, reconnect, ping/pong, ACK + this.wsClient = new lark.WSClient({ + appID: this.appId, + appSecret: this.appSecret, + loggerLevel: lark.LoggerLevel.warn, + }); + + this.wsClient.start({ + 'im.message.receive_v1': async (data: any) => { + try { + await this.handleIncomingEvent(data); + } catch (e: any) { + this.logger.error('Uncaught error in Feishu message handler:', e.message); + } + }, + }); + + this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS); + if (this.cleanupTimer.unref) this.cleanupTimer.unref(); + + this.logger.log('Feishu router started (long-connection)'); + } + + onModuleDestroy(): void { + this.stopping = true; + clearInterval(this.cleanupTimer); + for (const [, cb] of this.pendingCallbacks) { + clearTimeout(cb.timer); + cb.reject(new Error('Service shutting down')); + } + this.pendingCallbacks.clear(); + } + + isEnabled(): boolean { + return this.enabled; + } + + // ── Async bridge callback ────────────────────────────────────────────────── + + resolveCallbackReply(msgId: string, ok: boolean, content: string, isTimeout?: boolean): void { + const cb = this.pendingCallbacks.get(msgId); + if (!cb) { + this.logger.warn(`Received callback for unknown msgId=${msgId} (already resolved or timed out)`); + return; + } + this.pendingCallbacks.delete(msgId); + clearTimeout(cb.timer); + if (ok) { + cb.resolve(content); + } else { + const err: Error & { isTimeout?: boolean } = new Error(content); + err.isTimeout = isTimeout ?? content.toLowerCase().includes('timeout'); + cb.reject(err); + } + } + + // ── Binding code API ─────────────────────────────────────────────────────── + + generateBindingCode(instanceId: string): { code: string; expiresAt: number } { + for (const [code, entry] of this.bindingCodes) { + if (entry.instanceId === instanceId) this.bindingCodes.delete(code); + } + const code = crypto.randomBytes(3).toString('hex').toUpperCase(); + const expiresAt = Date.now() + CODE_TTL_MS; + this.bindingCodes.set(code, { instanceId, expiresAt }); + return { code, expiresAt }; + } + + // ── OAuth API ────────────────────────────────────────────────────────────── + + generateOAuthUrl(instanceId: string): { oauthUrl: string; state: string } { + for (const [s, entry] of this.oauthStates) { + if (entry.instanceId === instanceId) this.oauthStates.delete(s); + } + const state = crypto.randomBytes(16).toString('hex'); + const expiresAt = Date.now() + OAUTH_STATE_TTL_MS; + this.oauthStates.set(state, { instanceId, expiresAt }); + + const baseUrl = this.configService.get('IT0_BASE_URL', 'https://it0api.szaiai.com'); + const redirectUri = `${baseUrl}/api/v1/agent/channels/feishu/oauth/callback`; + const params = new URLSearchParams({ + app_id: this.appId, + redirect_uri: redirectUri, + state, + }); + return { state, oauthUrl: `https://open.feishu.cn/open-apis/authen/v1/authorize?${params}` }; + } + + /** + * Complete OAuth flow: exchange auth code → get open_id → save binding → greet user. + * Feishu open_id works directly for proactive messaging — no staffId lookup needed. + */ + async completeOAuthBinding(code: string, state: string): Promise<{ instanceId: string; instanceName: string }> { + const entry = this.oauthStates.get(state); + if (!entry) throw new Error('无效或已过期的授权状态,请重新绑定'); + if (Date.now() > entry.expiresAt) { + this.oauthStates.delete(state); + throw new Error('授权已超时,请重新绑定'); + } + this.oauthStates.delete(state); + + // Exchange code for user access token + const appToken = await this.getAppAccessToken(); + const tokenRes = await this.httpsPost<{ code: number; msg: string; data: { access_token: string } }>( + FEISHU_API_HOST, + '/open-apis/authen/v1/access_token', + { grant_type: 'authorization_code', code }, + { 'Authorization': `Bearer ${appToken}` }, + ); + if (tokenRes.code !== 0) throw new Error(`Feishu OAuth token error: ${tokenRes.msg}`); + + // Get user info (open_id, union_id) + const userRes = await this.httpsGet<{ code: number; msg: string; data: { open_id: string; union_id: string; name: string } }>( + FEISHU_API_HOST, + '/open-apis/authen/v1/user_info', + { 'Authorization': `Bearer ${tokenRes.data.access_token}` }, + ); + if (userRes.code !== 0) throw new Error(`Feishu user info error: ${userRes.msg}`); + + const openId = userRes.data.open_id; + if (!openId) throw new Error('无法获取飞书用户身份,请重试'); + + this.logger.log(`OAuth user: openId=${openId} name=${userRes.data.name} for instance=${entry.instanceId}`); + + const instance = await this.instanceRepo.findById(entry.instanceId); + if (!instance) throw new Error('智能体实例不存在'); + + instance.feishuUserId = openId; + await this.instanceRepo.save(instance); + + this.logger.log(`Feishu OAuth binding saved: instance ${entry.instanceId} → feishuUserId=${openId}`); + + // Proactive greeting + this.sendGreeting(openId, userRes.data.name ?? '同学', instance.name).catch((e: Error) => + this.logger.warn(`Feishu greeting failed (non-fatal): ${e.message}`), + ); + + return { instanceId: entry.instanceId, instanceName: instance.name }; + } + + private async sendGreeting(openId: string, userName: string, agentName: string): Promise { + const greeting = + `👋 ${userName},你好!我是你的 AI 智能体助手「${agentName}」。\n\n` + + `从现在起,你可以直接在这里向我发送指令,我会自主地帮你完成工作任务。\n\n` + + `例如:\n• 查询服务器状态\n• 执行运维脚本\n• 管理文件和进程\n\n` + + `有什么需要帮忙的,直接说吧!`; + await this.sendMessage(openId, greeting); + this.logger.log(`Feishu greeting sent to openId=${openId}`); + } + + // ── Incoming event handling ──────────────────────────────────────────────── + + private async handleIncomingEvent(data: any): Promise { + const sender = data?.sender; + const message = data?.message; + + if (!sender || !message) return; + if (sender.sender_type !== 'user') return; // ignore bot messages + + const openId = sender.sender_id?.open_id; + const messageId = message.message_id; + const chatType = message.chat_type; // 'p2p' or 'group' + const chatId = message.chat_id; + const msgType = message.msg_type; + + if (!openId || !messageId) return; + + // Only handle P2P (direct message) or group @mention + // For group chats, ignore messages that don't @mention the bot + // For now, only handle P2P + if (chatType !== 'p2p') return; + + this.logger.log(`Feishu message: openId=${openId} msgType=${msgType} msgId=${messageId}`); + + // Parse text content + if (msgType !== 'text') { + await this.sendMessage(openId, '我目前只能处理文字消息~\n图片、语音请转换成文字后再发给我。'); + return; + } + + let text = ''; + try { + const content = JSON.parse(message.content ?? '{}'); + text = (content.text ?? '').trim(); + } catch { + return; + } + if (!text) return; + + // Dedup + if (this.dedup.has(messageId)) return; + this.dedup.set(messageId, Date.now()); + + const msg: FeishuMsg = { openId, messageId, text, chatId }; + + // Binding code check (6-char hex, case-insensitive) + const upperText = text.toUpperCase(); + const bindEntry = this.bindingCodes.get(upperText); + if (bindEntry) { + if (Date.now() > bindEntry.expiresAt) { + this.bindingCodes.delete(upperText); + await this.sendMessage(openId, '验证码已过期,请重新在 IT0 App 中生成新的验证码。'); + return; + } + this.bindingCodes.delete(upperText); + this.completeBinding(bindEntry.instanceId, openId, msg).catch((e: Error) => + this.logger.error('completeBinding error:', e.message), + ); + return; + } + + // Rate limit + if (!this.rateAllow(openId)) { + await this.sendMessage(openId, '消息频率过高,请稍后再试(每分钟最多10条)。'); + return; + } + + // Queue + const pendingDepth = this.queueDepths.get(openId) ?? 0; + const accepted = this.enqueue(openId, () => this.routeToAgent(openId, text, msg)); + if (!accepted) { + await this.sendMessage(openId, '消息太多了,请稍后再说~(当前排队已满,最多5条)'); + } else if (pendingDepth > 0) { + await this.sendMessage(openId, `📋 消息已收到,前面还有 ${pendingDepth} 条在处理,请稍候~`); + } + } + + // ── Binding completion ───────────────────────────────────────────────────── + + private async completeBinding(instanceId: string, openId: string, msg: FeishuMsg): Promise { + try { + const instance = await this.instanceRepo.findById(instanceId); + if (!instance) { + await this.sendMessage(openId, '绑定失败:智能体实例不存在,请重新操作。'); + return; + } + instance.feishuUserId = openId; + await this.instanceRepo.save(instance); + this.logger.log(`Feishu code-binding: instance ${instanceId} → openId=${openId}`); + await this.sendMessage( + openId, + `✅ 绑定成功!\n\n你的小龙虾「${instance.name}」已与飞书绑定。\n\n现在直接发消息给我,我会帮你转达给它!`, + ); + } catch (e: any) { + this.logger.error('Feishu completeBinding error:', e.message); + await this.sendMessage(openId, '绑定时出现错误,请稍后重试。'); + } + } + + // ── Message routing ──────────────────────────────────────────────────────── + + private async routeToAgent(openId: string, text: string, msg: FeishuMsg): Promise { + const instance = await this.instanceRepo.findByFeishuUserId(openId); + + if (!instance) { + this.logger.warn(`No Feishu binding for openId=${openId}`); + await this.sendMessage( + openId, + '👋 你还没有绑定专属小龙虾。\n\n步骤:\n1. 打开 IT0 App\n2. 创建或选择一只小龙虾\n3. 点击「绑定飞书」获取验证码\n4. 把验证码发给我就好了~', + ); + return; + } + + if (instance.status !== 'running') { + this.logger.warn(`Instance ${instance.id} not running: status=${instance.status}`); + const statusHint: Record = { + stopped: `💤 小龙虾「${instance.name}」正在休息,请在 IT0 App 中点击启动后再来找我~`, + starting: `⏳ 小龙虾「${instance.name}」还在启动中,请等待约1分钟后重试。`, + error: `⚠️ 小龙虾「${instance.name}」遇到了问题,请在 IT0 App 中检查状态。`, + }; + await this.sendMessage( + openId, + statusHint[instance.status] ?? `小龙虾「${instance.name}」当前无法接收指令(${instance.status}),请在 IT0 App 中处理。`, + ); + return; + } + + if (!instance.serverHost) { + this.logger.error(`Instance ${instance.id} has no serverHost`); + await this.sendMessage(openId, '小龙虾配置异常(缺少服务器地址),请联系管理员。'); + return; + } + + const asyncBridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task-async`; + const callbackUrl = `${this.agentCallbackBaseUrl}/api/v1/agent/channels/feishu/bridge-callback`; + this.logger.log( + `Feishu routing msgId=${msg.messageId} → instance ${instance.id} (${instance.name}) ` + + `async @ ${asyncBridgeUrl}`, + ); + + // Immediate ack + await this.sendMessage(openId, '🤔 小虾米正在思考,稍等~'); + + // Progress reminder after 25s + let thinkingTimer: NodeJS.Timeout | undefined; + thinkingTimer = setTimeout(() => { + this.sendMessage(openId, '⏳ 还在努力想呢,这个任务有点复杂,请再等一下~').catch(() => {}); + }, THINKING_REMINDER_MS); + if (thinkingTimer.unref) thinkingTimer.unref(); + + let reply = ''; + try { + const callbackPromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pendingCallbacks.delete(msg.messageId); + const err: Error & { isTimeout?: boolean } = new Error(`Async bridge callback timeout after ${CALLBACK_TIMEOUT_MS / 1000}s`); + err.isTimeout = true; + reject(err); + }, CALLBACK_TIMEOUT_MS); + this.pendingCallbacks.set(msg.messageId, { resolve, reject, timer }); + }); + + const ack = await this.httpPostJson<{ ok: boolean; pending?: boolean; error?: string }>( + asyncBridgeUrl, + { + prompt: text, + sessionKey: `agent:main:fs-${openId}`, + idempotencyKey: msg.messageId, + timeoutSeconds: TASK_TIMEOUT_S, + callbackUrl, + callbackData: { openId, msgId: msg.messageId }, + }, + 15_000, + ); + + if (!ack.ok) { + this.pendingCallbacks.delete(msg.messageId); + const bridgeError = ack.error ?? ''; + if (bridgeError.includes('not connected') || bridgeError.includes('Gateway not connected')) { + reply = `🔄 小虾米正在重启,请等待约30秒后重试。`; + } else { + reply = `小虾米遇到了问题,请稍后重试。`; + } + this.logger.warn(`Bridge rejected Feishu task for instance ${instance.id}: ${bridgeError}`); + } else { + reply = await callbackPromise; + this.logger.log(`Feishu bridge callback received, replyLen=${reply.length}`); + } + } catch (e: any) { + this.pendingCallbacks.delete(msg.messageId); + this.logger.error(`Feishu async bridge failed for instance ${instance.id}:`, e.message); + reply = this.buildErrorReply(e.message, instance.name, !!e.isTimeout); + } finally { + clearTimeout(thinkingTimer); + } + + await this.sendMessage(openId, reply); + } + + private buildErrorReply(error: string, instanceName: string, isTimeout: boolean): string { + if (isTimeout) { + return ( + `⏱️ 这个任务花的时间太长了,小虾米超时了。\n\n` + + `建议:\n• 把任务拆成更小的步骤\n• 简化指令后重试\n• 如果问题复杂,可以分多轮来说` + ); + } + if (error.includes('disconnected') || error.includes('not connected')) { + return `🔄 「${instanceName}」与服务的连接中断了,请等待约30秒后重试。`; + } + if (error.includes('aborted')) return `⚠️ 任务被中止了,请重新发送。`; + if (error.includes('shutting down')) return `🔄 服务正在重启,请稍后重试。`; + return `😰 小虾米遇到了点问题,请稍后重试。如果持续出现,请联系管理员。`; + } + + // ── Feishu message send ──────────────────────────────────────────────────── + + /** + * Send a text message to a user by open_id, chunked if over FEISHU_MAX_CHARS. + * Uses Feishu IM API POST /open-apis/im/v1/messages?receive_id_type=open_id. + */ + private async sendMessage(openId: string, content: string): Promise { + const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim() || '(空响应)'; + const chunks: string[] = []; + for (let i = 0; i < safe.length; i += FEISHU_MAX_CHARS) { + chunks.push(safe.slice(i, i + FEISHU_MAX_CHARS)); + } + for (const chunk of chunks) { + try { + await this.httpClient.im.message.create({ + params: { receive_id_type: 'open_id' }, + data: { + receive_id: openId, + msg_type: 'text', + content: JSON.stringify({ text: chunk }), + }, + }); + } catch (e: any) { + this.logger.error(`Feishu sendMessage failed for openId=${openId}:`, e.message); + } + } + } + + // ── Token ────────────────────────────────────────────────────────────────── + + private tokenCache = ''; + private tokenExpiresAt = 0; + + private async getAppAccessToken(): Promise { + if (this.tokenCache && Date.now() < this.tokenExpiresAt - 300_000) { + return this.tokenCache; + } + const res = await this.httpsPost<{ code: number; msg: string; app_access_token: string; expire: number }>( + FEISHU_API_HOST, + '/open-apis/auth/v3/app_access_token/internal', + { app_id: this.appId, app_secret: this.appSecret }, + ); + if (res.code !== 0) throw new Error(`Feishu token error: ${res.msg}`); + this.tokenCache = res.app_access_token; + this.tokenExpiresAt = Date.now() + res.expire * 1000; + return this.tokenCache; + } + + // ── Per-user serial queue ────────────────────────────────────────────────── + + private enqueue(userId: string, task: () => Promise): boolean { + const depth = this.queueDepths.get(userId) ?? 0; + if (depth >= QUEUE_MAX_DEPTH) return false; + this.queueDepths.set(userId, depth + 1); + const tail = this.queueTails.get(userId) ?? Promise.resolve(); + const next = tail + .then(task) + .catch((e: Error) => { + try { this.logger.error(`Queue task error (${userId}):`, e.message); } catch { /* ignore */ } + }) + .finally(() => { + const remaining = (this.queueDepths.get(userId) ?? 1) - 1; + if (remaining <= 0) { + this.queueDepths.delete(userId); + this.queueTails.delete(userId); + } else { + this.queueDepths.set(userId, remaining); + } + }) + .catch(() => {}); + this.queueTails.set(userId, next); + return true; + } + + // ── Rate limiter ─────────────────────────────────────────────────────────── + + private rateAllow(userId: string): boolean { + const now = Date.now(); + const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000); + if (timestamps.length >= RATE_LIMIT_PER_MIN) { + this.rateWindows.set(userId, timestamps); + return false; + } + timestamps.push(now); + this.rateWindows.set(userId, timestamps); + return true; + } + + // ── Periodic cleanup ─────────────────────────────────────────────────────── + + private periodicCleanup(): void { + const now = Date.now(); + for (const [id, ts] of this.dedup) { + if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id); + } + for (const [userId, timestamps] of this.rateWindows) { + const fresh = timestamps.filter((t) => now - t < 60_000); + if (fresh.length === 0) this.rateWindows.delete(userId); + else this.rateWindows.set(userId, fresh); + } + for (const [code, entry] of this.bindingCodes) { + if (now > entry.expiresAt) this.bindingCodes.delete(code); + } + for (const [state, entry] of this.oauthStates) { + if (now > entry.expiresAt) this.oauthStates.delete(state); + } + } + + // ── HTTP helpers ─────────────────────────────────────────────────────────── + + private httpsGet(hostname: string, path: string, headers: Record = {}): Promise { + return new Promise((resolve, reject) => { + const req = https.request( + { hostname, path, method: 'GET', headers, timeout: 10_000 }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (chunk: Buffer) => { totalBytes += chunk.length; if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('Feishu API GET timeout')); }); + req.on('error', reject); + req.end(); + }); + } + + private httpsPost( + hostname: string, + path: string, + payload: object, + extraHeaders: Record = {}, + ): Promise { + return new Promise((resolve, reject) => { + const body = JSON.stringify(payload); + const req = https.request( + { + hostname, path, method: 'POST', + headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body), ...extraHeaders }, + timeout: 10_000, + }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (chunk: Buffer) => { totalBytes += chunk.length; if (totalBytes <= MAX_RESPONSE_BYTES) data += chunk.toString(); }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('Feishu API POST timeout')); }); + req.on('error', reject); + req.write(body); + req.end(); + }); + } + + private httpPostJson(url: string, payload: object, timeoutMs = 35_000): Promise { + return new Promise((resolve, reject) => { + const parsed = new URL(url); + const body = JSON.stringify(payload); + const req = http.request( + { + hostname: parsed.hostname, + port: parseInt(parsed.port, 10), + path: parsed.pathname, + method: 'POST', + headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(body) }, + timeout: timeoutMs, + }, + (res) => { + let data = ''; let totalBytes = 0; + res.on('data', (c: Buffer) => { totalBytes += c.length; if (totalBytes <= MAX_RESPONSE_BYTES) data += c.toString(); }); + res.on('end', () => { + if (totalBytes > MAX_RESPONSE_BYTES) { reject(new Error('Bridge response too large')); return; } + try { resolve(JSON.parse(data) as T); } catch (e) { reject(e); } + }); + }, + ); + req.on('timeout', () => { req.destroy(); reject(new Error('Bridge request timeout')); }); + req.on('error', reject); + req.write(body); + req.end(); + }); + } +} diff --git a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts index 940ba28..93de40e 100644 --- a/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts +++ b/packages/services/agent-service/src/interfaces/rest/controllers/agent-channel.controller.ts @@ -13,6 +13,7 @@ import { } from '@nestjs/common'; import { Response } from 'express'; import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service'; +import { FeishuRouterService } from '../../../infrastructure/feishu/feishu-router.service'; import { AgentInstanceRepository } from '../../../infrastructure/repositories/agent-instance.repository'; @Controller('api/v1/agent/channels') @@ -21,6 +22,7 @@ export class AgentChannelController { constructor( private readonly dingTalkRouter: DingTalkRouterService, + private readonly feishuRouter: FeishuRouterService, private readonly instanceRepo: AgentInstanceRepository, ) {} @@ -129,6 +131,95 @@ export class AgentChannelController { return { received: true }; } + // ── Feishu endpoints ────────────────────────────────────────────────────── + + @Post('feishu/bind/:instanceId') + async feishuGenerateBindCode(@Param('instanceId') instanceId: string) { + if (!this.feishuRouter.isEnabled()) { + throw new ServiceUnavailableException('Feishu integration not configured on this server'); + } + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + const { code, expiresAt } = this.feishuRouter.generateBindingCode(instanceId); + return { code, expiresAt }; + } + + @Get('feishu/status/:instanceId') + async feishuGetBindStatus(@Param('instanceId') instanceId: string) { + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + return { bound: !!inst.feishuUserId }; + } + + @Post('feishu/unbind/:instanceId') + async feishuUnbind(@Param('instanceId') instanceId: string) { + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + inst.feishuUserId = undefined; + await this.instanceRepo.save(inst); + return { unbound: true }; + } + + @Get('feishu/oauth/init') + async feishuOauthInit(@Query('instanceId') instanceId: string) { + if (!instanceId) throw new BadRequestException('instanceId is required'); + if (!this.feishuRouter.isEnabled()) { + throw new ServiceUnavailableException('Feishu integration not configured'); + } + const inst = await this.instanceRepo.findById(instanceId); + if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`); + const { oauthUrl, state } = this.feishuRouter.generateOAuthUrl(instanceId); + return { oauthUrl, state }; + } + + /** Feishu OAuth callback — PUBLIC endpoint, no JWT. */ + @Get('feishu/oauth/callback') + async feishuOauthCallback( + @Query('code') code: string, + @Query('state') state: string, + @Res() res: Response, + ) { + if (!code || !state) { + return res.status(400).send(this.htmlPage('参数错误', '缺少 code 或 state 参数,请重新绑定。', false)); + } + try { + const { instanceName } = await this.feishuRouter.completeOAuthBinding(code, state); + return res.send(this.htmlPage( + '绑定成功 ✅', + `「${instanceName}」已成功与您的飞书账号绑定!\n现在可以关闭此页面,在飞书中与 iAgent 机器人对话了。`, + true, + )); + } catch (e: any) { + return res.status(400).send(this.htmlPage('绑定失败', e.message ?? '请返回 iAgent App 重新操作。', false)); + } + } + + /** + * Feishu bridge async-task callback — called by OpenClaw bridge. + * Unauthenticated; internal service only. + */ + @Post('feishu/bridge-callback') + handleFeishuBridgeCallback( + @Body() body: { + ok: boolean; + result?: string; + error?: string; + isTimeout?: boolean; + callbackData: { openId: string; msgId: string }; + }, + ) { + const { ok, result, error, isTimeout, callbackData } = body; + const { openId, msgId } = callbackData ?? {}; + this.logger.log( + `Feishu bridge callback: ok=${ok} msgId=${msgId} openId=${openId} ` + + `${ok ? `replyLen=${result?.length ?? 0}` : `error=${error} isTimeout=${isTimeout}`}`, + ); + this.feishuRouter.resolveCallbackReply( + msgId, ok, ok ? (result ?? '') : (error ?? '智能体没有返回内容。'), isTimeout, + ); + return { received: true }; + } + private htmlPage(title: string, message: string, success: boolean): string { const color = success ? '#22C55E' : '#EF4444'; const icon = success ? '✅' : '❌'; diff --git a/packages/shared/database/src/migrations/012-add-feishu-binding.sql b/packages/shared/database/src/migrations/012-add-feishu-binding.sql new file mode 100644 index 0000000..7959cca --- /dev/null +++ b/packages/shared/database/src/migrations/012-add-feishu-binding.sql @@ -0,0 +1,9 @@ +-- Migration: Add Feishu binding to agent_instances +-- Feishu open_id is per-app unique and can be used directly for proactive messaging. + +ALTER TABLE agent_instances + ADD COLUMN IF NOT EXISTS feishu_user_id VARCHAR(100); + +CREATE INDEX IF NOT EXISTS idx_agent_instances_feishu_user + ON agent_instances(feishu_user_id) + WHERE feishu_user_id IS NOT NULL;