119 KiB
IT0 后端开发指导文档
IT Operations Intelligent Agent — 基于 Claude Code 本体的服务器集群运维智能体
1. 项目概述
1.1 定位
IT0 是一个服务器集群运维智能体平台,核心理念是将 Claude Code 作为运维大脑,通过自建的调度层和安全层,实现 AI 驱动的自动化运维。
1.2 核心设计原则
- 引擎可替换:Claude Code CLI / Claude API / 自研 Agent 可无缝切换
- 安全第一:三层防御模型(分级权限 + Hook 拦截 + 架构隔离)
- 人在回路:高风险操作必须人工审批,AI 只负责"看"和"想"
- 通信多元:支持语音、文字、短信、自动电话、社交通信、邮件等多渠道通知与交互
- 多租户 SaaS:从第一天起支持多租户隔离,未来可对外提供服务
1.3 技术栈
| 层面 | 技术选型 |
|---|---|
| 主体语言 | TypeScript (NestJS) |
| 架构模式 | DDD + Clean Architecture + 微服务 |
| 数据库 | PostgreSQL (每个服务独立 schema) |
| 消息队列 | Redis Streams / Bull Queue |
| 服务通信 | gRPC (服务间) + REST (对外 API) + WebSocket (实时推送) |
| 容器化 | Docker + Docker Compose (开发) / Kubernetes (生产) |
| API 网关 | Kong (DB-less 模式,声明式配置) |
| 监控 | Prometheus + Grafana |
| 日志 | ELK Stack / Loki |
| AI Skills | Anthropic Claude Skills(.claude/skills/ 目录规范) |
| 语音对话引擎 | Pipecat (Python) + faster-whisper (STT) + Kokoro-82M (TTS) + Silero VAD |
| 电话线路 | Twilio Voice(仅作为 Pipecat 的电话线路载体,拨号/接听) |
2. 整体架构
2.1 系统架构图
┌─────────────────────────────────────────────────────────────┐
│ Flutter Android App │
│ (语音/文字交互 + 仪表盘 + 任务审批 + 日志流) │
└──────────────────────────┬──────────────────────────────────┘
│ HTTPS / WebSocket
▼
┌─────────────────────────────────────────────────────────────┐
│ API Gateway (Kong) │
│ 路由 / 认证 / 限流 / 日志 │
└──────┬────────┬────────┬────────┬────────┬────────┬─────────┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────────┐┌────────┐┌────────┐┌────────┐┌────────┐┌────────┐
│ auth ││ agent ││ ops ││ inv ││monitor ││ comm │
│ service ││service ││service ││service ││service ││service │
└─────────┘└───┬────┘└────────┘└────────┘└────────┘└────────┘
│ │
▼ ▼
┌──────────────────────────┐ ┌──────────────────────┐
│ Agent Engine Layer │ │ 通信渠道适配器 │
│ ┌──────────────────────┐ │ │ ┌──────┐ ┌────────┐ │
│ │ Claude Code CLI │ │ │ │ SMS │ │ IM │ │
│ │ Claude API │ │ │ │Email │ │ Push │ │
│ │ Custom Agent │ │ │ └──────┘ └────────┘ │
│ └──────────────────────┘ │ └──────────┬───────────┘
└──────────┬───────────────┘ │
│ ┌─────────▼────────────┐
│ │ voice-service │
│ │ (Pipecat + Whisper) │
│ │ 实时语音对话引擎 │
│ └──────────────────────┘
│ SSH / kubectl / docker
▼
┌──────────────────────────┐
│ 目标服务器集群 │
│ prod-1 / prod-2 / k8s │
└──────────────────────────┘
2.2 微服务拆分
| 服务 | 端口 | 职责 | 数据库 Schema |
|---|---|---|---|
api-gateway |
8000 | 路由、认证、限流 | — |
auth-service |
3001 | 用户认证、RBAC、API Key | it0_auth |
agent-service |
3002 | 智能体引擎、会话管理、命令执行 | it0_agent |
ops-service |
3003 | 运维任务、审批工作流、Runbook | it0_ops |
inventory-service |
3004 | 资产管理、凭证管理、SSH 配置 | it0_inventory |
monitor-service |
3005 | 健康检查、指标、告警 | it0_monitor |
comm-service |
3006 | 多渠道通信(推送/短信/邮件/IM) | it0_comm |
audit-service |
3007 | 操作审计、合规日志 | it0_audit |
voice-service |
3008 | 实时语音对话引擎(Pipecat + Whisper + Kokoro) | — (无状态) |
3. DDD 限界上下文详细设计
3.1 Agent Core(智能体核心)— agent-service
这是整个系统的大脑,也是架构灵活性的关键所在。
3.1.1 Clean Architecture 分层
agent-service/
├── src/
│ ├── domain/ # 领域层(零依赖)
│ │ ├── entities/
│ │ │ ├── agent-session.entity.ts # 会话聚合根
│ │ │ ├── agent-task.entity.ts # 任务实体
│ │ │ ├── command-record.entity.ts # 命令记录
│ │ │ └── standing-order.entity.ts # ★ 驻留指令(自治任务定义)
│ │ ├── value-objects/
│ │ │ ├── agent-engine-type.vo.ts # 引擎类型(ClaudeCode/API/Custom)
│ │ │ ├── command-risk-level.vo.ts # 风险等级(L0-L3)
│ │ │ ├── task-status.vo.ts # 任务状态
│ │ │ └── session-id.vo.ts
│ │ ├── ports/ # 端口(接口定义)
│ │ │ ├── inbound/
│ │ │ │ ├── execute-task.port.ts # 执行任务用例
│ │ │ │ ├── approve-command.port.ts # 审批命令用例
│ │ │ │ ├── switch-engine.port.ts # 切换引擎用例
│ │ │ │ └── manage-standing-order.port.ts # ★ 驻留指令管理
│ │ │ └── outbound/
│ │ │ ├── agent-engine.port.ts # ★ 核心抽象 — 智能体引擎接口
│ │ │ ├── session-repository.port.ts
│ │ │ ├── command-guard.port.ts # 命令安全守卫
│ │ │ ├── event-publisher.port.ts
│ │ │ └── stream-emitter.port.ts # 流式输出发射器
│ │ ├── events/
│ │ │ ├── command-blocked.event.ts
│ │ │ ├── task-completed.event.ts
│ │ │ └── approval-required.event.ts
│ │ └── services/
│ │ ├── command-risk-classifier.ts # 命令风险分级(领域服务)
│ │ ├── session-manager.ts
│ │ └── standing-order-extractor.ts # ★ 对话 → 驻留指令提取
│ │
│ ├── application/ # 应用层(用例编排)
│ │ ├── use-cases/
│ │ │ ├── execute-task.use-case.ts
│ │ │ ├── approve-command.use-case.ts
│ │ │ ├── cancel-task.use-case.ts
│ │ │ ├── get-session-history.use-case.ts
│ │ │ └── switch-engine.use-case.ts
│ │ └── dto/
│ │ ├── execute-task.dto.ts
│ │ ├── task-result.dto.ts
│ │ └── stream-event.dto.ts
│ │
│ ├── infrastructure/ # 基础设施层(适配器实现)
│ │ ├── engines/ # ★ 引擎适配器(Strategy 模式)
│ │ │ ├── agent-engine.interface.ts # 统一接口
│ │ │ ├── claude-code-cli/
│ │ │ │ ├── claude-code-engine.ts # Claude Code CLI 适配器
│ │ │ │ ├── cli-process-manager.ts # 进程管理(spawn/kill/timeout)
│ │ │ │ ├── stream-json-parser.ts # stream-json 输出解析
│ │ │ │ ├── system-prompt-builder.ts # 运维专用 prompt 构建
│ │ │ │ └── hook-config-manager.ts # .claude/settings.json 管理
│ │ │ ├── claude-api/
│ │ │ │ ├── claude-api-engine.ts # Claude API 直连适配器
│ │ │ │ ├── tool-executor.ts # 工具执行器(Bash/Read/Write)
│ │ │ │ └── agent-loop.ts # 自建 Agent Loop
│ │ │ └── custom/
│ │ │ └── custom-engine.ts # 未来自研引擎预留
│ │ ├── guards/ # 命令安全守卫
│ │ │ ├── command-guard.service.ts # 命令拦截实现
│ │ │ ├── risk-patterns.ts # 风险命令正则库
│ │ │ └── pre-tool-use-hook.py # Claude Code Hook 脚本
│ │ ├── persistence/
│ │ │ ├── typeorm/
│ │ │ │ ├── session.orm-entity.ts
│ │ │ │ ├── task.orm-entity.ts
│ │ │ │ └── command-record.orm-entity.ts
│ │ │ └── repositories/
│ │ │ └── session.repository.ts
│ │ └── messaging/
│ │ ├── event-publisher.service.ts
│ │ └── stream-emitter.service.ts # WebSocket 流推送
│ │
│ └── interfaces/ # 接口层(控制器)
│ ├── rest/
│ │ ├── agent.controller.ts
│ │ └── session.controller.ts
│ └── ws/
│ └── agent-stream.gateway.ts # WebSocket 网关
3.1.2 ★ 核心接口:AgentEngine(引擎抽象)
这是整个架构灵活性的核心,使用 Strategy + Adapter 模式:
// domain/ports/outbound/agent-engine.port.ts
/**
* 智能体引擎端口 — 所有引擎实现必须遵循此接口
* Claude Code CLI / Claude API / 自研 Agent 均通过此接口接入
*/
export interface AgentEnginePort {
/** 引擎类型标识 */
readonly engineType: AgentEngineType;
/**
* 执行任务(流式输出)
* @returns AsyncGenerator,逐步 yield 流式事件
*/
executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent>;
/** 取消正在执行的任务 */
cancelTask(sessionId: string): Promise<void>;
/** 继续会话(追加指令) */
continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent>;
/** 健康检查 */
healthCheck(): Promise<boolean>;
}
/** 引擎任务参数 */
export interface EngineTaskParams {
sessionId: string;
prompt: string;
systemPrompt: string;
allowedTools: string[];
maxTurns: number;
maxBudgetUsd?: number;
/** 上下文附加信息(服务器清单、历史操作等) */
context?: Record<string, unknown>;
/** Skill 调用:如果指定,prompt 格式为 /{skillName} {args} */
skill?: {
name: string;
arguments: string;
};
}
/** 引擎流式事件(统一格式,各适配器负责转换) */
export type EngineStreamEvent =
| { type: 'thinking'; content: string }
| { type: 'text'; content: string }
| { type: 'tool_use'; toolName: string; input: Record<string, unknown> }
| { type: 'tool_result'; toolName: string; output: string; isError: boolean }
| { type: 'approval_required'; command: string; riskLevel: RiskLevel; taskId: string }
| { type: 'completed'; summary: string; tokensUsed?: number }
| { type: 'error'; message: string; code: string };
3.1.3 Claude Code CLI 引擎实现
// infrastructure/engines/claude-code-cli/claude-code-engine.ts
@Injectable()
export class ClaudeCodeCliEngine implements AgentEnginePort {
readonly engineType = AgentEngineType.CLAUDE_CODE_CLI;
constructor(
private readonly processManager: CliProcessManager,
private readonly streamParser: StreamJsonParser,
private readonly promptBuilder: SystemPromptBuilder,
private readonly configService: ConfigService,
) {}
async *executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent> {
const systemPromptPath = await this.promptBuilder.buildAndSave(params);
const args = [
'-p', params.prompt,
'--system-prompt-file', systemPromptPath,
'--output-format', 'stream-json',
'--allowedTools', params.allowedTools.join(','),
'--max-turns', String(params.maxTurns),
'--session-id', params.sessionId,
'--verbose',
'--include-partial-messages',
];
if (params.maxBudgetUsd) {
args.push('--max-budget-usd', String(params.maxBudgetUsd));
}
const process = this.processManager.spawn('claude', args);
for await (const rawEvent of this.streamParser.parse(process.stdout)) {
// 将 Claude Code 原始 stream-json 事件转换为统一 EngineStreamEvent
const event = this.mapToEngineEvent(rawEvent);
if (event) yield event;
}
const exitCode = await this.processManager.waitForExit(process);
if (exitCode !== 0) {
yield { type: 'error', message: `Claude Code exited with code ${exitCode}`, code: 'CLI_EXIT_ERROR' };
}
}
// ... 其他方法
}
3.1.4 Claude API 引擎实现(备选)
// infrastructure/engines/claude-api/claude-api-engine.ts
@Injectable()
export class ClaudeApiEngine implements AgentEnginePort {
readonly engineType = AgentEngineType.CLAUDE_API;
constructor(
private readonly toolExecutor: ToolExecutor, // 自建的 Bash/Read/Write 执行器
private readonly configService: ConfigService,
) {}
async *executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent> {
const client = new Anthropic({ apiKey: this.configService.get('ANTHROPIC_API_KEY') });
const tools = this.buildToolDefinitions(params.allowedTools);
let messages: MessageParam[] = [{ role: 'user', content: params.prompt }];
// 自建 Agent Loop
for (let turn = 0; turn < params.maxTurns; turn++) {
const stream = client.messages.stream({
model: this.configService.get('CLAUDE_MODEL', 'claude-sonnet-4-5-20250929'),
max_tokens: 8192,
system: params.systemPrompt,
messages,
tools,
});
for await (const event of stream) {
const mapped = this.mapStreamEvent(event);
if (mapped) yield mapped;
}
const response = await stream.finalMessage();
// 检查是否需要执行工具
const toolUseBlocks = response.content.filter(b => b.type === 'tool_use');
if (toolUseBlocks.length === 0) break; // 无工具调用,结束
// 执行工具并收集结果
const toolResults = [];
for (const block of toolUseBlocks) {
const result = await this.toolExecutor.execute(block.name, block.input);
yield { type: 'tool_result', toolName: block.name, output: result.output, isError: result.isError };
toolResults.push({ type: 'tool_result' as const, tool_use_id: block.id, content: result.output });
}
messages = [...messages,
{ role: 'assistant', content: response.content },
{ role: 'user', content: toolResults },
];
}
}
}
3.1.5 引擎切换机制
// infrastructure/engines/engine-registry.ts
@Injectable()
export class EngineRegistry {
private engines = new Map<AgentEngineType, AgentEnginePort>();
constructor(
@Optional() private readonly cliEngine: ClaudeCodeCliEngine,
@Optional() private readonly apiEngine: ClaudeApiEngine,
@Optional() private readonly customEngine: CustomEngine,
private readonly configService: ConfigService,
) {
if (cliEngine) this.engines.set(AgentEngineType.CLAUDE_CODE_CLI, cliEngine);
if (apiEngine) this.engines.set(AgentEngineType.CLAUDE_API, apiEngine);
if (customEngine) this.engines.set(AgentEngineType.CUSTOM, customEngine);
}
/** 获取当前活跃引擎 */
getActiveEngine(): AgentEnginePort {
const type = this.configService.get<AgentEngineType>(
'AGENT_ENGINE_TYPE',
AgentEngineType.CLAUDE_CODE_CLI,
);
const engine = this.engines.get(type);
if (!engine) throw new Error(`Engine ${type} not registered`);
return engine;
}
/** 运行时切换引擎 */
switchEngine(type: AgentEngineType): AgentEnginePort {
const engine = this.engines.get(type);
if (!engine) throw new Error(`Engine ${type} not available`);
// 更新配置
this.configService.set('AGENT_ENGINE_TYPE', type);
return engine;
}
/** 列出可用引擎 */
listAvailable(): AgentEngineType[] {
return [...this.engines.keys()];
}
}
3.2 Operations(运维操作)— ops-service
3.2.1 目录结构
ops-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── ops-task.entity.ts # 运维任务聚合根
│ │ │ ├── approval-request.entity.ts # 审批请求
│ │ │ ├── runbook.entity.ts # 运维手册
│ │ │ ├── standing-order.entity.ts # ★ 驻留指令聚合根
│ │ │ └── standing-order-execution.entity.ts # ★ 执行记录
│ │ ├── value-objects/
│ │ │ ├── task-type.vo.ts # 巡检/部署/回滚/扩缩容/故障恢复
│ │ │ ├── approval-status.vo.ts # pending/approved/rejected
│ │ │ └── execution-result.vo.ts
│ │ ├── ports/
│ │ │ ├── inbound/
│ │ │ │ ├── create-task.port.ts
│ │ │ │ ├── approve-task.port.ts
│ │ │ │ └── execute-runbook.port.ts
│ │ │ └── outbound/
│ │ │ ├── task-repository.port.ts
│ │ │ ├── agent-client.port.ts # 调用 agent-service
│ │ │ ├── notification.port.ts # 调用 comm-service
│ │ │ └── audit-logger.port.ts # 调用 audit-service
│ │ └── services/
│ │ └── approval-workflow.service.ts # 审批流程领域服务
│ │
│ ├── application/
│ │ └── use-cases/
│ │ ├── create-inspection-task.use-case.ts # 创建巡检任务
│ │ ├── create-deployment-task.use-case.ts # 创建部署任务
│ │ ├── approve-task.use-case.ts
│ │ ├── schedule-recurring-task.use-case.ts # 定时任务
│ │ ├── execute-runbook.use-case.ts
│ │ ├── create-standing-order.use-case.ts # ★ 创建驻留指令
│ │ ├── update-standing-order.use-case.ts # ★ 修改驻留指令
│ │ └── execute-standing-order.use-case.ts # ★ 执行驻留指令
│ │
│ ├── infrastructure/
│ │ ├── persistence/typeorm/
│ │ ├── clients/
│ │ │ ├── agent-client.service.ts # gRPC 调用 agent-service
│ │ │ └── comm-client.service.ts # gRPC 调用 comm-service
│ │ └── schedulers/
│ │ ├── cron-scheduler.service.ts # 定时巡检调度
│ │ └── standing-order-executor.ts # ★ 驻留指令执行引擎
│ │
│ └── interfaces/
│ └── rest/
│ ├── task.controller.ts
│ ├── approval.controller.ts
│ └── runbook.controller.ts
3.2.2 审批工作流
// domain/services/approval-workflow.service.ts
export class ApprovalWorkflowService {
/**
* 审批流程:
* 1. agent-service 执行任务时遇到 Level 2 命令
* 2. 发布 approval_required 事件
* 3. ops-service 创建审批请求
* 4. comm-service 通知人类(推送/短信/电话)
* 5. 人类通过 Flutter App 或回复短信/邮件 审批
* 6. ops-service 发布 approval_granted 事件
* 7. agent-service 继续执行
*
* 超时策略:
* - Level 1: 5 分钟超时,自动通过
* - Level 2: 30 分钟超时,自动拒绝
* - 紧急模式: 管理员可设置特定 Runbook 自动审批
*/
async processApproval(request: ApprovalRequest): Promise<ApprovalResult> {
// ...
}
}
3.3 Inventory(资产管理)— inventory-service
inventory-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── server.entity.ts # 服务器实体
│ │ │ ├── cluster.entity.ts # 集群聚合根
│ │ │ ├── credential.entity.ts # 凭证(加密存储)
│ │ │ └── ssh-config.entity.ts # SSH 配置
│ │ ├── value-objects/
│ │ │ ├── server-role.vo.ts # web/db/cache/worker/gateway
│ │ │ ├── environment.vo.ts # dev/staging/prod
│ │ │ └── connection-info.vo.ts # IP/端口/用户
│ │ └── ports/
│ │ └── outbound/
│ │ ├── server-repository.port.ts
│ │ └── credential-vault.port.ts # 凭证保险库
│ │
│ ├── infrastructure/
│ │ ├── vault/
│ │ │ └── credential-vault.service.ts # AES-256 加密存储
│ │ └── persistence/typeorm/
│ │
│ └── interfaces/
│ └── rest/
│ ├── server.controller.ts
│ └── cluster.controller.ts
关键:凭证安全
// infrastructure/vault/credential-vault.service.ts
@Injectable()
export class CredentialVaultService implements CredentialVaultPort {
/**
* 凭证存储规范:
* - SSH 私钥、密码等敏感信息 AES-256-GCM 加密后存入 PostgreSQL
* - 加密密钥从环境变量 VAULT_MASTER_KEY 读取
* - 读取时解密到内存,绝不写入日志或磁盘临时文件
* - Agent 需要 SSH 时,通过 inventory-service API 获取临时凭证
* (凭证有 TTL,用后即弃)
*/
}
3.4 Monitoring(监控告警)— monitor-service
monitor-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── health-check.entity.ts
│ │ │ ├── alert-rule.entity.ts # 告警规则
│ │ │ ├── alert-event.entity.ts # 告警事件
│ │ │ └── metric-snapshot.entity.ts
│ │ ├── value-objects/
│ │ │ ├── alert-severity.vo.ts # info/warning/critical/fatal
│ │ │ ├── check-type.vo.ts # ping/http/tcp/ssh/custom
│ │ │ └── metric-type.vo.ts # cpu/mem/disk/network/custom
│ │ └── ports/outbound/
│ │ ├── metric-collector.port.ts # 指标采集
│ │ └── alert-dispatcher.port.ts # 告警分发 → comm-service
│ │
│ ├── infrastructure/
│ │ ├── collectors/
│ │ │ ├── ssh-collector.service.ts # SSH 远程采集
│ │ │ ├── prometheus-collector.ts # Prometheus 拉取
│ │ │ └── agent-collector.ts # 通过 agent-service 执行采集命令
│ │ └── schedulers/
│ │ └── health-check-scheduler.ts # 定时健康检查
│ │
│ └── interfaces/
│ └── rest/
│ ├── health-check.controller.ts
│ └── alert.controller.ts
告警 → 自动修复流程:
monitor-service 检测到异常
↓ 发布 AlertEvent
ops-service 接收告警
↓ 匹配 Runbook(自动修复手册)
↓ 如果有匹配的 Runbook 且风险等级允许
agent-service 执行修复
↓ 如果需要审批
comm-service 通知人类(推送+短信+电话升级)
↓ 人类审批
agent-service 继续执行
↓
monitor-service 验证恢复
3.5 Communication(多渠道通信)— comm-service
这是联系人类的统一出口。
comm-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── message.entity.ts # 消息聚合根
│ │ │ ├── contact.entity.ts # 联系人
│ │ │ ├── channel-config.entity.ts # 渠道配置
│ │ │ └── escalation-policy.entity.ts # 升级策略
│ │ ├── value-objects/
│ │ │ ├── channel-type.vo.ts # push/sms/voice_call/email/telegram/wechat/voice_dialog
│ │ │ ├── message-priority.vo.ts # low/normal/high/urgent
│ │ │ └── delivery-status.vo.ts # pending/sent/delivered/failed
│ │ └── ports/outbound/
│ │ └── channel-adapter.port.ts # ★ 通信渠道抽象接口
│ │
│ ├── infrastructure/
│ │ ├── channels/ # ★ 渠道适配器(Strategy 模式)
│ │ │ ├── channel-adapter.interface.ts
│ │ │ ├── websocket/
│ │ │ │ └── ws-channel.adapter.ts # WebSocket 实时推送
│ │ │ ├── sms/
│ │ │ │ └── twilio-sms.adapter.ts # Twilio SMS
│ │ │ ├── voice/
│ │ │ │ ├── pipecat-voice.adapter.ts # ★ 主要:Pipecat 实时语音对话
│ │ │ │ └── twilio-voice.adapter.ts # Twilio 电话线路(Pipecat 拨号载体)
│ │ │ ├── email/
│ │ │ │ └── smtp-email.adapter.ts # SMTP 邮件
│ │ │ ├── telegram/
│ │ │ │ └── telegram-bot.adapter.ts # Telegram Bot
│ │ │ ├── wechat/
│ │ │ │ └── wechat-work.adapter.ts # 企业微信
│ │ │ └── voice-client/
│ │ │ └── voice-service-client.ts # HTTP/gRPC 调用 voice-service
│ │ ├── escalation/
│ │ │ └── escalation-engine.service.ts # 升级引擎
│ │ └── persistence/typeorm/
│ │
│ └── interfaces/
│ ├── rest/
│ │ ├── message.controller.ts
│ │ └── contact.controller.ts
│ └── ws/
│ └── realtime.gateway.ts
3.5.1 通信渠道抽象
// domain/ports/outbound/channel-adapter.port.ts
export interface ChannelAdapterPort {
readonly channelType: ChannelType;
/** 发送消息(文字/语音/富文本) */
send(params: SendMessageParams): Promise<DeliveryResult>;
/** 接收消息(双向渠道如 Telegram/微信) */
onMessage?(callback: (msg: IncomingMessage) => void): void;
/** 渠道是否可用 */
isAvailable(): Promise<boolean>;
}
export interface SendMessageParams {
to: Contact;
content: string;
priority: MessagePriority;
/** 语音电话场景:TTS 文本 */
ttsText?: string;
/** 审批场景:附带审批链接或回复指令 */
approvalAction?: {
taskId: string;
approveKeyword: string; // 如 "Y" 或 "approve"
rejectKeyword: string; // 如 "N" 或 "reject"
};
}
3.5.2 升级策略(Escalation)
// infrastructure/escalation/escalation-engine.service.ts
/**
* 通知升级策略(分层递进):
*
* ┌─── 第一层:推送 + 电话 ─────────────────────────────────────┐
* │ 1. WebSocket + FCM 推送到 Flutter App(通知卡片) │
* │ 用户打开 App → Pipecat 语音对话(App 内 WebSocket 音频) │
* │ 2. 如果 2 分钟内无响应 → Pipecat 通过 Twilio 拨出电话 │
* │ 管理员接听 → Agent 直接开口说话(同一 Pipecat 引擎) │
* │ "您好,prod-2 磁盘 97%,我建议清理日志,您同意吗?" │
* │ 管理员语音回复 → Agent 理解并执行 │
* └────────────────────────────────────────────────────────────┘
*
* ┌─── 第二层:扩大通知 ──────────────────────────────────────┐
* │ 如果 5 分钟内仍无响应 → 短信 + IM (Telegram/企业微信) │
* │ 如果 10 分钟内无响应 → 通知备用联系人(重复第一层) │
* │ 如果 20 分钟内无响应 → 邮件通知管理组(完整报告) │
* └────────────────────────────────────────────────────────────┘
*
* 升级策略可按告警级别自定义:
* - info: 只推送 App
* - warning: App + 短信/IM
* - critical: App + Pipecat 电话语音对话
* - fatal: 全渠道 + 备用联系人 + 管理组
*/
3.5.3 语音交互流程(Pipecat 引擎)
IT0 使用 Pipecat(GitHub 10.1k stars)作为实时语音对话引擎,自部署 STT/TTS 模型,不依赖外部语音 API。
主要场景:App 内语音对话
Flutter App(麦克风录音)
↓ WebSocket 音频流 (PCM 16kHz)
voice-service (Pipecat Pipeline)
├── Silero VAD → 检测语音活动 + 打断
├── faster-whisper → STT(语音转文字)
├── Claude API → LLM 理解 + 决策
└── Kokoro-82M → TTS(文字转语音)
↓ WebSocket 音频流
Flutter App 实时播放语音回复
↓ 同步显示实时转写文字(辅助理解 + 留痕)
电话场景:Pipecat 主动拨号(2分钟 App 无响应后)
voice-service (Pipecat) 发起拨号请求
↓ Twilio REST API: 拨出电话到管理员手机
管理员手机响铃 → 接听
↓ Twilio Media Streams (WebSocket 双向音频)
voice-service (Pipecat Pipeline) ← 同一个引擎
├── 与 App 内语音对话使用完全相同的 Pipeline
├── Agent 接通后立即开口:"您好,我是 IT0 Agent..."
└── 唯一区别:音频通过 Twilio 电话线而非 Flutter WebSocket
↓ Twilio Media Streams
管理员通过电话与 Agent 语音对话
关键设计决策:
- Pipecat 是唯一的语音对话引擎,无论音频来自 App 还是电话线
- Twilio 只是电话线路载体(拨号/接听),不负责任何 STT/TTS/IVR 逻辑
- 无 IVR 菜单:不需要"按 1 按 2",Agent 接通就说话,管理员直接语音回复
- 所有 STT/TTS 在本地 GPU 运行(faster-whisper + Kokoro),延迟 < 500ms
- 支持打断(barge-in):用户开始说话时 Pipecat 自动暂停 TTS 播放
3.5.4 Voice Dialogue Service(voice-service)
voice-service 是一个 Python 微服务,基于 Pipecat 框架运行实时语音对话 Pipeline。
voice-service/ # Python (FastAPI + Pipecat)
├── app/
│ ├── main.py # FastAPI 入口
│ ├── config.py # 配置(模型路径、端口、Claude API Key)
│ ├── pipelines/
│ │ ├── base_pipeline.py # ★ Pipecat Pipeline 定义
│ │ │ # VAD → STT → LLM → TTS → Audio Output
│ │ ├── app_transport.py # Flutter WebSocket 音频传输
│ │ └── twilio_transport.py # Twilio Media Streams 音频传输
│ ├── services/
│ │ ├── stt_service.py # faster-whisper 配置(模型: large-v3, GPU)
│ │ ├── tts_service.py # Kokoro-82M 配置(中/英双语)
│ │ ├── vad_service.py # Silero VAD 配置
│ │ └── llm_service.py # Pipecat AnthropicLLMService (Claude API)
│ ├── context/
│ │ ├── session_context.py # 对话上下文管理(从 agent-service 获取)
│ │ └── tool_handler.py # Agent tool 调用转发给 agent-service
│ └── api/
│ ├── health.py # 健康检查
│ ├── session_router.py # 创建/结束语音对话会话
│ └── twilio_webhook.py # Twilio Media Streams WebSocket 端点
├── models/ # 本地模型文件
│ ├── whisper-large-v3/ # faster-whisper 模型
│ ├── kokoro-82m/ # Kokoro TTS 模型
│ └── silero-vad/ # Silero VAD 模型
├── Dockerfile
├── requirements.txt
└── docker-compose.voice.yml
# app/pipelines/base_pipeline.py — Pipecat Pipeline 核心
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.faster_whisper import FasterWhisperSTTService
from pipecat.services.kokoro import KokoroTTSService
from pipecat.vad.silero import SileroVADAnalyzer
async def create_voice_pipeline(
transport, # AppTransport 或 TwilioTransport
session_context, # 对话上下文(驻留指令、服务器信息等)
):
"""
创建 Pipecat 语音对话 Pipeline
- 音频输入 → VAD → STT → LLM → TTS → 音频输出
- 支持打断:用户说话时自动停止 TTS
- 支持 tool_use:Agent 可调用 agent-service 的工具
"""
stt = FasterWhisperSTTService(
model="large-v3",
language="zh", # 中文为主,自动检测英文
device="cuda",
)
llm = AnthropicLLMService(
model="claude-sonnet-4-5-20250929",
system_prompt=session_context.system_prompt,
tools=session_context.available_tools,
)
tts = KokoroTTSService(
model="kokoro-82m",
voice="zh_female_1", # 中文女声
)
pipeline = Pipeline([
transport.input(), # 音频输入
SileroVADAnalyzer(), # 语音活动检测
stt, # 语音 → 文字
llm, # 文字 → Agent 回复
tts, # 回复 → 语音
transport.output(), # 音频输出
])
return PipelineTask(pipeline, allow_interruptions=True)
# app/pipelines/app_transport.py — Flutter App WebSocket 传输
from pipecat.transports.network.websocket import WebSocketServerTransport
class AppTransport(WebSocketServerTransport):
"""
Flutter App 通过 WebSocket 发送/接收音频流
- 输入:PCM 16kHz 16bit 单声道(Flutter 录音格式)
- 输出:PCM 16kHz 16bit 单声道(Flutter 播放格式)
"""
def __init__(self, websocket):
super().__init__(
websocket=websocket,
params=WebSocketServerParams(
audio_in_sample_rate=16000,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
# app/pipelines/twilio_transport.py — Twilio 电话线兜底传输
from pipecat.transports.services.twilio import TwilioTransport
class TwilioPhoneTransport(TwilioTransport):
"""
Twilio 电话线传输:Pipecat 主动拨号 → 管理员接听 → Agent 直接开口说话
- Pipecat 调用 Twilio REST API 拨出电话
- 接通后通过 Media Streams WebSocket 双向传输音频
- Agent 接通即开始语音汇报,无 IVR 菜单
- 音频编解码:mu-law 8kHz(电话标准)↔ PCM 16kHz(Whisper 输入)
"""
async def initiate_call(self, phone_number: str, voice_session_id: str):
"""主动拨出电话(由 SmartEscalationService 触发)"""
# Twilio REST API 创建呼叫,连接到 Media Streams
pass
Docker 配置:
# docker-compose.voice.yml
voice-service:
build: ./packages/services/voice-service
ports:
- "3008:3008"
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- AGENT_SERVICE_URL=http://agent-service:3002
- WHISPER_MODEL=large-v3
- KOKORO_MODEL=kokoro-82m
- DEVICE=cuda # GPU 加速(生产环境)
volumes:
- ./models:/app/models # 模型文件挂载
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3008/health"]
interval: 30s
start_period: 60s # 模型加载时间
3.6 Audit(审计日志)— audit-service
audit-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ └── audit-log.entity.ts
│ │ └── value-objects/
│ │ ├── action-type.vo.ts # command_executed/approval/login/config_change
│ │ └── audit-level.vo.ts
│ ├── application/
│ │ └── use-cases/
│ │ ├── record-audit.use-case.ts
│ │ └── query-audit-trail.use-case.ts
│ └── infrastructure/
│ └── persistence/
│ └── append-only-repository.ts # 只追加,不可修改/删除
审计日志规范:
- 所有 agent 执行的命令、结果、时间戳
- 所有审批操作的决策人、时间、理由
- 所有引擎切换操作
- 只追加(Append-Only),不可篡改
3.7 Auth(认证授权)— auth-service
auth-service/
├── src/
│ ├── domain/
│ │ ├── entities/
│ │ │ ├── user.entity.ts
│ │ │ ├── role.entity.ts
│ │ │ └── api-key.entity.ts
│ │ └── value-objects/
│ │ ├── permission.vo.ts # 细粒度权限
│ │ └── role-type.vo.ts # admin/operator/viewer
│ └── infrastructure/
│ ├── jwt/
│ │ └── jwt.strategy.ts
│ └── guards/
│ └── rbac.guard.ts
RBAC 权限模型:
| 角色 | 查看仪表盘 | 发起巡检 | 审批操作 | 管理服务器 | 切换引擎 |
|---|---|---|---|---|---|
| viewer | ✅ | ❌ | ❌ | ❌ | ❌ |
| operator | ✅ | ✅ | ❌ | ❌ | ❌ |
| admin | ✅ | ✅ | ✅ | ✅ | ✅ |
3.8 ★ 自治运维工作流 — 驻留指令(Standing Orders)
这是 IT0 的核心运维范式:管理员通过对话定义任务 → Agent 自主执行 → 遇到无法决策的事件智能升级给人类。
3.8.1 核心概念:驻留指令
驻留指令(Standing Order) 是管理员通过对话与 Agent 协商后生成的持久化自治任务定义。一旦创建,Agent 将按照定义自主执行,无需管理员持续在线。
┌────────────────────────────────────────────────────────────────────────┐
│ IT0 自治运维工作流 │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Phase 1 │ │ Phase 2 │ │ Phase 3 │ │
│ │ 对话定义 │───→│ 自治执行 │───→│ 智能升级 │ │
│ │ │ │ │ │ │ │
│ │ 管理员通过 │ │ Agent 按计划 │ │ 遇到无法决策的情况: │ │
│ │ 文字/语音对话 │ │ 或事件触发 │ │ │ │
│ │ 与 Agent 沟通 │ │ 自动执行任务 │ │ 致命 → 语音电话 │ │
│ │ 任务需求 │ │ │ │ 严重 → IM 实时消息 │ │
│ │ │ │ 在决策边界内 │ │ 警告 → 短信/推送 │ │
│ │ Agent 提取为 │ │ 自主处理问题 │ │ 信息 → 邮件/日报 │ │
│ │ 结构化指令 │ │ │ │ │ │
│ │ │ │ 记录所有操作 │ │ 人类响应后继续执行 │ │
│ │ 管理员确认 │ │ 到审计日志 │ │ 或按人类指示调整 │ │
│ └──────────────┘ └──────────────┘ └──────────────────────┘ │
│ │
│ ── 管理员随时可以修改、暂停、删除驻留指令 ── │
└────────────────────────────────────────────────────────────────────────┘
3.8.2 对话 → 指令提取(Task Definition Session)
管理员与 Agent 的对话中,Agent 使用专门的指令提取模式,从自然语言中结构化提取任务定义:
管理员: "以后每天早上8点帮我巡检所有生产服务器,检查CPU、内存、磁盘用量。
如果磁盘超过85%,自动清理超过7天的日志。
如果超过95%,马上电话通知我。
其他异常发短信就行。"
Agent: "收到。让我确认一下您的要求:
📋 驻留指令:每日生产环境巡检
⏰ 触发方式:每天 08:00(cron: 0 8 * * *)
🎯 目标范围:所有 environment=prod 的服务器
📌 自治操作(Agent 可自主执行):
├─ 检查 CPU / 内存 / 磁盘使用率
└─ 磁盘 > 85%:自动清理 7 天前的日志
🚨 升级策略:
├─ 磁盘 > 95%:立即语音电话通知
└─ 其他异常:短信通知
⚠️ 决策边界(Agent 不会自主执行的操作):
├─ ❌ 不会重启任何服务
├─ ❌ 不会修改任何配置文件
└─ ❌ 不会删除非日志文件
是否确认创建?"
管理员: "确认。另外如果CPU持续超过90%超过10分钟,也给我打电话。"
Agent: "已更新。追加条件:
├─ CPU > 90% 持续 10 分钟:语音电话通知
✅ 驻留指令已创建,将从明天 08:00 开始执行。
指令ID: SO-20260208-001"
指令提取流程(agent-service 内部):
// domain/services/standing-order-extractor.ts
/**
* 从对话中提取驻留指令的领域服务
*
* 流程:
* 1. 对话进行中,Agent 检测到用户意图是「定义持续性任务」
* 2. Agent 切换到指令提取模式(System Prompt 追加指令提取 schema)
* 3. Agent 生成结构化的指令草案(JSON)
* 4. 草案经人类确认后持久化为 StandingOrder
* 5. 后续对话修改可增量更新现有指令
*
* 关键:Agent 的 System Prompt 中包含 StandingOrder JSON Schema,
* 使 Agent 输出符合格式的指令定义,而非自由文本。
*/
export class StandingOrderExtractorService {
/**
* 检测对话意图是否为「定义任务」
* 关键词:以后、每天、定时、自动、帮我...检查/清理/巡检...
*/
detectTaskDefinitionIntent(message: string): boolean;
/**
* 将 Agent 的结构化输出解析为 StandingOrder 草案
*/
parseOrderDraft(agentOutput: string): StandingOrderDraft;
/**
* 人类确认后,将草案转为正式驻留指令
*/
confirmOrder(draft: StandingOrderDraft, userId: string): StandingOrder;
}
3.8.3 驻留指令实体(跨 agent-service + ops-service)
// ops-service/domain/entities/standing-order.entity.ts
export class StandingOrder {
id: string; // SO-{date}-{seq}
tenantId: string;
name: string; // "每日生产巡检"
description: string; // 自然语言描述
/** 定义此指令的对话会话ID(溯源) */
definedInSessionId: string;
/** ── 触发方式 ── */
trigger: {
type: 'cron' | 'event' | 'threshold' | 'manual';
cronExpression?: string; // "0 8 * * *"
eventType?: string; // 'alert_fired' | 'deployment_completed'
thresholdCondition?: { // 指标阈值触发
metricType: string;
operator: '>' | '<' | '==' | '>=';
value: number;
durationSeconds: number; // 持续时间
};
};
/** ── 目标范围 ── */
targets: {
serverIds?: string[]; // 指定服务器
clusterIds?: string[]; // 指定集群
allServers?: boolean; // 所有服务器
environmentFilter?: string[]; // ['prod'] 只在生产环境
tagFilter?: Record<string, string>; // 按标签筛选
};
/** ── Agent 执行指令 ── */
agentInstructions: {
prompt: string; // 结构化的执行指令(从对话提取)
skills?: string[]; // 使用的 Skills
runbookId?: string; // 关联的 Runbook(可选)
maxRiskLevel: 0 | 1; // 自治执行允许的最高风险等级(0=只读,1=低风险写入)
maxTurns: number; // Agent 最大对话轮次
maxBudgetUsd?: number; // 单次执行预算上限
};
/** ── 决策边界(Agent 自治范围)── */
decisionBoundary: {
/** 可自动执行的操作白名单 */
allowedActions: string[]; // ['cleanup_logs', 'report_metrics']
/** 必须升级给人类的情况 */
escalateConditions: string[]; // ['disk_full', 'service_down', 'data_loss_risk']
/** 各阈值的通知方式 */
escalationRules: Array<{
condition: string; // "disk_usage > 95%" 或 "cpu > 90% for 10min"
channel: 'voice_call' | 'im' | 'sms' | 'push' | 'email';
priority: 'urgent' | 'high' | 'normal' | 'low';
}>;
};
/** ── 升级策略 ── */
escalationPolicyId: string; // 关联的 comm-service 升级策略
/** ── 生命周期 ── */
status: 'active' | 'paused' | 'archived';
validFrom: Date;
validUntil?: Date; // 可选过期时间
createdBy: string;
lastModifiedInSessionId?: string; // 最后修改此指令的对话ID
createdAt: Date;
updatedAt: Date;
}
3.8.4 自治执行引擎(ops-service + agent-service 协作)
// ops-service/infrastructure/schedulers/standing-order-executor.service.ts
/**
* 驻留指令执行引擎
*
* 职责:
* 1. 根据触发条件(cron/event/threshold)启动执行
* 2. 创建「无人值守会话」(headless session)
* 3. 在 Agent 的决策边界内自主运行
* 4. 记录所有操作到审计日志
* 5. 遇到边界外情况 → 触发智能升级
*/
@Injectable()
export class StandingOrderExecutorService {
constructor(
private readonly orderRepo: StandingOrderRepository,
private readonly agentClient: AgentClientPort,
private readonly commClient: CommClientPort,
private readonly auditLogger: AuditLoggerPort,
) {}
/**
* Cron 触发:Bull Queue 定时任务调度
* 每分钟扫描所有 active 的 cron 型驻留指令,匹配到期的加入执行队列
*/
@Cron('* * * * *')
async scanCronOrders(): Promise<void> {
const dueOrders = await this.orderRepo.findDueCronOrders();
for (const order of dueOrders) {
await this.executionQueue.add('execute-standing-order', {
orderId: order.id,
triggerType: 'cron',
triggeredAt: new Date(),
});
}
}
/**
* 事件触发:监听 Redis Streams 事件
* alert_fired / deployment_completed 等事件匹配 event 型驻留指令
*/
@OnEvent('alert.fired')
async onAlertFired(event: AlertEvent): Promise<void> {
const matchingOrders = await this.orderRepo.findByEventTrigger('alert_fired');
for (const order of matchingOrders) {
if (this.matchesTargets(order, event.serverId)) {
await this.executeOrder(order, { triggerEvent: event });
}
}
}
/**
* 核心执行逻辑
*/
async executeOrder(order: StandingOrder, context: ExecutionContext): Promise<void> {
// 1. 创建无人值守执行记录
const execution = await this.createExecution(order);
// 2. 构建 Agent 任务参数(注入决策边界到 System Prompt)
const taskParams = this.buildTaskParams(order, context);
// 3. 调用 agent-service 执行(gRPC 流式调用)
try {
for await (const event of this.agentClient.executeTask(taskParams)) {
// 记录每一步操作
await this.auditLogger.log({
actionType: 'standing_order_step',
orderId: order.id,
executionId: execution.id,
detail: event,
});
// 4. 检查是否触发升级条件
if (event.type === 'approval_required') {
await this.handleEscalation(order, execution, event);
}
// 5. 检查自定义升级规则
if (this.matchesEscalationRule(order, event)) {
await this.triggerCustomEscalation(order, execution, event);
}
}
execution.status = 'completed';
} catch (error) {
execution.status = 'failed';
execution.error = error.message;
// 执行失败也要通知
await this.notifyExecutionFailure(order, execution, error);
}
await this.orderRepo.saveExecution(execution);
}
/**
* 构建 Agent 任务参数
* 关键:将决策边界注入到 System Prompt 中,约束 Agent 行为
*/
private buildTaskParams(order: StandingOrder, ctx: ExecutionContext): EngineTaskParams {
const boundaryPrompt = `
## 驻留指令执行模式
你正在执行驻留指令「${order.name}」(ID: ${order.id})。
这是一次**无人值守的自治执行**,请严格遵守以下边界:
### 允许的操作(可自主执行)
${order.decisionBoundary.allowedActions.map(a => `- ${a}`).join('\n')}
### 禁止的操作(必须停止并请求人工干预)
${order.decisionBoundary.escalateConditions.map(c => `- 遇到「${c}」→ 立即停止,输出 ESCALATION_REQUIRED`).join('\n')}
### 最高风险等级限制
仅允许执行风险等级 ≤ ${order.agentInstructions.maxRiskLevel} 的命令。
遇到更高风险的命令,输出 ESCALATION_REQUIRED 并说明原因。
### 执行目标
${JSON.stringify(order.targets, null, 2)}
`;
return {
sessionId: `so-${order.id}-${Date.now()}`,
prompt: order.agentInstructions.prompt,
systemPrompt: boundaryPrompt,
allowedTools: this.resolveAllowedTools(order),
maxTurns: order.agentInstructions.maxTurns,
maxBudgetUsd: order.agentInstructions.maxBudgetUsd,
context: {
standingOrderId: order.id,
triggerContext: ctx,
isHeadless: true, // 标记为无人值守模式
},
};
}
}
3.8.5 智能升级(Smart Escalation)
当 Agent 在自治执行中遇到超出决策边界的情况时,按以下三层递进流程升级:
Agent 执行中检测到异常
│
├── 匹配驻留指令中的 escalationRules
│ ├── condition: "disk_usage > 95%" → priority: urgent
│ ├── condition: "cpu > 90% for 10min" → priority: urgent
│ └── condition: "other_anomaly" → priority: normal
│
├── 如果无具体规则匹配 → 使用驻留指令关联的 escalationPolicy
│
└── 三层递进升级
│
├── ⏱ 第一层:唤醒管理员
│ ├── ① App 推送(WebSocket + FCM)→ 全屏通知卡片
│ │ └── 管理员打开 App →「语音对话」→ Pipecat WebSocket 语音对话
│ │ Agent 语音汇报 + 听取指示(支持打断)
│ │
│ ├── ② 2分钟无响应 → Pipecat 通过 Twilio 拨出电话
│ │ 管理员接听 → Agent 直接开口说话(无 IVR 菜单):
│ │ "您好,我是 IT0 Agent。prod-2 磁盘使用率 97%,
│ │ 我建议清理 /var/log 下超过7天的日志。您同意吗?"
│ │ 管理员语音回复 → Agent 理解并执行
│ │ (与 App 内语音对话使用同一 Pipecat 引擎)
│ │
│ └── 管理员通过任意渠道回复后 → SmartEscalationService.onHumanResponse()
│
├── ⏱ 第二层:扩大通知范围(5 分钟内仍无响应)
│ ├── IM 消息(Telegram/企业微信):
│ │ 结构化消息 + 操作按钮(批准/拒绝/查看详情)
│ ├── 短信(简要描述 + 回复 Y/N 审批)
│ └── 通知备用联系人(重复第一层流程)
│
└── ⏱ 第三层:全渠道通知(10+ 分钟无响应)
├── 邮件:完整报告(日志摘录、指标图表、操作链接)
└── 通知管理组所有成员
// ops-service/domain/services/smart-escalation.service.ts
export class SmartEscalationService {
constructor(
private readonly commClient: CommClientPort,
private readonly voiceClient: VoiceServiceClientPort, // ★ voice-service 客户端
private readonly agentClient: AgentClientPort,
private readonly orderRepo: StandingOrderRepository,
private readonly execRepo: StandingOrderExecutionRepository,
) {}
/**
* 根据驻留指令的升级规则 + 当前事件,决定通知渠道和内容
*/
async escalate(
order: StandingOrder,
execution: StandingOrderExecution,
event: EngineStreamEvent,
): Promise<void> {
const rule = this.matchRule(order.decisionBoundary.escalationRules, event);
const notification = this.buildNotification(order, execution, event, rule);
// 1. 第一层:App 推送 + 语音对话预备
await this.commClient.send({
tenantId: order.tenantId,
channel: 'push', // 推送到 App
priority: rule?.priority ?? 'normal',
content: notification,
callbackContext: {
executionId: execution.id,
orderId: order.id,
awaitingAction: 'approve_or_reject',
},
});
// 2. 预创建语音对话会话(用户打开 App 时可立即接入)
const voiceSession = await this.voiceClient.prepareSession({
executionId: execution.id,
agentContext: {
orderId: order.id,
orderName: order.name,
eventSummary: event.summary,
sessionId: execution.sessionId,
},
});
// 3. 启动超时升级调度
await this.scheduleEscalationChain(execution, rule, voiceSession);
// 4. 暂停执行,等待人类响应
execution.status = 'awaiting_human';
execution.escalatedAt = new Date();
execution.escalationChannel = 'push'; // 初始渠道
}
/**
* 超时升级链:App推送 → 电话唤醒 → IM/短信 → 备用联系人 → 邮件/管理组
*/
private async scheduleEscalationChain(
execution: StandingOrderExecution,
rule: EscalationRule | null,
voiceSession: VoiceSessionInfo,
) {
const chain = [
{ delay: 2 * 60_000, action: 'pipecat_phone_call', params: { voiceSessionId: voiceSession.id } },
{ delay: 5 * 60_000, action: 'im_and_sms' },
{ delay: 10 * 60_000, action: 'backup_contacts' },
{ delay: 20 * 60_000, action: 'email_management_group' },
];
// 使用 Bull Queue 延迟任务实现,每一步检查是否已有响应
for (const step of chain) {
await this.escalationQueue.add('escalation-step', {
executionId: execution.id,
...step,
}, { delay: step.delay });
}
}
/**
* 人类响应回调(通过任何渠道回复后触发)
* - App 内语音对话 → voice-service 回调
* - Pipecat 电话语音对话 → voice-service 回调(同一引擎)
* - IM 按钮/文字 → comm-service 回调
* - 短信回复 → comm-service 回调
*/
async onHumanResponse(
executionId: string,
response: HumanResponse,
): Promise<void> {
const execution = await this.execRepo.findById(executionId);
// 取消后续升级链(人已响应)
await this.escalationQueue.removeByExecutionId(executionId);
if (response.action === 'approve') {
await this.agentClient.continueSession(execution.sessionId,
`人工审批已通过。操作者: ${response.respondedBy},渠道: ${response.channel}。继续执行。`
);
execution.status = 'running';
} else if (response.action === 'reject') {
await this.agentClient.cancelTask(execution.sessionId);
execution.status = 'aborted_by_human';
} else if (response.action === 'instruct') {
// 人类通过语音或文字给出详细指示
await this.agentClient.continueSession(execution.sessionId,
`管理员指示(${response.channel}):${response.instruction}`
);
execution.status = 'running';
}
execution.humanResponse = response;
execution.respondedBy = response.respondedBy;
execution.respondedAt = new Date();
}
}
Pipecat 主动拨号(voice-service 通过 Twilio REST API 发起电话):
# voice-service — 主动拨出电话
# app/services/outbound_call_service.py
from twilio.rest import Client as TwilioClient
class OutboundCallService:
"""
当 App 推送 2 分钟无响应时,SmartEscalationService 调用此服务
→ Pipecat 通过 Twilio 拨出电话
→ 管理员接听后 Pipecat 直接开始语音对话(无 IVR 菜单)
"""
async def initiate_call(
self,
phone_number: str,
voice_session_id: str, # 预创建的语音会话 ID
agent_context: dict, # 驻留指令上下文
):
# 1. Twilio 拨出电话,接通后连接到 Pipecat Media Streams
call = self.twilio_client.calls.create(
to=phone_number,
from_=self.twilio_phone_number,
twiml=f'''
<Response>
<Connect>
<Stream url="wss://{self.host}/twilio-stream/{voice_session_id}" />
</Connect>
</Response>
''',
)
# 2. Pipecat Pipeline 在 Media Streams 连接时自动启动
# Agent 接通即开口说话:"您好,我是 IT0 Agent..."
# 无需任何 IVR 菜单或按键操作
return call.sid
// comm-service — 触发 Pipecat 拨号
// SmartEscalationService 在 2 分钟超时后调用
async function triggerPipecatCall(
voiceSessionId: string,
contact: Contact,
) {
// 调用 voice-service API,让 Pipecat 拨出电话
await this.voiceClient.initiateCall({
phoneNumber: contact.phone,
voiceSessionId,
// Pipecat 接通后 Agent 直接开口说话
// 使用与 App 内完全相同的 Pipeline
});
}
3.8.6 对话修改驻留指令
管理员可以随时通过对话修改已有的驻留指令:
管理员: "把每日巡检的时间改成早上7点,另外磁盘清理阈值从85%降到80%。"
Agent: "已更新驻留指令 SO-20260208-001:
├─ ⏰ 触发时间:08:00 → 07:00(cron: 0 7 * * *)
└─ 📌 磁盘清理阈值:85% → 80%
其他配置保持不变。变更已生效。"
管理员: "暂停这个指令,我这周在做服务器迁移。"
Agent: "已暂停驻留指令 SO-20260208-001「每日生产巡检」。
⏸ 状态:active → paused
需要恢复时告诉我即可。"
实现要点:
- Agent System Prompt 中包含当前租户所有活跃的驻留指令列表
- Agent 检测到修改意图时,调用
update_standing_ordertool - 所有修改记录关联对话 session ID(完整溯源)
- 修改操作写入审计日志
3.8.7 执行结果汇报
┌─────────────────────────────────────────────────────────────────┐
│ 驻留指令执行汇报方式 │
│ │
│ ┌───────────────────┐ │
│ │ 正常完成 │ → 静默记录审计日志 │
│ │(一切正常) │ 可在 Web Admin / Flutter 查看 │
│ └───────────────────┘ │
│ │
│ ┌───────────────────┐ │
│ │ 有发现但已自治处理 │ → 推送通知 + 执行摘要 │
│ │(如自动清理了日志) │ "已清理 prod-2 的 3.2GB 旧日志" │
│ └───────────────────┘ │
│ │
│ ┌───────────────────┐ │
│ │ 需人工决策 │ → 按 escalationRules 升级 │
│ │(超出决策边界) │ 电话/IM/短信/邮件 │
│ └───────────────────┘ │
│ │
│ ┌───────────────────┐ │
│ │ 执行失败 │ → 立即推送 + 短信 │
│ │(Agent 异常/超时) │ 含错误详情和建议 │
│ └───────────────────┘ │
│ │
│ ── 日报/周报 ── │
│ 每日 20:00 汇总当天所有驻留指令执行情况 │
│ 推送到 Flutter App / 邮件 │
└─────────────────────────────────────────────────────────────────┘
3.8.8 相关 Tools(agent-service 注册的工具)
// agent-service 的 tool 定义(供 Agent 调用)
const standingOrderTools = [
{
name: 'create_standing_order',
description: '创建新的驻留指令(当用户要求定义持续性/定时任务时使用)',
input_schema: { /* StandingOrderDraft JSON Schema */ },
},
{
name: 'update_standing_order',
description: '修改已有的驻留指令(当用户要求调整任务参数时使用)',
input_schema: {
orderId: { type: 'string' },
changes: { /* 部分更新的字段 */ },
},
},
{
name: 'pause_standing_order',
description: '暂停驻留指令(用户要求暂停某个定时任务时使用)',
input_schema: { orderId: { type: 'string' } },
},
{
name: 'resume_standing_order',
description: '恢复已暂停的驻留指令',
input_schema: { orderId: { type: 'string' } },
},
{
name: 'list_standing_orders',
description: '列出当前所有驻留指令及其状态',
input_schema: { statusFilter: { type: 'string', enum: ['active', 'paused', 'all'] } },
},
{
name: 'report_escalation',
description: '无人值守执行中遇到超出决策边界的情况,请求人工干预',
input_schema: {
severity: { type: 'string', enum: ['fatal', 'critical', 'warning', 'info'] },
situation: { type: 'string', description: '当前状况描述' },
suggestedAction: { type: 'string', description: 'Agent 建议的操作' },
requiresImmediate: { type: 'boolean' },
},
},
];
4. 数据库设计(PostgreSQL)
多租户说明:根据 §7 的 Schema-per-Tenant 策略,以下业务表存在于每个租户的独立 Schema(
it0_t_{tenantId})中。 所有表额外包含tenant_id列作为防御性纵深——即使 Schema 隔离失败,行级tenant_id仍可防止数据泄露。 公共管理表(tenants,plans等)位于it0_sharedSchema,参见 §7.5。
4.1 agent-service — it0_agent schema
-- 智能体会话
CREATE TABLE agent_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL, -- ★ 防御性纵深:冗余租户标识
engine_type VARCHAR(20) NOT NULL, -- 'claude_code_cli' | 'claude_api' | 'custom'
status VARCHAR(20) NOT NULL DEFAULT 'active',
system_prompt TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB DEFAULT '{}'
);
-- 命令执行记录
CREATE TABLE command_records (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
session_id UUID NOT NULL REFERENCES agent_sessions(id),
command TEXT NOT NULL,
risk_level SMALLINT NOT NULL DEFAULT 0, -- 0-3
status VARCHAR(20) NOT NULL, -- 'executed' | 'blocked' | 'approved' | 'rejected'
output TEXT,
error TEXT,
approved_by UUID, -- 审批人
executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
duration_ms INTEGER
);
CREATE INDEX idx_command_records_session ON command_records(session_id);
CREATE INDEX idx_command_records_risk ON command_records(risk_level) WHERE risk_level >= 2;
4.2 ops-service — it0_ops schema
-- 运维任务
CREATE TABLE ops_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
type VARCHAR(30) NOT NULL, -- 'inspection' | 'deployment' | 'rollback' | 'scaling' | 'recovery'
title VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
priority SMALLINT NOT NULL DEFAULT 1,
target_servers UUID[] NOT NULL, -- 目标服务器 ID 列表
runbook_id UUID,
agent_session_id UUID,
created_by UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
result_summary TEXT
);
-- 审批请求
CREATE TABLE approval_requests (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
task_id UUID NOT NULL REFERENCES ops_tasks(id),
command TEXT NOT NULL,
risk_level SMALLINT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
decided_by UUID,
decided_at TIMESTAMPTZ,
reason TEXT,
expires_at TIMESTAMPTZ NOT NULL,
notification_channels VARCHAR(20)[] -- 已通知的渠道
);
-- 运维手册(Runbook)
CREATE TABLE runbooks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(200) NOT NULL,
description TEXT,
trigger_type VARCHAR(30), -- 'manual' | 'alert' | 'schedule'
prompt_template TEXT NOT NULL, -- Agent 执行的 prompt 模板
allowed_tools VARCHAR(50)[] NOT NULL,
max_risk_level SMALLINT NOT NULL DEFAULT 1, -- 此 Runbook 允许的最高风险等级
auto_approve BOOLEAN NOT NULL DEFAULT FALSE,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- ★ 驻留指令(Standing Orders)— 自治运维任务定义
CREATE TABLE standing_orders (
id VARCHAR(30) PRIMARY KEY, -- SO-{date}-{seq}
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(200) NOT NULL,
description TEXT,
-- 对话溯源
defined_in_session_id UUID NOT NULL, -- 定义此指令的对话ID
last_modified_in_session_id UUID, -- 最后修改的对话ID
-- 触发方式
trigger_type VARCHAR(20) NOT NULL, -- 'cron' | 'event' | 'threshold' | 'manual'
trigger_config JSONB NOT NULL, -- { cronExpression, eventType, thresholdCondition }
-- 目标范围
targets JSONB NOT NULL, -- { serverIds, clusterIds, allServers, environmentFilter, tagFilter }
-- Agent 执行参数
agent_prompt TEXT NOT NULL, -- 从对话提取的结构化执行指令
agent_skills VARCHAR(100)[], -- 使用的 Skills 名称列表
runbook_id UUID REFERENCES runbooks(id), -- 关联的 Runbook(可选)
max_risk_level SMALLINT NOT NULL DEFAULT 0, -- 自治执行最高风险等级(0=只读,1=低风险写入)
max_turns INTEGER NOT NULL DEFAULT 20,
max_budget_usd NUMERIC(6,2),
-- 决策边界
decision_boundary JSONB NOT NULL, -- { allowedActions, escalateConditions, escalationRules }
-- 升级策略
escalation_policy_id UUID,
-- 生命周期
status VARCHAR(20) NOT NULL DEFAULT 'active', -- 'active' | 'paused' | 'archived'
valid_from TIMESTAMPTZ NOT NULL DEFAULT NOW(),
valid_until TIMESTAMPTZ,
-- 元信息
created_by UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_standing_orders_status ON standing_orders(status) WHERE status = 'active';
CREATE INDEX idx_standing_orders_trigger ON standing_orders(trigger_type);
-- 驻留指令执行记录
CREATE TABLE standing_order_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
order_id VARCHAR(30) NOT NULL REFERENCES standing_orders(id),
-- 触发信息
trigger_type VARCHAR(20) NOT NULL,
trigger_context JSONB, -- 触发上下文(告警事件、cron 时间等)
-- 执行状态
session_id VARCHAR(100), -- Agent 会话ID
status VARCHAR(20) NOT NULL DEFAULT 'running',
-- 'running' | 'completed' | 'failed' | 'awaiting_human' | 'aborted_by_human'
-- 结果
result_summary TEXT, -- Agent 执行摘要
actions_taken JSONB DEFAULT '[]', -- 执行的操作列表
error TEXT,
-- 升级信息
escalated_at TIMESTAMPTZ,
escalation_channel VARCHAR(20), -- 升级使用的渠道
human_response JSONB, -- 人类响应内容
responded_by UUID,
responded_at TIMESTAMPTZ,
-- 时间
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ,
duration_ms INTEGER
);
CREATE INDEX idx_so_executions_order ON standing_order_executions(order_id, started_at DESC);
CREATE INDEX idx_so_executions_status ON standing_order_executions(status) WHERE status IN ('running', 'awaiting_human');
4.3 inventory-service — it0_inventory schema
-- 服务器
CREATE TABLE servers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(100) NOT NULL UNIQUE,
host VARCHAR(255) NOT NULL, -- IP 或主机名(内网 IP 也可以)
port INTEGER NOT NULL DEFAULT 22,
environment VARCHAR(20) NOT NULL, -- 'dev' | 'staging' | 'prod'
role VARCHAR(30) NOT NULL, -- 'web' | 'db' | 'cache' | 'worker' | 'gateway'
cluster_id UUID REFERENCES clusters(id),
ssh_user VARCHAR(50) NOT NULL, -- SSH 登录用户名
credential_id UUID NOT NULL REFERENCES credentials(id), -- SSH 密钥或密码
-- ★ SSH ProxyJump 跳板机配置
network_type VARCHAR(20) NOT NULL DEFAULT 'public', -- 'public' | 'private'(内网需跳板机)
jump_server_id UUID REFERENCES servers(id), -- 跳板机(自引用,NULL=直连)
-- 额外 SSH 选项
ssh_options JSONB DEFAULT '{}', -- 自定义 SSH 参数,如 StrictHostKeyChecking 等
tags JSONB DEFAULT '{}',
status VARCHAR(20) NOT NULL DEFAULT 'active',
description TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_servers_environment ON servers(environment);
CREATE INDEX idx_servers_cluster ON servers(cluster_id);
-- 集群
CREATE TABLE clusters (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(100) NOT NULL UNIQUE,
type VARCHAR(20) NOT NULL, -- 'bare_metal' | 'k8s' | 'docker_swarm'
environment VARCHAR(20) NOT NULL,
description TEXT,
kubeconfig TEXT, -- 加密存储
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 凭证(加密存储)
CREATE TABLE credentials (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(100) NOT NULL,
type VARCHAR(20) NOT NULL, -- 'ssh_key' | 'ssh_password' | 'token' | 'kubeconfig'
encrypted_value BYTEA NOT NULL, -- AES-256-GCM 加密
iv BYTEA NOT NULL,
-- 元信息(非敏感)
fingerprint VARCHAR(100), -- SSH key 指纹(用于展示,不含私钥)
key_type VARCHAR(20), -- 'rsa' | 'ed25519' | 'ecdsa'(仅 ssh_key 类型)
created_by UUID,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ,
last_used_at TIMESTAMPTZ
);
/**
* SSH 连接构建逻辑(agent-service 中实现):
*
* 1. 查询目标服务器:server = getServer(serverId)
* 2. 查询凭证:credential = getCredential(server.credential_id)
* 3. 判断连接方式:
*
* A) 直连(network_type = 'public' 或 jump_server_id = NULL):
* ssh -i /tmp/key_{uuid} -p {port} {ssh_user}@{host}
*
* B) 跳板机(network_type = 'private' 且 jump_server_id != NULL):
* jumpServer = getServer(server.jump_server_id)
* jumpCred = getCredential(jumpServer.credential_id)
* ssh -J {jump_user}@{jump_host}:{jump_port} -i /tmp/key_{uuid} -p {port} {ssh_user}@{host}
*
* C) 多级跳板(jump_server 自身也有 jump_server_id):
* 递归构建 ProxyJump 链:ssh -J jump1,jump2 target
*
* 4. 凭证通过临时文件注入(TTL=5min,用后删除)
* 5. 额外 SSH 选项从 server.ssh_options 读取
*/
4.4 monitor-service — it0_monitor schema
-- 告警规则
CREATE TABLE alert_rules (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(200) NOT NULL,
metric_type VARCHAR(30) NOT NULL, -- 'cpu' | 'memory' | 'disk' | 'network' | 'custom'
condition JSONB NOT NULL, -- { "operator": ">", "threshold": 90, "duration_seconds": 300 }
severity VARCHAR(20) NOT NULL, -- 'info' | 'warning' | 'critical' | 'fatal'
target_servers UUID[], -- NULL = 所有服务器
runbook_id UUID, -- 自动修复关联的 Runbook
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 告警事件
CREATE TABLE alert_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
rule_id UUID NOT NULL REFERENCES alert_rules(id),
server_id UUID NOT NULL,
severity VARCHAR(20) NOT NULL,
message TEXT NOT NULL,
metric_value DOUBLE PRECISION,
status VARCHAR(20) NOT NULL DEFAULT 'firing', -- 'firing' | 'resolved' | 'acknowledged'
fired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ,
acknowledged_by UUID
);
-- 指标快照(时序数据,定期归档)
CREATE TABLE metric_snapshots (
id BIGSERIAL PRIMARY KEY,
tenant_id VARCHAR(20) NOT NULL,
server_id UUID NOT NULL,
metric_type VARCHAR(30) NOT NULL,
value DOUBLE PRECISION NOT NULL,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 分区表优化(按月)
-- CREATE TABLE metric_snapshots_2026_02 PARTITION OF metric_snapshots
-- FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
4.5 comm-service — it0_comm schema
-- 联系人
CREATE TABLE contacts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
user_id UUID NOT NULL,
name VARCHAR(100) NOT NULL,
phone VARCHAR(20), -- 短信/电话
email VARCHAR(200),
telegram_id VARCHAR(100),
wechat_id VARCHAR(100),
preferred_channel VARCHAR(20) NOT NULL DEFAULT 'websocket',
is_active BOOLEAN NOT NULL DEFAULT TRUE
);
-- 消息记录
CREATE TABLE messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
contact_id UUID NOT NULL REFERENCES contacts(id),
channel_type VARCHAR(20) NOT NULL,
direction VARCHAR(10) NOT NULL, -- 'outbound' | 'inbound'
content TEXT NOT NULL,
priority VARCHAR(10) NOT NULL DEFAULT 'normal',
delivery_status VARCHAR(20) NOT NULL DEFAULT 'pending',
related_task_id UUID, -- 关联的运维任务
sent_at TIMESTAMPTZ,
delivered_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 升级策略
CREATE TABLE escalation_policies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id VARCHAR(20) NOT NULL,
name VARCHAR(100) NOT NULL,
severity VARCHAR(20) NOT NULL, -- 匹配告警级别
steps JSONB NOT NULL, -- [{"delay_seconds": 0, "channel": "websocket"}, {"delay_seconds": 120, "channel": "sms"}, ...]
is_default BOOLEAN NOT NULL DEFAULT FALSE
);
4.6 audit-service — it0_audit schema
-- 审计日志(只追加,不可修改/删除)
CREATE TABLE audit_logs (
id BIGSERIAL PRIMARY KEY,
tenant_id VARCHAR(20) NOT NULL,
action_type VARCHAR(50) NOT NULL, -- 'command_executed' | 'approval_decided' | 'engine_switched' | etc.
actor_type VARCHAR(20) NOT NULL, -- 'agent' | 'user' | 'system'
actor_id VARCHAR(100),
resource_type VARCHAR(50), -- 'server' | 'task' | 'session'
resource_id VARCHAR(100),
detail JSONB NOT NULL,
ip_address INET,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 性能索引
CREATE INDEX idx_audit_logs_action ON audit_logs(action_type, created_at);
CREATE INDEX idx_audit_logs_actor ON audit_logs(actor_id, created_at);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);
-- 防篡改:REVOKE DELETE, UPDATE ON audit_logs FROM app_user;
5. 服务间通信
5.1 通信矩阵
┌─────────────────────────────────────────────┐
│ 事件总线 (Redis Streams) │
└──┬────┬────┬────┬────┬────┬────┬────────────┘
│ │ │ │ │ │ │
┌────┴─┐ ┌┴────┐ ┌─┴──┐ ┌┴───┐ ┌─┴──┐ ┌──┴──┐
│auth │ │agent│ │ops │ │inv │ │mon │ │comm │
└──────┘ └──┬──┘ └─┬──┘ └────┘ └──┬─┘ └─────┘
│ │ │
gRPC 同步调用: │
ops → agent (执行任务) │
agent → inv (获取凭证) │
mon → ops (触发自动修复) ───────────┘
ops → comm (通知人类)
5.2 关键事件流
// 事件定义(所有服务共享的 proto/event 包)
// agent-service 发布
interface CommandExecutedEvent {
sessionId: string;
command: string;
riskLevel: number;
output: string;
timestamp: Date;
}
interface ApprovalRequiredEvent {
taskId: string;
command: string;
riskLevel: number;
targetServer: string;
expiresAt: Date;
}
// monitor-service 发布
interface AlertFiredEvent {
ruleId: string;
serverId: string;
severity: 'info' | 'warning' | 'critical' | 'fatal';
message: string;
metricValue: number;
}
// ops-service 发布
interface TaskCompletedEvent {
taskId: string;
result: 'success' | 'failure' | 'partial';
summary: string;
}
6. 安全架构
6.1 三层防御模型
Layer 1: 命令分级(agent-service 领域层)
├── Level 0: 只读命令 → 自动执行
├── Level 1: 低风险写入 → 前端确认
├── Level 2: 高风险操作 → 多渠道审批
└── Level 3: 禁止命令 → 直接拦截
Layer 2: Hook 拦截(Claude Code PreToolUse)
└── pre-tool-use-hook.py
├── 正则匹配危险模式
├── 写入审批队列
└── 返回 block/allow
Layer 3: 架构隔离(目标服务器侧)
├── 受限 SSH 用户 (rbash)
├── K8s RBAC (ops-agent ClusterRole)
├── 数据库只读账号
└── 网络隔离 (Security Group)
6.2 凭证安全流程
agent-service 需要 SSH 到 prod-1
↓ gRPC 请求
inventory-service 接收请求
↓ 验证调用者权限
↓ 从 PostgreSQL 读取加密凭证
↓ 内存中解密
↓ 生成临时凭证(TTL=5min)
↓ 返回
agent-service 使用临时凭证执行 SSH
↓ 完成后
临时凭证自动过期
7. 多租户 SaaS 架构
7.1 隔离策略
采用 Schema-per-Tenant(同一 PostgreSQL 实例,每个租户独立 Schema)+ 行级隔离混合模式:
┌─────────────────────────────────────────────────────────────┐
│ PostgreSQL 实例 │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ it0_shared │ │ it0_t_001 │ │ it0_t_002 │ ... │
│ │ (公共表) │ │ (租户A数据) │ │ (租户B数据) │ │
│ │ │ │ │ │ │ │
│ │ tenants │ │ servers │ │ servers │ │
│ │ plans │ │ credentials │ │ credentials │ │
│ │ billing │ │ tasks │ │ tasks │ │
│ │ global_cfg │ │ sessions │ │ sessions │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
| 表类别 | 隔离方式 | 说明 |
|---|---|---|
| tenants, plans, billing | 公共 Schema (it0_shared) |
跨租户的 SaaS 管理数据 |
| servers, credentials, tasks, sessions, skills, runbooks, alerts, contacts, audit_logs | 租户独立 Schema (it0_t_{tenantId}) |
完全隔离,互不可见 |
7.2 租户上下文传播(AsyncLocalStorage)
所有微服务共享同一个 TenantContext 模式,基于 Node.js AsyncLocalStorage:
// packages/shared/tenant/tenant-context.service.ts
import { AsyncLocalStorage } from 'async_hooks';
export interface TenantInfo {
tenantId: string;
tenantName: string;
plan: 'free' | 'pro' | 'enterprise';
schemaName: string; // 'it0_t_{tenantId}'
}
export class TenantContextService {
private static storage = new AsyncLocalStorage<TenantInfo>();
/** 设置当前请求的租户上下文 */
static run<T>(tenant: TenantInfo, fn: () => T): T {
return this.storage.run(tenant, fn);
}
/** 获取当前租户 */
static getTenant(): TenantInfo {
const tenant = this.storage.getStore();
if (!tenant) throw new Error('Tenant context not initialized');
return tenant;
}
static getTenantId(): string {
return this.getTenant().tenantId;
}
static getSchemaName(): string {
return this.getTenant().schemaName;
}
}
7.3 请求级租户注入
// packages/shared/tenant/tenant.middleware.ts
/**
* NestJS 中间件:从 JWT / API Key / 请求头提取 tenantId
* 注入到 AsyncLocalStorage,后续所有数据库操作自动切换 Schema
*/
@Injectable()
export class TenantMiddleware implements NestMiddleware {
constructor(private readonly tenantRepo: TenantRepository) {}
async use(req: Request, res: Response, next: NextFunction) {
// 优先级:JWT claim > X-Tenant-Id header > API Key 关联
const tenantId = this.extractTenantId(req);
if (!tenantId) throw new UnauthorizedException('Tenant not identified');
const tenant = await this.tenantRepo.findById(tenantId);
if (!tenant) throw new NotFoundException('Tenant not found');
TenantContextService.run({
tenantId: tenant.id,
tenantName: tenant.name,
plan: tenant.plan,
schemaName: `it0_t_${tenant.id}`,
}, () => next());
}
}
7.4 TypeORM 动态 Schema 切换
// packages/shared/tenant/tenant-aware.repository.ts
/**
* 所有租户隔离的 Repository 基类
* 自动在查询前切换 PostgreSQL search_path 到租户 Schema
*/
export abstract class TenantAwareRepository<T> {
constructor(
protected readonly dataSource: DataSource,
protected readonly entity: EntityTarget<T>,
) {}
protected async getRepository(): Promise<Repository<T>> {
const schema = TenantContextService.getSchemaName();
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.query(`SET search_path TO "${schema}", public`);
return queryRunner.manager.getRepository(this.entity);
}
// CRUD 方法都通过 getRepository() 获取租户隔离的仓储
async findById(id: string): Promise<T | null> {
const repo = await this.getRepository();
return repo.findOneBy({ id } as any);
}
async save(entity: T): Promise<T> {
const repo = await this.getRepository();
return repo.save(entity);
}
}
7.5 租户生命周期管理
-- it0_shared schema
CREATE TABLE tenants (
id VARCHAR(20) PRIMARY KEY, -- 短ID如 't001'
name VARCHAR(200) NOT NULL,
slug VARCHAR(50) NOT NULL UNIQUE, -- URL 友好标识
plan VARCHAR(20) NOT NULL DEFAULT 'free', -- 'free' | 'pro' | 'enterprise'
owner_user_id UUID NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'active', -- 'active' | 'suspended' | 'deleted'
-- 资源配额
max_servers INTEGER NOT NULL DEFAULT 5,
max_skills INTEGER NOT NULL DEFAULT 10,
max_users INTEGER NOT NULL DEFAULT 3,
max_sessions_per_day INTEGER NOT NULL DEFAULT 50,
-- 引擎配置
allowed_engines VARCHAR(20)[] NOT NULL DEFAULT '{claude_code_cli}',
-- 审计
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 租户创建时自动初始化 Schema
-- 通过 NestJS migration 或 tenant-provisioning.service.ts 执行:
-- CREATE SCHEMA it0_t_{tenantId};
-- 运行租户 Schema 的迁移脚本(创建 servers, credentials, tasks 等表)
7.6 Kong 多租户路由
在 Kong 层面,通过 JWT claim 中的 tenant_id 传递租户标识:
# kong.yml 追加:租户限流(按 tenant 维度)
plugins:
- name: rate-limiting
config:
minute: 120
policy: redis
redis_host: redis
# 按 JWT 中的 tenant_id 限流
limit_by: credential
7.7 Agent 引擎的多租户隔离
┌─────────────────────────────────────────────────┐
│ agent-service │
│ │
│ 租户 A 的任务 ──→ spawn claude -p "..." │
│ --system-prompt-file /tmp/a/ │
│ 使用租户 A 的 .claude/skills/ │
│ SSH 到租户 A 的服务器 │
│ │
│ 租户 B 的任务 ──→ spawn claude -p "..." │
│ --system-prompt-file /tmp/b/ │
│ 使用租户 B 的 .claude/skills/ │
│ SSH 到租户 B 的服务器 │
│ │
│ 关键隔离点: │
│ 1. System Prompt 包含租户专属的服务器清单 │
│ 2. Skills 目录按租户隔离 │
│ 3. SSH 凭证从租户 Schema 读取 │
│ 4. Hook 脚本验证命令目标在租户范围内 │
│ 5. 并发控制:按租户 plan 限制同时执行的任务数 │
└─────────────────────────────────────────────────┘
8. API Gateway — Kong 声明式配置
8.1 架构选择
使用 Kong DB-less 模式(声明式 YAML 配置),不依赖额外数据库,配置文件即基础设施:
# packages/services/api-gateway/kong.yml
_format_version: "3.0"
services:
# ===== auth-service =====
- name: auth-service
url: http://auth-service:3001
routes:
- name: auth-routes
paths:
- /api/v1/auth
strip_path: false
# ===== agent-service =====
- name: agent-service
url: http://agent-service:3002
routes:
- name: agent-routes
paths:
- /api/v1/agent
strip_path: false
- name: agent-ws
paths:
- /ws/agent
strip_path: false
protocols:
- http
- https
- ws
- wss
# ===== ops-service =====
- name: ops-service
url: http://ops-service:3003
routes:
- name: ops-routes
paths:
- /api/v1/ops
strip_path: false
# ===== inventory-service =====
- name: inventory-service
url: http://inventory-service:3004
routes:
- name: inventory-routes
paths:
- /api/v1/inventory
strip_path: false
# ===== monitor-service =====
- name: monitor-service
url: http://monitor-service:3005
routes:
- name: monitor-routes
paths:
- /api/v1/monitor
strip_path: false
# ===== comm-service =====
- name: comm-service
url: http://comm-service:3006
routes:
- name: comm-routes
paths:
- /api/v1/comm
strip_path: false
- name: comm-ws
paths:
- /ws/comm
strip_path: false
protocols:
- http
- https
- ws
- wss
# ===== voice-service =====
- name: voice-service
url: http://voice-service:3008
routes:
- name: voice-ws
paths:
- /ws/voice
strip_path: false
protocols:
- ws
- wss
- name: voice-api
paths:
- /api/v1/voice
strip_path: false
- name: twilio-webhook
paths:
- /api/v1/twilio
strip_path: false
# ===== audit-service =====
- name: audit-service
url: http://audit-service:3007
routes:
- name: audit-routes
paths:
- /api/v1/audit
strip_path: false
# ===== 全局插件 =====
plugins:
# JWT 认证(全局)
- name: jwt
config:
key_claim_name: kid
claims_to_verify:
- exp
# 排除登录和健康检查路由
route: null
service: null
# CORS
- name: cors
config:
origins:
- http://localhost:3000 # Web Admin (Next.js dev)
- https://admin.it0.your-domain.com
methods:
- GET
- POST
- PUT
- DELETE
- PATCH
- OPTIONS
headers:
- Authorization
- Content-Type
credentials: true
# 限流(全局默认)
- name: rate-limiting
config:
minute: 120
policy: redis
redis_host: redis
redis_port: 6379
# 请求日志
- name: file-log
config:
path: /dev/stdout
reopen: true
# ===== 路由级插件覆盖 =====
# auth 路由不需要 JWT(登录端点)
- name: jwt
route: auth-routes
enabled: false
# agent WebSocket 更高限流
- name: rate-limiting
route: agent-ws
config:
minute: 30
policy: redis
redis_host: redis
# audit 路由只允许 admin 角色
- name: acl
route: audit-routes
config:
allow:
- admin
8.2 Kong Docker 配置
# packages/services/api-gateway/Dockerfile
FROM kong:3.7-alpine
# 复制声明式配置
COPY kong.yml /etc/kong/kong.yml
# 环境变量设为 DB-less 模式
ENV KONG_DATABASE=off
ENV KONG_DECLARATIVE_CONFIG=/etc/kong/kong.yml
ENV KONG_PROXY_LISTEN=0.0.0.0:8000
ENV KONG_ADMIN_LISTEN=0.0.0.0:8001
ENV KONG_LOG_LEVEL=info
# deploy/docker-compose.yml 中的 Kong 服务
services:
api-gateway:
build: ../packages/services/api-gateway
ports:
- "8000:8000" # Proxy(客户端访问)
- "8001:8001" # Admin API(仅内部使用)
depends_on:
- auth-service
- agent-service
- ops-service
- inventory-service
- monitor-service
- comm-service
- audit-service
- redis
healthcheck:
test: ["CMD", "kong", "health"]
interval: 10s
timeout: 5s
retries: 3
networks:
- it0-network
8.3 WebSocket 代理注意事项
Kong 原生支持 WebSocket 代理,关键配置:
protocols包含ws和wss- 建议为 WebSocket 路由单独设置更宽松的超时:连接超时 60s、读写超时 3600s
- Agent 流式输出和实时通信都走 WebSocket,需要确保 Kong 不会过早断开长连接
9. Claude Skills 支持(Anthropic Claude Skills)
9.1 Skills 与 Runbook 的关系
Claude Skills Runbook
(底层能力单元) (高层业务流程)
┌─────────────┐ ┌──────────────────┐
│ /inspect │─── 被组合使用 ──→ │ 每日巡检 Runbook │
│ /cleanup │─── 被组合使用 ──→ │ (调用多个 Skills) │
│ /diagnose │ │ │
└─────────────┘ └──────────────────┘
Skills = 原子操作(单个技能,Claude Code 原生识别)
Runbook = 编排流程(多步骤,包含审批、通知、条件分支)
9.2 Skill 定义规范
每个 Skill 是一个目录,包含 SKILL.md(YAML frontmatter + Markdown 指令):
# .claude/skills/inspect/SKILL.md
---
name: inspect
description: 对指定服务器执行全面健康检查(CPU/MEM/DISK/网络/服务状态),返回结构化报告
argument-hint: "[server-name|all]"
allowed-tools: Bash, Read, Grep
disable-model-invocation: false
---
# 服务器健康检查
## 参数
- `$0`: 目标服务器名称,或 `all` 检查所有服务器
## 执行步骤
1. **确认目标**:如果 `$0` 是 `all`,从服务器清单获取所有在线服务器
2. **基础指标采集**(对每台服务器):
- `ssh {server} 'top -bn1 | head -5'` — CPU 负载
- `ssh {server} 'free -h'` — 内存使用
- `ssh {server} 'df -h'` — 磁盘使用
- `ssh {server} 'ss -tuln'` — 网络端口
- `ssh {server} 'uptime'` — 运行时间
3. **服务状态**:
- `ssh {server} 'systemctl list-units --failed'` — 失败的服务
- `ssh {server} 'docker ps --format "table {{.Names}}\t{{.Status}}"'` — 容器状态(如适用)
4. **异常检测**:标记 CPU>80%、MEM>85%、DISK>90% 的指标
5. **输出结构化报告**:按服务器分组,标记异常项
## 输出格式
```markdown
## 巡检报告 — {timestamp}
### {server-name} ● 正常 / ⚠ 告警 / 🔴 异常
| 指标 | 值 | 状态 |
|------|-----|------|
| CPU | 45% | ✓ |
| MEM | 62% | ✓ |
| DISK | 92% | ⚠ |
...
```yaml
# .claude/skills/deploy/SKILL.md
---
name: deploy
description: 将指定应用部署到目标环境(支持 Docker 和 K8s)
argument-hint: "[app-name] [environment]"
allowed-tools: Bash, Read, Grep, Glob
disable-model-invocation: true
---
# 应用部署
## 参数
- `$0`: 应用名称
- `$1`: 目标环境 (dev/staging/prod)
## 前置检查
1. 确认应用 `$0` 存在于部署配置中
2. 确认目标环境 `$1` 的服务器可达
3. 如果是 prod 环境,明确提示需要审批
## 部署流程
1. 拉取最新镜像 / 构建产物
2. 备份当前版本信息(用于回滚)
3. 执行滚动更新
4. 等待健康检查通过
5. 验证服务正常响应
6. 输出部署摘要(版本号、耗时、服务状态)
## 回滚预案
如果部署后健康检查失败,自动执行 /rollback $0 $1
# .claude/skills/diagnose/SKILL.md
---
name: diagnose
description: 诊断服务器或服务的异常问题,自动收集日志和上下文
argument-hint: "[server-name] [symptom-description]"
allowed-tools: Bash, Read, Grep, Glob
disable-model-invocation: false
---
# 故障诊断
## 参数
- `$0`: 目标服务器
- `$ARGUMENTS`: 症状描述(如 "nginx 502 错误"、"磁盘 IO 高")
## 诊断框架
### 1. 收集上下文
- 系统负载: `uptime`, `top -bn1`, `vmstat 1 5`
- 磁盘 IO: `iostat -x 1 5`
- 网络: `netstat -s`, `ss -s`
- 最近错误日志: `journalctl -p err --since "1 hour ago"`
### 2. 针对症状深入
根据 `$ARGUMENTS` 描述的症状,针对性收集:
- HTTP 错误 → Nginx/Apache 错误日志 + upstream 状态
- 高负载 → 进程列表 + 资源占用排行
- 磁盘满 → 大文件查找 + 日志增长速率
- 内存泄漏 → 进程内存排行 + OOM killer 日志
### 3. 分析与建议
- 总结根因分析
- 提供修复建议(标明风险等级)
- 如果有明确的修复命令,提示是否执行
9.3 Skills 管理在 agent-service 中的实现
agent-service/
├── src/
│ ├── infrastructure/
│ │ ├── engines/
│ │ │ └── claude-code-cli/
│ │ │ ├── claude-code-engine.ts
│ │ │ └── ...
│ │ └── skills/ # ★ Skills 管理层
│ │ ├── skill-manager.service.ts # Skill CRUD(读写 .claude/skills/)
│ │ ├── skill-sync.service.ts # DB ↔ 文件系统双向同步
│ │ ├── skill-template-renderer.ts # 模板变量渲染($0, $ARGUMENTS 等)
│ │ └── skill-validator.service.ts # Skill 格式校验(frontmatter + 内容)
// infrastructure/skills/skill-manager.service.ts
@Injectable()
export class SkillManagerService {
private readonly skillsDir: string;
constructor(private readonly configService: ConfigService) {
// Skills 目录:项目根目录下的 .claude/skills/
this.skillsDir = path.join(
this.configService.get('PROJECT_ROOT', process.cwd()),
'.claude', 'skills',
);
}
/** 列出所有已注册的 Skills */
async listSkills(): Promise<SkillDefinition[]> {
const dirs = await fs.readdir(this.skillsDir, { withFileTypes: true });
const skills: SkillDefinition[] = [];
for (const dir of dirs.filter(d => d.isDirectory())) {
const skillPath = path.join(this.skillsDir, dir.name, 'SKILL.md');
if (await this.fileExists(skillPath)) {
const content = await fs.readFile(skillPath, 'utf-8');
skills.push(this.parseSkillMd(dir.name, content));
}
}
return skills;
}
/** 创建/更新 Skill(从 Web Admin 配置写入文件系统) */
async upsertSkill(skill: CreateSkillDto): Promise<void> {
const skillDir = path.join(this.skillsDir, skill.name);
await fs.mkdir(skillDir, { recursive: true });
const content = this.buildSkillMd(skill);
await fs.writeFile(path.join(skillDir, 'SKILL.md'), content, 'utf-8');
// 如果有附带脚本
if (skill.scripts?.length) {
const scriptsDir = path.join(skillDir, 'scripts');
await fs.mkdir(scriptsDir, { recursive: true });
for (const script of skill.scripts) {
await fs.writeFile(path.join(scriptsDir, script.name), script.content, 'utf-8');
}
}
}
/** 删除 Skill */
async deleteSkill(name: string): Promise<void> {
const skillDir = path.join(this.skillsDir, name);
await fs.rm(skillDir, { recursive: true, force: true });
}
/** 通过 Claude Code CLI 调用 Skill */
invokeSkillPrompt(skillName: string, args: string): string {
// 返回格式化的 prompt,让 Claude Code 识别为 Skill 调用
return `/${skillName} ${args}`;
}
/** 解析 SKILL.md (YAML frontmatter + Markdown body) */
private parseSkillMd(dirName: string, content: string): SkillDefinition {
const frontmatterMatch = content.match(/^---\n([\s\S]*?)\n---\n([\s\S]*)$/);
if (!frontmatterMatch) {
return { name: dirName, description: '', content, frontmatter: {} };
}
const frontmatter = yaml.parse(frontmatterMatch[1]);
const body = frontmatterMatch[2];
return {
name: frontmatter.name || dirName,
description: frontmatter.description || '',
argumentHint: frontmatter['argument-hint'],
allowedTools: frontmatter['allowed-tools'],
disableModelInvocation: frontmatter['disable-model-invocation'] ?? false,
content: body,
frontmatter,
};
}
/** 构建 SKILL.md 内容 */
private buildSkillMd(skill: CreateSkillDto): string {
const frontmatter = yaml.stringify({
name: skill.name,
description: skill.description,
'argument-hint': skill.argumentHint,
'allowed-tools': skill.allowedTools,
'disable-model-invocation': skill.disableModelInvocation ?? false,
});
return `---\n${frontmatter}---\n\n${skill.content}`;
}
}
9.4 Skill 与引擎的集成
// Claude Code CLI 引擎中调用 Skill 的两种方式:
// 方式 1: 直接在 prompt 中使用 /skill-name(Claude Code 自动识别)
const args = ['-p', `/inspect all`, '--output-format', 'stream-json', ...];
// 方式 2: Skill 内容已通过 .claude/skills/ 目录自动加载到 Claude Code 上下文中
// Claude Code 启动时读取所有 Skills 的 description,自主决定何时使用
// 当用户说"检查一下服务器"时,Claude Code 会自动匹配 /inspect skill
// 方式 3: Claude API 引擎 — 手动注入 Skill 内容到 system prompt
// 当切换到 Claude API 引擎时,SkillManager 读取 Skill 内容,拼接到 system prompt 中
9.5 Skill 数据库表(agent-service)
-- 追加到 it0_agent schema
-- Skills 注册表(与文件系统双向同步)
CREATE TABLE skills (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(64) NOT NULL UNIQUE, -- kebab-case 名称
description TEXT NOT NULL,
argument_hint VARCHAR(200),
allowed_tools VARCHAR(50)[],
disable_model_invocation BOOLEAN NOT NULL DEFAULT FALSE,
content TEXT NOT NULL, -- Markdown 指令体
frontmatter JSONB NOT NULL DEFAULT '{}', -- 完整 frontmatter
version INTEGER NOT NULL DEFAULT 1,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Skill 执行记录
CREATE TABLE skill_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
skill_name VARCHAR(64) NOT NULL,
session_id UUID REFERENCES agent_sessions(id),
arguments TEXT,
engine_type VARCHAR(20) NOT NULL,
status VARCHAR(20) NOT NULL, -- 'success' | 'failure' | 'cancelled'
duration_ms INTEGER,
executed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_skill_executions_name ON skill_executions(skill_name, executed_at);
9.6 引擎兼容性:Skills 在三种引擎下的行为
| 引擎 | Skill 加载方式 | Skill 调用方式 |
|---|---|---|
| Claude Code CLI | .claude/skills/ 目录自动发现 |
/skill-name args(原生支持) |
| Claude API | SkillManager 读取 → 拼接到 system prompt |
prompt 中包含 Skill 指令文本 |
| Custom Agent | SkillManager 读取 → 注入到自研引擎上下文 |
自定义调用机制 |
当切换引擎时,SkillManager 确保 Skills 的跨引擎可移植性:
- Claude Code CLI 原生读
.claude/skills/目录 - 其他引擎通过
SkillManager.getSkillContent(name)获取 Skill 内容,手动注入
10. 运维专用 System Prompt 模板
// infrastructure/engines/claude-code-cli/system-prompt-builder.ts
export class SystemPromptBuilder {
build(context: OpsContext): string {
return `
你是 IT0 运维智能体,负责管理以下服务器集群。
## 你管理的环境
${context.servers.map(s => `- ${s.name} (${s.host}) [${s.environment}/${s.role}]`).join('\n')}
## 集群信息
${context.clusters.map(c => `- ${c.name} (${c.type}) — ${c.description}`).join('\n')}
## 操作规范
### 权限分级
- Level 0(自动执行):查看类命令(df, free, top, kubectl get, docker ps, cat/grep 日志)
- Level 1(需确认):服务重启、日志清理、配置查看
- Level 2(需审批):部署、扩缩容、数据库操作、配置修改
- Level 3(禁止):rm -rf /, DROP DATABASE, shutdown, reboot, 格式化磁盘
### 执行原则
1. 每次操作前先说明你要做什么、为什么
2. 执行破坏性操作前,先展示完整命令
3. 每次操作后验证结果(检查返回码、确认服务状态)
4. 发现异常时自动收集诊断信息(日志末尾、资源使用、进程列表、网络状态)
5. 如果不确定,宁可多查一步也不要盲目执行
6. 操作完成后给出简洁总结
### 禁止行为
- 绝不猜测密码或凭证
- 绝不修改 SSH 配置或防火墙规则(除非明确指示)
- 绝不安装未经授权的软件包
- 绝不在生产环境执行未经验证的脚本
### SSH 访问
使用预配置的 SSH 别名:ssh ${context.sshUser}@{server_host}
`.trim();
}
}
11. 项目初始化与目录结构
11.1 Monorepo 结构
IT0/
├── docs/ # 文档
│ ├── backend-guide.md # 本文档
│ ├── flutter-guide.md # Flutter 前端指导
│ └── web-admin-guide.md # PC Web 管理前端指导
│
├── packages/
│ ├── shared/ # 共享包
│ │ ├── proto/ # gRPC Protobuf 定义
│ │ ├── events/ # 事件类型定义
│ │ └── utils/ # 公共工具
│ │
│ └── services/ # 微服务
│ ├── api-gateway/ # Kong 声明式配置
│ │ ├── kong.yml # 路由/服务/插件声明
│ │ ├── Dockerfile # Kong 自定义镜像
│ │ └── plugins/ # 自定义插件(如需)
│ ├── auth-service/
│ ├── agent-service/ # ★ 核心智能体服务
│ ├── ops-service/
│ ├── inventory-service/
│ ├── monitor-service/
│ ├── comm-service/
│ ├── voice-service/ # ★ Pipecat 语音对话引擎 (Python)
│ └── audit-service/
│
├── .claude/ # ★ Claude Code Skills 目录
│ ├── skills/ # 运维技能定义
│ │ ├── inspect/SKILL.md # 全面巡检 Skill
│ │ ├── deploy/SKILL.md # 部署 Skill
│ │ ├── rollback/SKILL.md # 回滚 Skill
│ │ ├── cleanup/SKILL.md # 清理 Skill
│ │ ├── diagnose/SKILL.md # 故障诊断 Skill
│ │ ├── scale/SKILL.md # 扩缩容 Skill
│ │ ├── backup-check/SKILL.md # 备份验证 Skill
│ │ └── security-audit/SKILL.md # 安全审计 Skill
│ ├── commands/ # 兼容旧版 slash commands
│ └── settings.json # Hook + 权限配置
│
├── ops-config/ # 运维配置
│ ├── system-prompts/ # System Prompt 模板
│ ├── hooks/ # Claude Code Hook 脚本
│ │ └── pre-tool-use-hook.py
│ ├── runbooks/ # Runbook 模板(高级,多步骤)
│ └── risk-patterns.json # 危险命令模式库
│
├── deploy/
│ ├── docker-compose.yml # 开发环境
│ ├── docker-compose.prod.yml # 生产环境
│ └── k8s/ # Kubernetes 部署清单
│
├── package.json # Monorepo root (pnpm workspace)
├── pnpm-workspace.yaml
├── tsconfig.base.json
└── .env.example
11.2 单个微服务标准结构(Clean Architecture)
{service-name}/
├── src/
│ ├── domain/ # 领域层 — 零外部依赖
│ │ ├── entities/ # 聚合根 & 实体
│ │ ├── value-objects/ # 值对象
│ │ ├── ports/ # 端口(接口定义)
│ │ │ ├── inbound/ # 入站端口(用例接口)
│ │ │ └── outbound/ # 出站端口(依赖接口)
│ │ ├── events/ # 领域事件
│ │ └── services/ # 领域服务
│ │
│ ├── application/ # 应用层 — 用例编排
│ │ ├── use-cases/ # 用例实现
│ │ └── dto/ # 数据传输对象
│ │
│ ├── infrastructure/ # 基础设施层 — 适配器实现
│ │ ├── persistence/ # TypeORM 实体 & 仓储
│ │ ├── messaging/ # 消息队列
│ │ ├── clients/ # 外部服务客户端
│ │ └── config/ # 配置
│ │
│ ├── interfaces/ # 接口层 — 控制器
│ │ ├── rest/ # REST API
│ │ ├── grpc/ # gRPC 服务
│ │ └── ws/ # WebSocket 网关
│ │
│ ├── app.module.ts
│ └── main.ts
│
├── test/
│ ├── unit/
│ ├── integration/
│ └── e2e/
│
├── Dockerfile
├── package.json
└── tsconfig.json
12. 开发路线图
Phase 1: 基础骨架(Week 1-2)
- Monorepo 初始化(pnpm workspace + NestJS CLI)
- auth-service: JWT 认证 + RBAC
- agent-service: Claude Code CLI 引擎 + 流式输出
- 命令风险分级 + Hook 拦截脚本
- WebSocket 实时推送
Phase 2: 运维核心(Week 3-4)
- inventory-service: 服务器/凭证管理
- ops-service: 任务创建 + 审批工作流
- 端到端流程:前端发起任务 → Agent 执行 → 审批 → 完成
- Runbook 模板系统
Phase 3: 监控告警(Week 5-6)
- monitor-service: 健康检查 + 指标采集
- 告警规则 + 自动触发 Runbook
- 告警 → 通知升级链路
Phase 4: 多渠道通信 + 语音对话(Week 7-9)
- comm-service: 推送 + 短信 + 邮件 + IM(Telegram/企业微信)
- voice-service: Pipecat + faster-whisper + Kokoro-82M 部署
- Flutter App WebSocket 音频流 ←→ voice-service 对接
- Pipecat 主动拨号(Twilio 电话线路)+ Media Streams 语音对话
- 三层递进升级链路端到端测试
Phase 5: 引擎扩展(Week 9-10)
- Claude API 引擎实现(备选方案)
- 引擎运行时切换 + 性能对比
- audit-service: 完整审计链
13. 环境变量配置
# .env.example
# ===== 全局 =====
NODE_ENV=development
LOG_LEVEL=debug
# ===== PostgreSQL =====
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=it0
POSTGRES_PASSWORD=changeme
POSTGRES_DB=it0
# ===== Redis =====
REDIS_HOST=localhost
REDIS_PORT=6379
# ===== Agent Engine =====
AGENT_ENGINE_TYPE=claude_code_cli # claude_code_cli | claude_api | custom
CLAUDE_CODE_PATH=/usr/local/bin/claude # Claude Code CLI 路径
ANTHROPIC_API_KEY=sk-ant-xxx # Claude API 备选方案
# ===== 安全 =====
VAULT_MASTER_KEY=your-256-bit-key # 凭证加密密钥
JWT_SECRET=your-jwt-secret
JWT_EXPIRES_IN=24h
# ===== 通信渠道(comm-service)=====
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=xxx
SMTP_PASSWORD=xxx
TELEGRAM_BOT_TOKEN=xxx
WECHAT_CORP_ID=xxx
WECHAT_CORP_SECRET=xxx
# ===== voice-service(Pipecat 语音对话引擎)=====
VOICE_SERVICE_URL=http://voice-service:3008 # voice-service 内部地址
WHISPER_MODEL=large-v3 # faster-whisper 模型 (base/small/medium/large-v3)
KOKORO_MODEL=kokoro-82m # Kokoro TTS 模型
VOICE_DEVICE=cuda # STT/TTS 推理设备 (cuda/cpu)
# ===== Twilio(电话线路载体:Pipecat 通过它拨号)=====
TWILIO_ACCOUNT_SID=ACxxx
TWILIO_AUTH_TOKEN=xxx
TWILIO_PHONE_NUMBER=+1234567890
TWILIO_MEDIA_STREAMS_URL=wss://voice-service:3008/twilio-stream # Media Streams 转接地址
14. 关键设计决策备忘
| 决策 | 选择 | 理由 |
|---|---|---|
| 引擎抽象层 | Strategy + Adapter | 运行时切换,不需要重启服务 |
| 服务间同步通信 | gRPC | 类型安全、高性能、双向流 |
| 服务间异步通信 | Redis Streams | 轻量、可靠、不需要额外中间件 |
| 数据库 | PostgreSQL(每服务独立 schema) | 同一个 PG 实例多 schema,开发简单;生产可拆分 |
| 凭证存储 | AES-256-GCM + PostgreSQL | 不引入 HashiCorp Vault 的复杂度,够用 |
| 审计日志 | Append-Only + 数据库级权限控制 | 防篡改,满足合规需求 |
| 定时任务 | Bull Queue (Redis-backed) | NestJS 原生支持,可靠的延迟/重试机制 |
| 监控指标存储 | PostgreSQL 分区表 | 初期够用;后期可迁移 TimescaleDB |
| 前后端通信 | REST + WebSocket | REST 用于 CRUD,WebSocket 用于实时流 |
| API 网关 | Kong DB-less 声明式 | 无需额外 DB,YAML 配置即基础设施,原生支持 WebSocket/JWT/限流 |
| AI Skills | Anthropic Claude Skills (.claude/skills/) |
原生 Claude Code 支持 + DB 同步实现跨引擎可移植 |
| Skills vs Runbook | Skills=原子技能,Runbook=编排流程 | Skills 是底层能力单元,Runbook 组合 Skills 实现多步骤业务流程 |
| 多租户隔离 | Schema-per-Tenant + 行级 tenant_id | Schema 级隔离安全性高,tenant_id 列防御纵深;避免 Database-per-Tenant 的运维复杂度 |
| 租户上下文 | AsyncLocalStorage | 无侵入式传播,不需要每个方法显式传参;NestJS 中间件统一注入 |
| 租户 Schema 管理 | TypeORM SET search_path |
动态切换,无需维护多 DataSource;QueryRunner 级别隔离 |
| 自治运维范式 | 驻留指令(Standing Orders) | 对话定义 → 结构化提取 → 自主执行 → 智能升级;避免 24/7 人工在线 |
| 驻留指令执行 | Bull Queue + cron 调度 | 可靠的定时触发、重试、并发控制;执行记录完整审计 |
| 智能升级通道 | 三层递进(推送→电话唤醒→扩大通知) | App 推送优先;2分钟无响应→电话唤醒;5分钟→IM/短信;10分钟→备用联系人/邮件 |
| 语音对话引擎 | Pipecat (Python, 10.1k stars) | 原生 Claude 集成 + 自部署 STT/TTS + 打断支持 + < 500ms 延迟;不依赖外部语音 API |
| STT 选型 | faster-whisper (large-v3) | 20.6k stars,4x 快于原版 Whisper,支持 GPU + 8bit 量化,中英文准确率高 |
| TTS 选型 | Kokoro-82M | Apache 开源,82M 参数质量媲美大模型,中英双语,CPU/GPU 均可运行 |
| Twilio 定位 | 纯电话线路载体 | Pipecat 通过 Twilio REST API 拨号 + Media Streams 传输音频;无 IVR,Agent 接通即说话 |