feat(dingtalk): unified DingTalk bot router with binding flow

- Add DingTalkRouterService: maintains single DingTalk Stream WS
  connection, handles binding codes, routes messages to agent containers
- Add AgentChannelController: POST bind/:id, GET status/:id, POST unbind/:id
- Add findByDingTalkUserId() to AgentInstanceRepository
- Add dingTalkUserId field to AgentInstance entity + migration 011
- Register DingTalkRouterService + AgentChannelController in AgentModule
- Add IT0_DINGTALK_CLIENT_ID/SECRET env vars to docker-compose.yml
- Flutter: DingTalk bind UI in _InstanceCard (bottom sheet with code
  display, countdown, auto-poll, open DingTalk deep link, bound badge)

Robustness improvements in DingTalkRouterService:
  - Concurrent connect guard (connecting flag)
  - Periodic cleanup timer for dedup/rateWindows/bindingCodes maps
  - Non-text message graceful reply
  - Empty senderStaffId guard
  - serverHost null guard before bridge call
  - unref() cleanup timers from event loop

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
hailin 2026-03-08 08:12:27 -07:00
parent 20e96f31cd
commit 8751c85881
8 changed files with 1114 additions and 2 deletions

View File

@ -150,6 +150,8 @@ services:
- INTERNAL_API_KEY=${INTERNAL_API_KEY:-changeme-internal-key}
- VAULT_MASTER_KEY=${VAULT_MASTER_KEY:-dev-vault-key}
- AGENT_SERVICE_PUBLIC_URL=${AGENT_SERVICE_PUBLIC_URL}
- IT0_DINGTALK_CLIENT_ID=${IT0_DINGTALK_CLIENT_ID:-}
- IT0_DINGTALK_CLIENT_SECRET=${IT0_DINGTALK_CLIENT_SECRET:-}
healthcheck:
test: ["CMD-SHELL", "node -e \"require('http').get('http://localhost:3002/',r=>{process.exit(r.statusCode<500?0:1)}).on('error',()=>process.exit(1))\""]
interval: 30s

View File

@ -1,6 +1,10 @@
import 'dart:async';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:it0_app/l10n/app_localizations.dart';
import 'package:url_launcher/url_launcher.dart';
import '../../../../core/config/api_endpoints.dart';
import '../../../../core/errors/error_handler.dart';
import '../../../../core/network/dio_client.dart';
@ -21,6 +25,7 @@ class AgentInstance {
final String containerName;
final String status; // deploying | running | stopped | error | removed
final String? errorMessage;
final String? dingTalkUserId;
final DateTime createdAt;
const AgentInstance({
@ -32,6 +37,7 @@ class AgentInstance {
required this.containerName,
required this.status,
this.errorMessage,
this.dingTalkUserId,
required this.createdAt,
});
@ -44,6 +50,7 @@ class AgentInstance {
containerName: j['containerName'] as String? ?? '',
status: j['status'] as String? ?? 'unknown',
errorMessage: j['errorMessage'] as String?,
dingTalkUserId: j['dingTalkUserId'] as String?,
createdAt: DateTime.tryParse(j['createdAt'] as String? ?? '') ?? DateTime.now(),
);
}
@ -207,6 +214,7 @@ class MyAgentsPage extends ConsumerWidget {
instance: inst,
onDismiss: () => _handleDismiss(context, ref, inst),
onRename: () => _handleRename(context, ref, inst),
onRefresh: () => ref.invalidate(myInstancesProvider),
);
},
childCount: instances.length * 2 - 1,
@ -313,8 +321,9 @@ class _InstanceCard extends StatelessWidget {
final AgentInstance instance;
final VoidCallback? onDismiss;
final VoidCallback? onRename;
final VoidCallback? onRefresh;
const _InstanceCard({required this.instance, this.onDismiss, this.onRename});
const _InstanceCard({required this.instance, this.onDismiss, this.onRename, this.onRefresh});
void _showActions(BuildContext context) {
showModalBottomSheet(
@ -341,6 +350,34 @@ class _InstanceCard extends StatelessWidget {
onTap: () { Navigator.pop(ctx); onRename?.call(); },
),
const Divider(height: 1),
// DingTalk binding
ListTile(
leading: Icon(
Icons.chat_bubble_outline,
color: instance.dingTalkUserId != null
? const Color(0xFF1A73E8)
: AppColors.textMuted,
),
title: Text(
instance.dingTalkUserId != null ? '重新绑定钉钉' : '绑定钉钉',
),
subtitle: instance.dingTalkUserId != null
? const Text('已绑定,点击可解绑或重新绑定', style: TextStyle(fontSize: 11))
: null,
onTap: () {
Navigator.pop(ctx);
_showDingTalkBindSheet(context);
},
),
if (instance.dingTalkUserId != null) ...[
const Divider(height: 1),
ListTile(
leading: const Icon(Icons.link_off, color: AppColors.textMuted),
title: const Text('解绑钉钉'),
onTap: () { Navigator.pop(ctx); _handleUnbind(context); },
),
],
const Divider(height: 1),
ListTile(
leading: const Icon(Icons.person_remove_outlined, color: AppColors.error),
title: Text(AppLocalizations.of(ctx).dismissButton, style: const TextStyle(color: AppColors.error)),
@ -353,6 +390,56 @@ class _InstanceCard extends StatelessWidget {
);
}
void _showDingTalkBindSheet(BuildContext context) {
showModalBottomSheet(
context: context,
isScrollControlled: true,
backgroundColor: Colors.transparent,
builder: (ctx) => _DingTalkBindSheet(instance: instance, onBound: onRefresh),
);
}
Future<void> _handleUnbind(BuildContext context) async {
// Find Dio via context (we use a builder trick below)
final confirmed = await showDialog<bool>(
context: context,
builder: (ctx) => AlertDialog(
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(16)),
title: const Text('解绑钉钉'),
content: Text('确定要解除「${instance.name}」与钉钉的绑定吗?'),
actions: [
TextButton(onPressed: () => Navigator.pop(ctx, false), child: const Text('取消')),
FilledButton(
style: FilledButton.styleFrom(backgroundColor: AppColors.error),
onPressed: () => Navigator.pop(ctx, true),
child: const Text('解绑'),
),
],
),
);
if (confirmed != true || !context.mounted) return;
try {
// Use ProviderScope to get dio we rely on the consumer in the parent
// The parent _buildInstanceList passes onRefresh which calls ref.invalidate
// For unbind we need to fire the API; use a ProviderScope.containerOf trick:
final container = ProviderScope.containerOf(context);
final dio = container.read(dioClientProvider);
await dio.post('${ApiEndpoints.agent}/channels/dingtalk/unbind/${instance.id}');
onRefresh?.call();
if (context.mounted) {
ScaffoldMessenger.of(context).showSnackBar(
const SnackBar(content: Text('钉钉已解绑')),
);
}
} catch (e) {
if (context.mounted) {
ScaffoldMessenger.of(context).showSnackBar(
SnackBar(content: Text('解绑失败:${ErrorHandler.friendlyMessage(e)}'), backgroundColor: AppColors.error),
);
}
}
}
@override
Widget build(BuildContext context) {
final statusColor = _statusColors[instance.status] ?? AppColors.textMuted;
@ -456,6 +543,26 @@ class _InstanceCard extends StatelessWidget {
),
),
// DingTalk binding indicator
if (instance.dingTalkUserId != null) ...[
const SizedBox(height: 8),
Container(
padding: const EdgeInsets.symmetric(horizontal: 10, vertical: 6),
decoration: BoxDecoration(
color: const Color(0xFF1A73E8).withOpacity(0.08),
borderRadius: BorderRadius.circular(8),
),
child: const Row(
mainAxisSize: MainAxisSize.min,
children: [
Icon(Icons.chat_bubble_outline, size: 13, color: Color(0xFF1A73E8)),
SizedBox(width: 5),
Text('已绑定钉钉', style: TextStyle(fontSize: 11, color: Color(0xFF1A73E8), fontWeight: FontWeight.w500)),
],
),
),
],
// Error message
if (instance.errorMessage != null) ...[
const SizedBox(height: 8),
@ -610,6 +717,357 @@ class _TemplateChip extends StatelessWidget {
}
}
// ---------------------------------------------------------------------------
// DingTalk bind bottom sheet
// ---------------------------------------------------------------------------
class _DingTalkBindSheet extends StatefulWidget {
final AgentInstance instance;
final VoidCallback? onBound;
const _DingTalkBindSheet({required this.instance, this.onBound});
@override
State<_DingTalkBindSheet> createState() => _DingTalkBindSheetState();
}
class _DingTalkBindSheetState extends State<_DingTalkBindSheet> {
static const _dtScheme = 'dingtalk://';
_BindPhase _phase = _BindPhase.loading;
String _code = '';
DateTime? _expiresAt;
String? _errorMsg;
Timer? _pollTimer;
Timer? _countdownTimer;
int _secondsLeft = 0;
@override
void initState() {
super.initState();
_fetchCode();
}
@override
void dispose() {
_pollTimer?.cancel();
_countdownTimer?.cancel();
super.dispose();
}
Future<void> _fetchCode() async {
setState(() { _phase = _BindPhase.loading; _errorMsg = null; });
try {
final container = ProviderScope.containerOf(context);
final dio = container.read(dioClientProvider);
final res = await dio.post(
'${ApiEndpoints.agent}/channels/dingtalk/bind/${widget.instance.id}',
);
final data = res.data as Map<String, dynamic>;
final code = data['code'] as String;
final expiresAtMs = data['expiresAt'] as int;
final expiresAt = DateTime.fromMillisecondsSinceEpoch(expiresAtMs);
setState(() {
_code = code;
_expiresAt = expiresAt;
_secondsLeft = expiresAt.difference(DateTime.now()).inSeconds.clamp(0, 600);
_phase = _BindPhase.waitingCode;
});
_startCountdown();
_startPolling();
} catch (e) {
setState(() {
_phase = _BindPhase.error;
_errorMsg = ErrorHandler.friendlyMessage(e);
});
}
}
void _startCountdown() {
_countdownTimer?.cancel();
_countdownTimer = Timer.periodic(const Duration(seconds: 1), (_) {
if (!mounted) return;
final left = _expiresAt!.difference(DateTime.now()).inSeconds;
if (left <= 0) {
_countdownTimer?.cancel();
setState(() { _phase = _BindPhase.expired; });
_pollTimer?.cancel();
} else {
setState(() { _secondsLeft = left; });
}
});
}
void _startPolling() {
_pollTimer?.cancel();
_pollTimer = Timer.periodic(const Duration(seconds: 2), (_) async {
if (!mounted) return;
try {
final container = ProviderScope.containerOf(context);
final dio = container.read(dioClientProvider);
final res = await dio.get(
'${ApiEndpoints.agent}/channels/dingtalk/status/${widget.instance.id}',
);
final bound = (res.data as Map<String, dynamic>)['bound'] as bool? ?? false;
if (bound && mounted) {
_pollTimer?.cancel();
_countdownTimer?.cancel();
setState(() { _phase = _BindPhase.success; });
widget.onBound?.call();
}
} catch (_) {
// network hiccup keep polling
}
});
}
Future<void> _openDingTalk() async {
final uri = Uri.parse(_dtScheme);
if (await canLaunchUrl(uri)) {
await launchUrl(uri);
} else {
if (mounted) {
ScaffoldMessenger.of(context).showSnackBar(
const SnackBar(content: Text('未检测到钉钉,请先安装钉钉')),
);
}
}
}
@override
Widget build(BuildContext context) {
return Container(
decoration: const BoxDecoration(
color: Color(0xFF1A1D2E),
borderRadius: BorderRadius.vertical(top: Radius.circular(24)),
),
padding: const EdgeInsets.fromLTRB(24, 12, 24, 32),
child: Column(
mainAxisSize: MainAxisSize.min,
children: [
// Drag handle
Container(
width: 36, height: 4,
margin: const EdgeInsets.only(bottom: 20),
decoration: BoxDecoration(
color: AppColors.textMuted.withOpacity(0.3),
borderRadius: BorderRadius.circular(2),
),
),
// Title
Row(
children: [
const Icon(Icons.chat_bubble_outline, color: Color(0xFF1A73E8), size: 22),
const SizedBox(width: 10),
Text(
'绑定钉钉',
style: const TextStyle(fontSize: 18, fontWeight: FontWeight.bold, color: AppColors.textPrimary),
),
const Spacer(),
Text(
'${widget.instance.name}',
style: const TextStyle(fontSize: 12, color: AppColors.textMuted),
),
],
),
const SizedBox(height: 24),
// Content by phase
_buildPhaseContent(),
const SizedBox(height: 8),
],
),
);
}
Widget _buildPhaseContent() {
return switch (_phase) {
_BindPhase.loading => const Padding(padding: EdgeInsets.all(32), child: CircularProgressIndicator()),
_BindPhase.error => _buildError(),
_BindPhase.expired => _buildExpired(),
_BindPhase.success => _buildSuccess(),
_BindPhase.waitingCode => _buildWaitingCode(),
};
}
Widget _buildWaitingCode() {
final mm = (_secondsLeft ~/ 60).toString().padLeft(2, '0');
final ss = (_secondsLeft % 60).toString().padLeft(2, '0');
return Column(
children: [
// Steps
_buildStep('1', '打开钉钉,找到 IT0 机器人对话'),
const SizedBox(height: 10),
_buildStep('2', '发送以下验证码(区分大小写)'),
const SizedBox(height: 16),
// Code display
GestureDetector(
onTap: () {
Clipboard.setData(ClipboardData(text: _code));
ScaffoldMessenger.of(context).showSnackBar(
const SnackBar(content: Text('验证码已复制'), duration: Duration(seconds: 1)),
);
},
child: Container(
padding: const EdgeInsets.symmetric(horizontal: 28, vertical: 16),
decoration: BoxDecoration(
color: const Color(0xFF1A73E8).withOpacity(0.12),
borderRadius: BorderRadius.circular(16),
border: Border.all(color: const Color(0xFF1A73E8).withOpacity(0.4)),
),
child: Row(
mainAxisSize: MainAxisSize.min,
children: [
Text(
_code,
style: const TextStyle(
fontSize: 36,
fontWeight: FontWeight.bold,
color: Color(0xFF1A73E8),
letterSpacing: 6,
fontFamily: 'monospace',
),
),
const SizedBox(width: 12),
const Icon(Icons.copy_outlined, size: 16, color: Color(0xFF1A73E8)),
],
),
),
),
const SizedBox(height: 8),
Text(
'有效期 $mm:$ss',
style: TextStyle(
fontSize: 12,
color: _secondsLeft < 60 ? AppColors.error : AppColors.textMuted,
),
),
const SizedBox(height: 20),
// Open DingTalk button
SizedBox(
width: double.infinity,
child: OutlinedButton.icon(
icon: const Icon(Icons.open_in_new, size: 16),
label: const Text('打开钉钉'),
onPressed: _openDingTalk,
style: OutlinedButton.styleFrom(
foregroundColor: const Color(0xFF1A73E8),
side: const BorderSide(color: Color(0xFF1A73E8)),
padding: const EdgeInsets.symmetric(vertical: 12),
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(10)),
),
),
),
const SizedBox(height: 10),
const Row(
mainAxisAlignment: MainAxisAlignment.center,
children: [
SizedBox(width: 12, height: 12, child: CircularProgressIndicator(strokeWidth: 1.5)),
SizedBox(width: 8),
Text('等待绑定完成…', style: TextStyle(fontSize: 12, color: AppColors.textMuted)),
],
),
],
);
}
Widget _buildStep(String num, String text) {
return Row(
children: [
Container(
width: 22, height: 22,
decoration: BoxDecoration(
color: const Color(0xFF1A73E8).withOpacity(0.15),
shape: BoxShape.circle,
),
child: Center(
child: Text(num, style: const TextStyle(fontSize: 12, color: Color(0xFF1A73E8), fontWeight: FontWeight.bold)),
),
),
const SizedBox(width: 10),
Expanded(child: Text(text, style: const TextStyle(fontSize: 13, color: AppColors.textSecondary))),
],
);
}
Widget _buildSuccess() {
return Column(
children: [
const Icon(Icons.check_circle_rounded, color: Color(0xFF22C55E), size: 64),
const SizedBox(height: 16),
const Text('绑定成功!', style: TextStyle(fontSize: 20, fontWeight: FontWeight.bold, color: AppColors.textPrimary)),
const SizedBox(height: 8),
Text(
'${widget.instance.name}」已与钉钉绑定\n现在可以直接在钉钉中与它对话了',
textAlign: TextAlign.center,
style: const TextStyle(fontSize: 13, color: AppColors.textMuted, height: 1.6),
),
const SizedBox(height: 24),
SizedBox(
width: double.infinity,
child: FilledButton(
onPressed: () => Navigator.pop(context),
style: FilledButton.styleFrom(
backgroundColor: const Color(0xFF22C55E),
padding: const EdgeInsets.symmetric(vertical: 12),
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(10)),
),
child: const Text('完成'),
),
),
],
);
}
Widget _buildExpired() {
return Column(
children: [
const Icon(Icons.timer_off_outlined, color: AppColors.textMuted, size: 48),
const SizedBox(height: 12),
const Text('验证码已过期', style: TextStyle(fontSize: 16, fontWeight: FontWeight.w600, color: AppColors.textPrimary)),
const SizedBox(height: 8),
const Text('请重新获取验证码', style: TextStyle(fontSize: 13, color: AppColors.textMuted)),
const SizedBox(height: 20),
SizedBox(
width: double.infinity,
child: FilledButton(
onPressed: _fetchCode,
style: FilledButton.styleFrom(
padding: const EdgeInsets.symmetric(vertical: 12),
shape: RoundedRectangleBorder(borderRadius: BorderRadius.circular(10)),
),
child: const Text('重新获取'),
),
),
],
);
}
Widget _buildError() {
return Column(
children: [
const Icon(Icons.error_outline, color: AppColors.error, size: 48),
const SizedBox(height: 12),
Text(_errorMsg ?? '获取验证码失败', style: const TextStyle(fontSize: 13, color: AppColors.error), textAlign: TextAlign.center),
const SizedBox(height: 20),
SizedBox(
width: double.infinity,
child: OutlinedButton(
onPressed: _fetchCode,
child: const Text('重试'),
),
),
],
);
}
}
enum _BindPhase { loading, waitingCode, success, expired, error }
// ---------------------------------------------------------------------------
// Dismiss confirm dialog
// ---------------------------------------------------------------------------

View File

@ -52,6 +52,8 @@ import { AgentInstance } from './domain/entities/agent-instance.entity';
import { AgentInstanceRepository } from './infrastructure/repositories/agent-instance.repository';
import { AgentInstanceDeployService } from './infrastructure/services/agent-instance-deploy.service';
import { AgentInstanceController } from './interfaces/rest/controllers/agent-instance.controller';
import { AgentChannelController } from './interfaces/rest/controllers/agent-channel.controller';
import { DingTalkRouterService } from './infrastructure/dingtalk/dingtalk-router.service';
import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/system-prompt-builder';
@Module({
@ -68,7 +70,7 @@ import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/sy
AgentController, SessionController, RiskRulesController,
TenantAgentConfigController, AgentConfigController, VoiceConfigController,
VoiceSessionController, SkillsController, HooksController,
AgentInstanceController,
AgentInstanceController, AgentChannelController,
],
providers: [
AgentStreamGateway,
@ -100,6 +102,7 @@ import { SystemPromptBuilder } from './infrastructure/engines/claude-code-cli/sy
OpenAISttService,
AgentInstanceRepository,
AgentInstanceDeployService,
DingTalkRouterService,
SystemPromptBuilder,
],
})

View File

@ -44,6 +44,9 @@ export class AgentInstance {
@Column({ type: 'text', name: 'openclaw_token_iv', nullable: true })
openclawTokenIv?: string;
@Column({ type: 'varchar', length: 100, name: 'dingtalk_user_id', nullable: true })
dingTalkUserId?: string;
@Column({ type: 'jsonb', default: {} })
config!: Record<string, unknown>;

View File

@ -0,0 +1,583 @@
/**
* DingTalk Router Service
*
* IT0 unified DingTalk bot one central connection for all agent instances.
*
* Responsibilities:
* 1. Maintain a DingTalk Stream connection using IT0's official App credentials.
* 2. Handle binding codes: user sends code maps their DingTalk ID to an agent instance.
* 3. Route regular messages: looks up bound instance POSTs to bridge /task replies.
*
* Required env vars:
* IT0_DINGTALK_CLIENT_ID IT0 official DingTalk AppKey
* IT0_DINGTALK_CLIENT_SECRET IT0 official DingTalk AppSecret
*
* Binding flow:
* 1. Frontend calls POST /api/v1/agent/channels/dingtalk/bind/:instanceId
* 2. Backend returns { code: "A3K9F2", expiresAt }
* 3. User opens DingTalk IT0 bot sends "A3K9F2"
* 4. This service matches the code saves dingTalkUserId replies "✅ 绑定成功"
* 5. Frontend polls GET /api/v1/agent/channels/dingtalk/status/:instanceId { bound: true }
*/
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as https from 'https';
import * as crypto from 'crypto';
import * as http from 'http';
import WebSocket from 'ws';
import { AgentInstanceRepository } from '../repositories/agent-instance.repository';
// ── Types ─────────────────────────────────────────────────────────────────────
interface DtFrame {
type: string;
headers: Record<string, string>;
data?: string;
}
interface BotMsg {
senderStaffId: string;
sessionWebhook: string;
/** Unix timestamp in ms — after this the webhook URL is no longer valid */
sessionWebhookExpiredTime: number;
text?: { content: string };
msgtype?: string;
conversationId: string;
msgId: string;
}
interface BindingEntry {
instanceId: string;
expiresAt: number;
}
// ── Constants ─────────────────────────────────────────────────────────────────
const DINGTALK_MAX_CHARS = 4800;
const CODE_TTL_MS = 5 * 60 * 1000; // 5 min
const TOKEN_REFRESH_BUFFER = 300; // seconds before expiry
const WS_RECONNECT_BASE_MS = 2_000;
const WS_RECONNECT_MAX_MS = 60_000;
const TASK_TIMEOUT_S = 30;
const DEDUP_TTL_MS = 10 * 60 * 1000;
const RATE_LIMIT_PER_MIN = 10;
const QUEUE_MAX_DEPTH = 5;
const CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // periodic map cleanup every 5 min
// ── Service ───────────────────────────────────────────────────────────────────
@Injectable()
export class DingTalkRouterService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(DingTalkRouterService.name);
private readonly clientId: string;
private readonly clientSecret: string;
private readonly enabled: boolean;
// Token
private token = '';
private tokenExpiresAt = 0;
private tokenRefreshTimer?: NodeJS.Timeout;
private tokenRefreshPromise: Promise<string> | null = null;
// WS
private ws: WebSocket | null = null;
private connecting = false; // guard against concurrent connectStream() calls
private reconnectDelay = WS_RECONNECT_BASE_MS;
private stopping = false;
private reconnectTimer?: NodeJS.Timeout;
private cleanupTimer?: NodeJS.Timeout;
// State
private readonly bindingCodes = new Map<string, BindingEntry>(); // code → entry
private readonly dedup = new Map<string, number>(); // msgId → ts
private readonly rateWindows = new Map<string, number[]>(); // userId → timestamps
private readonly queueTails = new Map<string, Promise<void>>(); // userId → tail
private readonly queueDepths = new Map<string, number>(); // userId → depth
constructor(
private readonly configService: ConfigService,
private readonly instanceRepo: AgentInstanceRepository,
) {
this.clientId = this.configService.get<string>('IT0_DINGTALK_CLIENT_ID', '');
this.clientSecret = this.configService.get<string>('IT0_DINGTALK_CLIENT_SECRET', '');
this.enabled = !!(this.clientId && this.clientSecret);
}
onModuleInit(): void {
if (!this.enabled) {
this.logger.warn('IT0_DINGTALK_CLIENT_ID/SECRET not set — DingTalk router disabled');
return;
}
this.logger.log('DingTalk router starting...');
this.connectStream().catch((e) =>
this.logger.error('Initial stream connection failed:', (e as Error).message),
);
// Periodic cleanup to prevent in-memory map growth
this.cleanupTimer = setInterval(() => this.periodicCleanup(), CLEANUP_INTERVAL_MS);
if (this.cleanupTimer.unref) this.cleanupTimer.unref();
}
onModuleDestroy(): void {
this.stopping = true;
clearInterval(this.cleanupTimer);
clearTimeout(this.reconnectTimer);
clearTimeout(this.tokenRefreshTimer);
this.ws?.close();
}
isEnabled(): boolean {
return this.enabled;
}
// ── Binding code API (called by controller) ────────────────────────────────
generateBindingCode(instanceId: string): { code: string; expiresAt: number } {
// Invalidate any existing code for this instance
for (const [code, entry] of this.bindingCodes) {
if (entry.instanceId === instanceId) this.bindingCodes.delete(code);
}
const code = crypto.randomBytes(3).toString('hex').toUpperCase(); // e.g. "A3K9F2"
const expiresAt = Date.now() + CODE_TTL_MS;
this.bindingCodes.set(code, { instanceId, expiresAt });
return { code, expiresAt };
}
// ── Token management ───────────────────────────────────────────────────────
private async getToken(): Promise<string> {
if (this.token && Date.now() < this.tokenExpiresAt - TOKEN_REFRESH_BUFFER * 1000) {
return this.token;
}
if (this.tokenRefreshPromise) return this.tokenRefreshPromise;
this.tokenRefreshPromise = this.refreshToken().finally(() => {
this.tokenRefreshPromise = null;
});
return this.tokenRefreshPromise;
}
private async refreshToken(): Promise<string> {
const { accessToken, expireIn } = await this.httpsPost<{ accessToken: string; expireIn: number }>(
'api.dingtalk.com', '/v1.0/oauth2/accessToken',
{ appKey: this.clientId, appSecret: this.clientSecret },
);
this.token = accessToken;
this.tokenExpiresAt = Date.now() + expireIn * 1000;
clearTimeout(this.tokenRefreshTimer);
const refreshInMs = Math.max((expireIn - TOKEN_REFRESH_BUFFER) * 1000, 60_000);
this.tokenRefreshTimer = setTimeout(() => {
this.getToken().catch((e: Error) => this.logger.error('Token refresh failed:', e.message));
}, refreshInMs);
if (this.tokenRefreshTimer.unref) this.tokenRefreshTimer.unref();
this.logger.log(`DingTalk token refreshed, valid ${expireIn}s`);
return this.token;
}
// ── Stream connection ──────────────────────────────────────────────────────
private async connectStream(): Promise<void> {
if (this.stopping || this.connecting) return;
this.connecting = true;
try {
await this.doConnect();
} finally {
this.connecting = false;
}
}
private async doConnect(): Promise<void> {
let token: string;
try {
token = await this.getToken();
} catch (e: any) {
this.logger.error('Cannot get DingTalk token:', e.message);
this.scheduleReconnect();
return;
}
let wsInfo: { endpoint: string; ticket: string };
try {
wsInfo = await this.httpsPost<{ endpoint: string; ticket: string }>(
'api.dingtalk.com', '/v1.0/gateway/connections/open',
{
clientId: this.clientId,
clientSecret: this.clientSecret,
subscriptions: [{ type: 'CALLBACK', topic: '/v1.0/im/bot/messages/get' }],
ua: 'it0-dingtalk-router/1.0',
localIp: '127.0.0.1',
},
{ 'x-acs-dingtalk-access-token': token },
);
} catch (e: any) {
this.logger.error('Failed to get stream endpoint:', e.message);
this.scheduleReconnect();
return;
}
const ws = new WebSocket(`${wsInfo.endpoint}?ticket=${encodeURIComponent(wsInfo.ticket)}`);
this.ws = ws;
ws.on('open', () => {
this.logger.log('DingTalk stream connected');
this.reconnectDelay = WS_RECONNECT_BASE_MS;
});
ws.on('message', (raw) => {
let frame: DtFrame;
try { frame = JSON.parse(raw.toString()); } catch { return; }
this.handleFrame(ws, frame);
});
ws.on('close', (code, reason) => {
if (this.stopping) return;
this.logger.warn(`Stream closed (${code}: ${reason.toString()})`);
this.scheduleReconnect();
});
ws.on('error', (e) => {
// 'close' fires after 'error' so reconnect is handled there
this.logger.error('Stream WS error:', e.message);
});
}
private scheduleReconnect(): void {
if (this.stopping) return;
clearTimeout(this.reconnectTimer);
this.reconnectTimer = setTimeout(() => {
this.connectStream().catch((e: Error) =>
this.logger.error('Reconnect failed:', e.message),
);
}, this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, WS_RECONNECT_MAX_MS);
}
// ── Frame handling ─────────────────────────────────────────────────────────
private handleFrame(ws: WebSocket, frame: DtFrame): void {
if (frame.type === 'PING') {
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
return;
}
if (
frame.type === 'CALLBACK' &&
frame.headers?.['topic'] === '/v1.0/im/bot/messages/get' &&
frame.data
) {
let msg: BotMsg;
try { msg = JSON.parse(frame.data); } catch { return; }
// ACK within 1.5s — must be synchronous before any async work
ws.send(JSON.stringify({ code: 200, headers: frame.headers, message: 'OK', data: '' }));
this.dispatchMessage(msg);
}
}
private dispatchMessage(msg: BotMsg): void {
const userId = msg.senderStaffId?.trim();
if (!userId) {
this.logger.warn('Received message with empty senderStaffId, ignoring');
return;
}
// Non-text message types (image, file, richText, audio, video, etc.)
const text = msg.text?.content?.trim() ?? '';
if (!text) {
this.reply(msg, '我目前只能处理文字消息,请发送文字与小龙虾沟通。');
return;
}
// Deduplication
if (this.dedup.has(msg.msgId)) return;
this.dedup.set(msg.msgId, Date.now());
// Binding code check (case-insensitive, strip surrounding whitespace)
const upperText = text.toUpperCase();
const entry = this.bindingCodes.get(upperText);
if (entry) {
if (Date.now() > entry.expiresAt) {
this.bindingCodes.delete(upperText);
this.reply(msg, '验证码已过期,请重新在 IT0 App 中生成新的验证码。');
return;
}
this.bindingCodes.delete(upperText);
this.completeBinding(entry.instanceId, userId, msg).catch((e: Error) =>
this.logger.error('completeBinding unhandled error:', e.message),
);
return;
}
// Rate limit
if (!this.rateAllow(userId)) {
this.reply(msg, '消息频率过高请稍后再试每分钟最多10条。');
return;
}
// Route to agent container (serial per-user queue)
const accepted = this.enqueue(userId, () => this.routeToAgent(userId, text, msg));
if (!accepted) {
this.reply(msg, '当前请求排队已满最多5条请稍后再试。');
}
}
// ── Binding completion ─────────────────────────────────────────────────────
private async completeBinding(instanceId: string, dingTalkUserId: string, msg: BotMsg): Promise<void> {
try {
const instance = await this.instanceRepo.findById(instanceId);
if (!instance) {
this.reply(msg, '绑定失败:智能体实例不存在,请重新操作。');
return;
}
instance.dingTalkUserId = dingTalkUserId;
await this.instanceRepo.save(instance);
this.logger.log(`Bound instance ${instanceId} to DingTalk user ${dingTalkUserId}`);
this.reply(
msg,
`✅ 绑定成功!\n\n你的小龙虾「${instance.name}」已与钉钉绑定。\n\n现在直接发消息给我我会帮你转达给它`,
);
} catch (e: any) {
this.logger.error('Binding failed:', e.message);
this.reply(msg, '绑定时出现错误,请稍后重试。');
}
}
// ── Message routing ────────────────────────────────────────────────────────
private async routeToAgent(userId: string, text: string, msg: BotMsg): Promise<void> {
const instance = await this.instanceRepo.findByDingTalkUserId(userId);
if (!instance) {
this.reply(
msg,
'你还没有绑定小龙虾。\n\n请在 IT0 App 中创建一只小龙虾,然后点击「绑定钉钉」获取验证码。',
);
return;
}
if (instance.status !== 'running') {
this.reply(msg, `小龙虾「${instance.name}」当前状态为 ${instance.status},暂时无法接收指令。`);
return;
}
if (!instance.serverHost) {
this.logger.error(`Instance ${instance.id} has no serverHost configured`);
this.reply(msg, '小龙虾配置异常(缺少服务器地址),请联系管理员。');
return;
}
const bridgeUrl = `http://${instance.serverHost}:${instance.hostPort}/task`;
let reply: string;
try {
const result = await this.httpPostJson<{ ok: boolean; result?: unknown; error?: string }>(
bridgeUrl,
{
prompt: text,
sessionKey: `agent:main:dt-${userId}`,
timeoutSeconds: TASK_TIMEOUT_S,
},
(TASK_TIMEOUT_S + 5) * 1000,
);
if (result.ok && result.result !== undefined) {
reply = typeof result.result === 'string'
? result.result
: JSON.stringify(result.result, null, 2);
} else {
reply = result.error ?? '智能体没有返回内容。';
}
} catch (e: any) {
this.logger.error(`Bridge call failed for instance ${instance.id}:`, e.message);
reply = '与小龙虾通信时出现错误,请稍后重试。';
}
this.reply(msg, reply);
}
// ── Reply (chunked) ────────────────────────────────────────────────────────
private reply(msg: BotMsg, content: string): void {
if (Date.now() > msg.sessionWebhookExpiredTime) {
this.logger.warn('sessionWebhook expired, cannot reply to msgId=' + msg.msgId);
return;
}
// Strip stack-trace lines to avoid leaking internals
const safe = content.replace(/\s+at\s+\S+:\d+:\d+/g, '').trim();
const chunks: string[] = [];
for (let i = 0; i < safe.length; i += DINGTALK_MAX_CHARS) {
chunks.push(safe.slice(i, i + DINGTALK_MAX_CHARS));
}
if (chunks.length === 0) chunks.push('(空响应)');
chunks.forEach((chunk, idx) =>
setTimeout(() => this.sendWebhook(msg.sessionWebhook, chunk), idx * 300),
);
}
private sendWebhook(webhook: string, content: string): void {
try {
const url = new URL(webhook);
const body = JSON.stringify({ msgtype: 'text', text: { content } });
const req = https.request(
{
hostname: url.hostname,
path: url.pathname + url.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
},
timeout: 10_000,
},
(res) => { res.resume(); },
);
req.on('timeout', () => { req.destroy(); });
req.on('error', (e) => this.logger.error('Webhook error:', e.message));
req.write(body);
req.end();
} catch (e: any) {
this.logger.error('sendWebhook failed:', e.message);
}
}
// ── Per-user serial queue ──────────────────────────────────────────────────
private enqueue(userId: string, task: () => Promise<void>): boolean {
const depth = this.queueDepths.get(userId) ?? 0;
if (depth >= QUEUE_MAX_DEPTH) return false;
this.queueDepths.set(userId, depth + 1);
const tail = this.queueTails.get(userId) ?? Promise.resolve();
const next = tail
.then(task)
.catch((e: Error) => this.logger.error(`Queue task error (${userId}):`, e.message))
.finally(() => {
const remaining = (this.queueDepths.get(userId) ?? 1) - 1;
if (remaining <= 0) {
this.queueDepths.delete(userId);
this.queueTails.delete(userId);
} else {
this.queueDepths.set(userId, remaining);
}
});
this.queueTails.set(userId, next);
return true;
}
// ── Rate limiter (sliding window) ─────────────────────────────────────────
private rateAllow(userId: string): boolean {
const now = Date.now();
const timestamps = (this.rateWindows.get(userId) ?? []).filter((t) => now - t < 60_000);
if (timestamps.length >= RATE_LIMIT_PER_MIN) {
this.rateWindows.set(userId, timestamps); // store pruned list
return false;
}
timestamps.push(now);
this.rateWindows.set(userId, timestamps);
return true;
}
// ── Periodic cleanup (prevent unbounded map growth) ────────────────────────
private periodicCleanup(): void {
const now = Date.now();
// Dedup: remove entries older than DEDUP_TTL_MS
for (const [id, ts] of this.dedup) {
if (now - ts > DEDUP_TTL_MS) this.dedup.delete(id);
}
// Rate windows: remove users with no recent activity
for (const [userId, timestamps] of this.rateWindows) {
const fresh = timestamps.filter((t) => now - t < 60_000);
if (fresh.length === 0) this.rateWindows.delete(userId);
else this.rateWindows.set(userId, fresh);
}
// Binding codes: remove expired codes
for (const [code, entry] of this.bindingCodes) {
if (now > entry.expiresAt) this.bindingCodes.delete(code);
}
}
// ── HTTP helpers ───────────────────────────────────────────────────────────
private httpsPost<T>(
hostname: string,
path: string,
payload: object,
extraHeaders: Record<string, string> = {},
): Promise<T> {
return new Promise((resolve, reject) => {
const body = JSON.stringify(payload);
const req = https.request(
{
hostname, path, method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
...extraHeaders,
},
timeout: 10_000,
},
(res) => {
let data = '';
res.on('data', (c) => (data += c));
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode && res.statusCode >= 400) {
reject(new Error(`HTTP ${res.statusCode}: ${data.slice(0, 200)}`));
} else {
resolve(json as T);
}
} catch (e) { reject(e); }
});
},
);
req.on('timeout', () => { req.destroy(); reject(new Error('Request timeout')); });
req.on('error', reject);
req.write(body);
req.end();
});
}
/** HTTP POST to an internal bridge container (plain http, no TLS) */
private httpPostJson<T>(url: string, payload: object, timeoutMs = 35_000): Promise<T> {
return new Promise((resolve, reject) => {
const parsed = new URL(url);
const body = JSON.stringify(payload);
const req = http.request(
{
hostname: parsed.hostname,
port: parseInt(parsed.port, 10),
path: parsed.pathname,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
},
timeout: timeoutMs,
},
(res) => {
let data = '';
res.on('data', (c) => (data += c));
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode && res.statusCode >= 400) {
reject(new Error(`Bridge HTTP ${res.statusCode}: ${data.slice(0, 200)}`));
} else {
resolve(json as T);
}
} catch (e) { reject(e); }
});
},
);
req.on('timeout', () => { req.destroy(); reject(new Error('Bridge request timeout')); });
req.on('error', reject);
req.write(body);
req.end();
});
}
}

View File

@ -26,6 +26,10 @@ export class AgentInstanceRepository {
return this.repo.find({ where: { poolServerId, status: 'running' } as any });
}
findByDingTalkUserId(dingTalkUserId: string): Promise<AgentInstance | null> {
return this.repo.findOne({ where: { dingTalkUserId } as any });
}
save(instance: AgentInstance): Promise<AgentInstance> {
return this.repo.save(instance);
}

View File

@ -0,0 +1,49 @@
import {
Controller,
Post,
Get,
Param,
NotFoundException,
ServiceUnavailableException,
} from '@nestjs/common';
import { DingTalkRouterService } from '../../../infrastructure/dingtalk/dingtalk-router.service';
import { AgentInstanceRepository } from '../../../infrastructure/repositories/agent-instance.repository';
@Controller('api/v1/agent/channels')
export class AgentChannelController {
constructor(
private readonly dingTalkRouter: DingTalkRouterService,
private readonly instanceRepo: AgentInstanceRepository,
) {}
/** Generate a 6-char binding code for the given agent instance */
@Post('dingtalk/bind/:instanceId')
async generateBindCode(@Param('instanceId') instanceId: string) {
if (!this.dingTalkRouter.isEnabled()) {
throw new ServiceUnavailableException('DingTalk integration not configured on this server');
}
const inst = await this.instanceRepo.findById(instanceId);
if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`);
const { code, expiresAt } = this.dingTalkRouter.generateBindingCode(instanceId);
return { code, expiresAt };
}
/** Check if the given instance is already bound to a DingTalk user */
@Get('dingtalk/status/:instanceId')
async getBindStatus(@Param('instanceId') instanceId: string) {
const inst = await this.instanceRepo.findById(instanceId);
if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`);
return { bound: !!inst.dingTalkUserId };
}
/** Unbind the DingTalk user from the given agent instance */
@Post('dingtalk/unbind/:instanceId')
async unbind(@Param('instanceId') instanceId: string) {
const inst = await this.instanceRepo.findById(instanceId);
if (!inst) throw new NotFoundException(`Instance ${instanceId} not found`);
inst.dingTalkUserId = undefined;
await this.instanceRepo.save(inst);
return { unbound: true };
}
}

View File

@ -0,0 +1,10 @@
-- Add DingTalk user binding to agent_instances
-- Allows a DingTalk user to be bound to a specific agent instance
-- so the central DingTalk router can forward messages to the correct container.
ALTER TABLE agent_instances
ADD COLUMN IF NOT EXISTS dingtalk_user_id VARCHAR(100); -- DingTalk senderStaffId
CREATE INDEX IF NOT EXISTS idx_agent_instances_dingtalk_user
ON agent_instances(dingtalk_user_id)
WHERE dingtalk_user_id IS NOT NULL;