it0/docs/backend-guide.md

119 KiB
Raw Blame History

IT0 后端开发指导文档

IT Operations Intelligent Agent — 基于 Claude Code 本体的服务器集群运维智能体

1. 项目概述

1.1 定位

IT0 是一个服务器集群运维智能体平台,核心理念是将 Claude Code 作为运维大脑,通过自建的调度层和安全层,实现 AI 驱动的自动化运维。

1.2 核心设计原则

  • 引擎可替换Claude Code CLI / Claude API / 自研 Agent 可无缝切换
  • 安全第一:三层防御模型(分级权限 + Hook 拦截 + 架构隔离)
  • 人在回路高风险操作必须人工审批AI 只负责"看"和"想"
  • 通信多元:支持语音、文字、短信、自动电话、社交通信、邮件等多渠道通知与交互
  • 多租户 SaaS:从第一天起支持多租户隔离,未来可对外提供服务

1.3 技术栈

层面 技术选型
主体语言 TypeScript (NestJS)
架构模式 DDD + Clean Architecture + 微服务
数据库 PostgreSQL (每个服务独立 schema)
消息队列 Redis Streams / Bull Queue
服务通信 gRPC (服务间) + REST (对外 API) + WebSocket (实时推送)
容器化 Docker + Docker Compose (开发) / Kubernetes (生产)
API 网关 Kong (DB-less 模式,声明式配置)
监控 Prometheus + Grafana
日志 ELK Stack / Loki
AI Skills Anthropic Claude Skills.claude/skills/ 目录规范)
语音对话引擎 Pipecat (Python) + faster-whisper (STT) + Kokoro-82M (TTS) + Silero VAD
电话线路 Twilio Voice仅作为 Pipecat 的电话线路载体,拨号/接听)

2. 整体架构

2.1 系统架构图

┌─────────────────────────────────────────────────────────────┐
│                    Flutter Android App                       │
│         (语音/文字交互 + 仪表盘 + 任务审批 + 日志流)          │
└──────────────────────────┬──────────────────────────────────┘
                           │ HTTPS / WebSocket
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                      API Gateway (Kong)                      │
│              路由 / 认证 / 限流 / 日志                        │
└──────┬────────┬────────┬────────┬────────┬────────┬─────────┘
       │        │        │        │        │        │
       ▼        ▼        ▼        ▼        ▼        ▼
 ┌─────────┐┌────────┐┌────────┐┌────────┐┌────────┐┌────────┐
 │  auth   ││ agent  ││  ops   ││  inv   ││monitor ││  comm  │
 │ service ││service ││service ││service ││service ││service │
 └─────────┘└───┬────┘└────────┘└────────┘└────────┘└────────┘
                │                                     │
                ▼                                     ▼
 ┌──────────────────────────┐          ┌──────────────────────┐
 │    Agent Engine Layer    │          │   通信渠道适配器       │
 │ ┌──────────────────────┐ │          │ ┌──────┐ ┌────────┐  │
 │ │  Claude Code CLI     │ │          │ │ SMS  │ │ IM     │  │
 │ │  Claude API          │ │          │ │Email │ │ Push   │  │
 │ │  Custom Agent        │ │          │ └──────┘ └────────┘  │
 │ └──────────────────────┘ │          └──────────┬───────────┘
 └──────────┬───────────────┘                     │
            │                           ┌─────────▼────────────┐
            │                           │   voice-service      │
            │                           │  (Pipecat + Whisper) │
            │                           │  实时语音对话引擎     │
            │                           └──────────────────────┘
            │ SSH / kubectl / docker
            ▼
 ┌──────────────────────────┐
 │    目标服务器集群          │
 │  prod-1 / prod-2 / k8s   │
 └──────────────────────────┘

2.2 微服务拆分

服务 端口 职责 数据库 Schema
api-gateway 8000 路由、认证、限流
auth-service 3001 用户认证、RBAC、API Key it0_auth
agent-service 3002 智能体引擎、会话管理、命令执行 it0_agent
ops-service 3003 运维任务、审批工作流、Runbook it0_ops
inventory-service 3004 资产管理、凭证管理、SSH 配置 it0_inventory
monitor-service 3005 健康检查、指标、告警 it0_monitor
comm-service 3006 多渠道通信(推送/短信/邮件/IM it0_comm
audit-service 3007 操作审计、合规日志 it0_audit
voice-service 3008 实时语音对话引擎Pipecat + Whisper + Kokoro — (无状态)

3. DDD 限界上下文详细设计

3.1 Agent Core智能体核心agent-service

这是整个系统的大脑,也是架构灵活性的关键所在。

3.1.1 Clean Architecture 分层

agent-service/
├── src/
│   ├── domain/                          # 领域层(零依赖)
│   │   ├── entities/
│   │   │   ├── agent-session.entity.ts       # 会话聚合根
│   │   │   ├── agent-task.entity.ts          # 任务实体
│   │   │   ├── command-record.entity.ts      # 命令记录
│   │   │   └── standing-order.entity.ts      # ★ 驻留指令(自治任务定义)
│   │   ├── value-objects/
│   │   │   ├── agent-engine-type.vo.ts       # 引擎类型ClaudeCode/API/Custom
│   │   │   ├── command-risk-level.vo.ts      # 风险等级L0-L3
│   │   │   ├── task-status.vo.ts             # 任务状态
│   │   │   └── session-id.vo.ts
│   │   ├── ports/                            # 端口(接口定义)
│   │   │   ├── inbound/
│   │   │   │   ├── execute-task.port.ts           # 执行任务用例
│   │   │   │   ├── approve-command.port.ts        # 审批命令用例
│   │   │   │   ├── switch-engine.port.ts          # 切换引擎用例
│   │   │   │   └── manage-standing-order.port.ts  # ★ 驻留指令管理
│   │   │   └── outbound/
│   │   │       ├── agent-engine.port.ts           # ★ 核心抽象 — 智能体引擎接口
│   │   │       ├── session-repository.port.ts
│   │   │       ├── command-guard.port.ts          # 命令安全守卫
│   │   │       ├── event-publisher.port.ts
│   │   │       └── stream-emitter.port.ts         # 流式输出发射器
│   │   ├── events/
│   │   │   ├── command-blocked.event.ts
│   │   │   ├── task-completed.event.ts
│   │   │   └── approval-required.event.ts
│   │   └── services/
│   │       ├── command-risk-classifier.ts     # 命令风险分级(领域服务)
│   │       ├── session-manager.ts
│   │       └── standing-order-extractor.ts  # ★ 对话 → 驻留指令提取
│   │
│   ├── application/                     # 应用层(用例编排)
│   │   ├── use-cases/
│   │   │   ├── execute-task.use-case.ts
│   │   │   ├── approve-command.use-case.ts
│   │   │   ├── cancel-task.use-case.ts
│   │   │   ├── get-session-history.use-case.ts
│   │   │   └── switch-engine.use-case.ts
│   │   └── dto/
│   │       ├── execute-task.dto.ts
│   │       ├── task-result.dto.ts
│   │       └── stream-event.dto.ts
│   │
│   ├── infrastructure/                  # 基础设施层(适配器实现)
│   │   ├── engines/                          # ★ 引擎适配器Strategy 模式)
│   │   │   ├── agent-engine.interface.ts          # 统一接口
│   │   │   ├── claude-code-cli/
│   │   │   │   ├── claude-code-engine.ts          # Claude Code CLI 适配器
│   │   │   │   ├── cli-process-manager.ts         # 进程管理spawn/kill/timeout
│   │   │   │   ├── stream-json-parser.ts          # stream-json 输出解析
│   │   │   │   ├── system-prompt-builder.ts       # 运维专用 prompt 构建
│   │   │   │   └── hook-config-manager.ts         # .claude/settings.json 管理
│   │   │   ├── claude-api/
│   │   │   │   ├── claude-api-engine.ts           # Claude API 直连适配器
│   │   │   │   ├── tool-executor.ts               # 工具执行器Bash/Read/Write
│   │   │   │   └── agent-loop.ts                  # 自建 Agent Loop
│   │   │   └── custom/
│   │   │       └── custom-engine.ts               # 未来自研引擎预留
│   │   ├── guards/                           # 命令安全守卫
│   │   │   ├── command-guard.service.ts           # 命令拦截实现
│   │   │   ├── risk-patterns.ts                   # 风险命令正则库
│   │   │   └── pre-tool-use-hook.py               # Claude Code Hook 脚本
│   │   ├── persistence/
│   │   │   ├── typeorm/
│   │   │   │   ├── session.orm-entity.ts
│   │   │   │   ├── task.orm-entity.ts
│   │   │   │   └── command-record.orm-entity.ts
│   │   │   └── repositories/
│   │   │       └── session.repository.ts
│   │   └── messaging/
│   │       ├── event-publisher.service.ts
│   │       └── stream-emitter.service.ts     # WebSocket 流推送
│   │
│   └── interfaces/                      # 接口层(控制器)
│       ├── rest/
│       │   ├── agent.controller.ts
│       │   └── session.controller.ts
│       └── ws/
│           └── agent-stream.gateway.ts   # WebSocket 网关

3.1.2 ★ 核心接口AgentEngine引擎抽象

这是整个架构灵活性的核心,使用 Strategy + Adapter 模式

// domain/ports/outbound/agent-engine.port.ts

/**
 * 智能体引擎端口 — 所有引擎实现必须遵循此接口
 * Claude Code CLI / Claude API / 自研 Agent 均通过此接口接入
 */
export interface AgentEnginePort {
  /** 引擎类型标识 */
  readonly engineType: AgentEngineType;

  /**
   * 执行任务(流式输出)
   * @returns AsyncGenerator逐步 yield 流式事件
   */
  executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent>;

  /** 取消正在执行的任务 */
  cancelTask(sessionId: string): Promise<void>;

  /** 继续会话(追加指令) */
  continueSession(sessionId: string, message: string): AsyncGenerator<EngineStreamEvent>;

  /** 健康检查 */
  healthCheck(): Promise<boolean>;
}

/** 引擎任务参数 */
export interface EngineTaskParams {
  sessionId: string;
  prompt: string;
  systemPrompt: string;
  allowedTools: string[];
  maxTurns: number;
  maxBudgetUsd?: number;
  /** 上下文附加信息(服务器清单、历史操作等) */
  context?: Record<string, unknown>;
  /** Skill 调用如果指定prompt 格式为 /{skillName} {args} */
  skill?: {
    name: string;
    arguments: string;
  };
}

/** 引擎流式事件(统一格式,各适配器负责转换) */
export type EngineStreamEvent =
  | { type: 'thinking'; content: string }
  | { type: 'text'; content: string }
  | { type: 'tool_use'; toolName: string; input: Record<string, unknown> }
  | { type: 'tool_result'; toolName: string; output: string; isError: boolean }
  | { type: 'approval_required'; command: string; riskLevel: RiskLevel; taskId: string }
  | { type: 'completed'; summary: string; tokensUsed?: number }
  | { type: 'error'; message: string; code: string };

3.1.3 Claude Code CLI 引擎实现

// infrastructure/engines/claude-code-cli/claude-code-engine.ts

@Injectable()
export class ClaudeCodeCliEngine implements AgentEnginePort {
  readonly engineType = AgentEngineType.CLAUDE_CODE_CLI;

  constructor(
    private readonly processManager: CliProcessManager,
    private readonly streamParser: StreamJsonParser,
    private readonly promptBuilder: SystemPromptBuilder,
    private readonly configService: ConfigService,
  ) {}

  async *executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent> {
    const systemPromptPath = await this.promptBuilder.buildAndSave(params);

    const args = [
      '-p', params.prompt,
      '--system-prompt-file', systemPromptPath,
      '--output-format', 'stream-json',
      '--allowedTools', params.allowedTools.join(','),
      '--max-turns', String(params.maxTurns),
      '--session-id', params.sessionId,
      '--verbose',
      '--include-partial-messages',
    ];

    if (params.maxBudgetUsd) {
      args.push('--max-budget-usd', String(params.maxBudgetUsd));
    }

    const process = this.processManager.spawn('claude', args);

    for await (const rawEvent of this.streamParser.parse(process.stdout)) {
      // 将 Claude Code 原始 stream-json 事件转换为统一 EngineStreamEvent
      const event = this.mapToEngineEvent(rawEvent);
      if (event) yield event;
    }

    const exitCode = await this.processManager.waitForExit(process);
    if (exitCode !== 0) {
      yield { type: 'error', message: `Claude Code exited with code ${exitCode}`, code: 'CLI_EXIT_ERROR' };
    }
  }

  // ... 其他方法
}

3.1.4 Claude API 引擎实现(备选)

// infrastructure/engines/claude-api/claude-api-engine.ts

@Injectable()
export class ClaudeApiEngine implements AgentEnginePort {
  readonly engineType = AgentEngineType.CLAUDE_API;

  constructor(
    private readonly toolExecutor: ToolExecutor,  // 自建的 Bash/Read/Write 执行器
    private readonly configService: ConfigService,
  ) {}

  async *executeTask(params: EngineTaskParams): AsyncGenerator<EngineStreamEvent> {
    const client = new Anthropic({ apiKey: this.configService.get('ANTHROPIC_API_KEY') });

    const tools = this.buildToolDefinitions(params.allowedTools);
    let messages: MessageParam[] = [{ role: 'user', content: params.prompt }];

    // 自建 Agent Loop
    for (let turn = 0; turn < params.maxTurns; turn++) {
      const stream = client.messages.stream({
        model: this.configService.get('CLAUDE_MODEL', 'claude-sonnet-4-5-20250929'),
        max_tokens: 8192,
        system: params.systemPrompt,
        messages,
        tools,
      });

      for await (const event of stream) {
        const mapped = this.mapStreamEvent(event);
        if (mapped) yield mapped;
      }

      const response = await stream.finalMessage();

      // 检查是否需要执行工具
      const toolUseBlocks = response.content.filter(b => b.type === 'tool_use');
      if (toolUseBlocks.length === 0) break; // 无工具调用,结束

      // 执行工具并收集结果
      const toolResults = [];
      for (const block of toolUseBlocks) {
        const result = await this.toolExecutor.execute(block.name, block.input);
        yield { type: 'tool_result', toolName: block.name, output: result.output, isError: result.isError };
        toolResults.push({ type: 'tool_result' as const, tool_use_id: block.id, content: result.output });
      }

      messages = [...messages,
        { role: 'assistant', content: response.content },
        { role: 'user', content: toolResults },
      ];
    }
  }
}

3.1.5 引擎切换机制

// infrastructure/engines/engine-registry.ts

@Injectable()
export class EngineRegistry {
  private engines = new Map<AgentEngineType, AgentEnginePort>();

  constructor(
    @Optional() private readonly cliEngine: ClaudeCodeCliEngine,
    @Optional() private readonly apiEngine: ClaudeApiEngine,
    @Optional() private readonly customEngine: CustomEngine,
    private readonly configService: ConfigService,
  ) {
    if (cliEngine) this.engines.set(AgentEngineType.CLAUDE_CODE_CLI, cliEngine);
    if (apiEngine) this.engines.set(AgentEngineType.CLAUDE_API, apiEngine);
    if (customEngine) this.engines.set(AgentEngineType.CUSTOM, customEngine);
  }

  /** 获取当前活跃引擎 */
  getActiveEngine(): AgentEnginePort {
    const type = this.configService.get<AgentEngineType>(
      'AGENT_ENGINE_TYPE',
      AgentEngineType.CLAUDE_CODE_CLI,
    );
    const engine = this.engines.get(type);
    if (!engine) throw new Error(`Engine ${type} not registered`);
    return engine;
  }

  /** 运行时切换引擎 */
  switchEngine(type: AgentEngineType): AgentEnginePort {
    const engine = this.engines.get(type);
    if (!engine) throw new Error(`Engine ${type} not available`);
    // 更新配置
    this.configService.set('AGENT_ENGINE_TYPE', type);
    return engine;
  }

  /** 列出可用引擎 */
  listAvailable(): AgentEngineType[] {
    return [...this.engines.keys()];
  }
}

3.2 Operations运维操作ops-service

3.2.1 目录结构

ops-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   ├── ops-task.entity.ts            # 运维任务聚合根
│   │   │   ├── approval-request.entity.ts    # 审批请求
│   │   │   ├── runbook.entity.ts             # 运维手册
│   │   │   ├── standing-order.entity.ts      # ★ 驻留指令聚合根
│   │   │   └── standing-order-execution.entity.ts  # ★ 执行记录
│   │   ├── value-objects/
│   │   │   ├── task-type.vo.ts               # 巡检/部署/回滚/扩缩容/故障恢复
│   │   │   ├── approval-status.vo.ts         # pending/approved/rejected
│   │   │   └── execution-result.vo.ts
│   │   ├── ports/
│   │   │   ├── inbound/
│   │   │   │   ├── create-task.port.ts
│   │   │   │   ├── approve-task.port.ts
│   │   │   │   └── execute-runbook.port.ts
│   │   │   └── outbound/
│   │   │       ├── task-repository.port.ts
│   │   │       ├── agent-client.port.ts      # 调用 agent-service
│   │   │       ├── notification.port.ts      # 调用 comm-service
│   │   │       └── audit-logger.port.ts      # 调用 audit-service
│   │   └── services/
│   │       └── approval-workflow.service.ts   # 审批流程领域服务
│   │
│   ├── application/
│   │   └── use-cases/
│   │       ├── create-inspection-task.use-case.ts    # 创建巡检任务
│   │       ├── create-deployment-task.use-case.ts    # 创建部署任务
│   │       ├── approve-task.use-case.ts
│   │       ├── schedule-recurring-task.use-case.ts   # 定时任务
│   │       ├── execute-runbook.use-case.ts
│   │       ├── create-standing-order.use-case.ts     # ★ 创建驻留指令
│   │       ├── update-standing-order.use-case.ts     # ★ 修改驻留指令
│   │       └── execute-standing-order.use-case.ts    # ★ 执行驻留指令
│   │
│   ├── infrastructure/
│   │   ├── persistence/typeorm/
│   │   ├── clients/
│   │   │   ├── agent-client.service.ts       # gRPC 调用 agent-service
│   │   │   └── comm-client.service.ts        # gRPC 调用 comm-service
│   │   └── schedulers/
│   │       ├── cron-scheduler.service.ts     # 定时巡检调度
│   │       └── standing-order-executor.ts    # ★ 驻留指令执行引擎
│   │
│   └── interfaces/
│       └── rest/
│           ├── task.controller.ts
│           ├── approval.controller.ts
│           └── runbook.controller.ts

3.2.2 审批工作流

// domain/services/approval-workflow.service.ts

export class ApprovalWorkflowService {
  /**
   * 审批流程:
   * 1. agent-service 执行任务时遇到 Level 2 命令
   * 2. 发布 approval_required 事件
   * 3. ops-service 创建审批请求
   * 4. comm-service 通知人类(推送/短信/电话)
   * 5. 人类通过 Flutter App 或回复短信/邮件 审批
   * 6. ops-service 发布 approval_granted 事件
   * 7. agent-service 继续执行
   *
   * 超时策略:
   * - Level 1: 5 分钟超时,自动通过
   * - Level 2: 30 分钟超时,自动拒绝
   * - 紧急模式: 管理员可设置特定 Runbook 自动审批
   */
  async processApproval(request: ApprovalRequest): Promise<ApprovalResult> {
    // ...
  }
}

3.3 Inventory资产管理inventory-service

inventory-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   ├── server.entity.ts              # 服务器实体
│   │   │   ├── cluster.entity.ts             # 集群聚合根
│   │   │   ├── credential.entity.ts          # 凭证(加密存储)
│   │   │   └── ssh-config.entity.ts          # SSH 配置
│   │   ├── value-objects/
│   │   │   ├── server-role.vo.ts             # web/db/cache/worker/gateway
│   │   │   ├── environment.vo.ts             # dev/staging/prod
│   │   │   └── connection-info.vo.ts         # IP/端口/用户
│   │   └── ports/
│   │       └── outbound/
│   │           ├── server-repository.port.ts
│   │           └── credential-vault.port.ts  # 凭证保险库
│   │
│   ├── infrastructure/
│   │   ├── vault/
│   │   │   └── credential-vault.service.ts   # AES-256 加密存储
│   │   └── persistence/typeorm/
│   │
│   └── interfaces/
│       └── rest/
│           ├── server.controller.ts
│           └── cluster.controller.ts

关键:凭证安全

// infrastructure/vault/credential-vault.service.ts

@Injectable()
export class CredentialVaultService implements CredentialVaultPort {
  /**
   * 凭证存储规范:
   * - SSH 私钥、密码等敏感信息 AES-256-GCM 加密后存入 PostgreSQL
   * - 加密密钥从环境变量 VAULT_MASTER_KEY 读取
   * - 读取时解密到内存,绝不写入日志或磁盘临时文件
   * - Agent 需要 SSH 时,通过 inventory-service API 获取临时凭证
   *   (凭证有 TTL用后即弃
   */
}

3.4 Monitoring监控告警monitor-service

monitor-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   ├── health-check.entity.ts
│   │   │   ├── alert-rule.entity.ts          # 告警规则
│   │   │   ├── alert-event.entity.ts         # 告警事件
│   │   │   └── metric-snapshot.entity.ts
│   │   ├── value-objects/
│   │   │   ├── alert-severity.vo.ts          # info/warning/critical/fatal
│   │   │   ├── check-type.vo.ts              # ping/http/tcp/ssh/custom
│   │   │   └── metric-type.vo.ts             # cpu/mem/disk/network/custom
│   │   └── ports/outbound/
│   │       ├── metric-collector.port.ts       # 指标采集
│   │       └── alert-dispatcher.port.ts       # 告警分发 → comm-service
│   │
│   ├── infrastructure/
│   │   ├── collectors/
│   │   │   ├── ssh-collector.service.ts      # SSH 远程采集
│   │   │   ├── prometheus-collector.ts       # Prometheus 拉取
│   │   │   └── agent-collector.ts            # 通过 agent-service 执行采集命令
│   │   └── schedulers/
│   │       └── health-check-scheduler.ts     # 定时健康检查
│   │
│   └── interfaces/
│       └── rest/
│           ├── health-check.controller.ts
│           └── alert.controller.ts

告警 → 自动修复流程:

monitor-service 检测到异常
    ↓ 发布 AlertEvent
ops-service 接收告警
    ↓ 匹配 Runbook自动修复手册
    ↓ 如果有匹配的 Runbook 且风险等级允许
agent-service 执行修复
    ↓ 如果需要审批
comm-service 通知人类(推送+短信+电话升级)
    ↓ 人类审批
agent-service 继续执行
    ↓
monitor-service 验证恢复

3.5 Communication多渠道通信comm-service

这是联系人类的统一出口。

comm-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   ├── message.entity.ts              # 消息聚合根
│   │   │   ├── contact.entity.ts              # 联系人
│   │   │   ├── channel-config.entity.ts       # 渠道配置
│   │   │   └── escalation-policy.entity.ts    # 升级策略
│   │   ├── value-objects/
│   │   │   ├── channel-type.vo.ts             # push/sms/voice_call/email/telegram/wechat/voice_dialog
│   │   │   ├── message-priority.vo.ts         # low/normal/high/urgent
│   │   │   └── delivery-status.vo.ts          # pending/sent/delivered/failed
│   │   └── ports/outbound/
│   │       └── channel-adapter.port.ts        # ★ 通信渠道抽象接口
│   │
│   ├── infrastructure/
│   │   ├── channels/                          # ★ 渠道适配器Strategy 模式)
│   │   │   ├── channel-adapter.interface.ts
│   │   │   ├── websocket/
│   │   │   │   └── ws-channel.adapter.ts      # WebSocket 实时推送
│   │   │   ├── sms/
│   │   │   │   └── twilio-sms.adapter.ts      # Twilio SMS
│   │   │   ├── voice/
│   │   │   │   ├── pipecat-voice.adapter.ts   # ★ 主要Pipecat 实时语音对话
│   │   │   │   └── twilio-voice.adapter.ts    # Twilio 电话线路Pipecat 拨号载体)
│   │   │   ├── email/
│   │   │   │   └── smtp-email.adapter.ts      # SMTP 邮件
│   │   │   ├── telegram/
│   │   │   │   └── telegram-bot.adapter.ts    # Telegram Bot
│   │   │   ├── wechat/
│   │   │   │   └── wechat-work.adapter.ts     # 企业微信
│   │   │   └── voice-client/
│   │   │       └── voice-service-client.ts    # HTTP/gRPC 调用 voice-service
│   │   ├── escalation/
│   │   │   └── escalation-engine.service.ts   # 升级引擎
│   │   └── persistence/typeorm/
│   │
│   └── interfaces/
│       ├── rest/
│       │   ├── message.controller.ts
│       │   └── contact.controller.ts
│       └── ws/
│           └── realtime.gateway.ts

3.5.1 通信渠道抽象

// domain/ports/outbound/channel-adapter.port.ts

export interface ChannelAdapterPort {
  readonly channelType: ChannelType;

  /** 发送消息(文字/语音/富文本) */
  send(params: SendMessageParams): Promise<DeliveryResult>;

  /** 接收消息(双向渠道如 Telegram/微信) */
  onMessage?(callback: (msg: IncomingMessage) => void): void;

  /** 渠道是否可用 */
  isAvailable(): Promise<boolean>;
}

export interface SendMessageParams {
  to: Contact;
  content: string;
  priority: MessagePriority;
  /** 语音电话场景TTS 文本 */
  ttsText?: string;
  /** 审批场景:附带审批链接或回复指令 */
  approvalAction?: {
    taskId: string;
    approveKeyword: string;   // 如 "Y" 或 "approve"
    rejectKeyword: string;    // 如 "N" 或 "reject"
  };
}

3.5.2 升级策略Escalation

// infrastructure/escalation/escalation-engine.service.ts

/**
 * 通知升级策略(分层递进):
 *
 * ┌─── 第一层:推送 + 电话 ─────────────────────────────────────┐
 * │ 1. WebSocket + FCM 推送到 Flutter App通知卡片 * │    用户打开 App → Pipecat 语音对话App 内 WebSocket 音频)  │
 * │ 2. 如果 2 分钟内无响应 → Pipecat 通过 Twilio 拨出电话       │
 * │    管理员接听 → Agent 直接开口说话(同一 Pipecat 引擎)      │
 * │    "您好prod-2 磁盘 97%,我建议清理日志,您同意吗?"       │
 * │    管理员语音回复 → Agent 理解并执行                          │
 * └────────────────────────────────────────────────────────────┘
 *
 * ┌─── 第二层:扩大通知 ──────────────────────────────────────┐
 * │ 如果 5 分钟内仍无响应 → 短信 + IM (Telegram/企业微信)      │
 * │ 如果 10 分钟内无响应 → 通知备用联系人(重复第一层)         │
 * │ 如果 20 分钟内无响应 → 邮件通知管理组(完整报告)           │
 * └────────────────────────────────────────────────────────────┘
 *
 * 升级策略可按告警级别自定义:
 * - info:     只推送 App
 * - warning:  App + 短信/IM
 * - critical: App + Pipecat 电话语音对话
 * - fatal:    全渠道 + 备用联系人 + 管理组
 */

3.5.3 语音交互流程Pipecat 引擎)

IT0 使用 PipecatGitHub 10.1k stars作为实时语音对话引擎自部署 STT/TTS 模型,不依赖外部语音 API。

主要场景App 内语音对话

Flutter App麦克风录音
    ↓ WebSocket 音频流 (PCM 16kHz)
voice-service (Pipecat Pipeline)
    ├── Silero VAD → 检测语音活动 + 打断
    ├── faster-whisper → STT语音转文字
    ├── Claude API → LLM 理解 + 决策
    └── Kokoro-82M → TTS文字转语音
    ↓ WebSocket 音频流
Flutter App 实时播放语音回复
    ↓ 同步显示实时转写文字(辅助理解 + 留痕)

电话场景Pipecat 主动拨号2分钟 App 无响应后)

voice-service (Pipecat) 发起拨号请求
    ↓ Twilio REST API: 拨出电话到管理员手机
管理员手机响铃 → 接听
    ↓ Twilio Media Streams (WebSocket 双向音频)
voice-service (Pipecat Pipeline)  ← 同一个引擎
    ├── 与 App 内语音对话使用完全相同的 Pipeline
    ├── Agent 接通后立即开口:"您好,我是 IT0 Agent..."
    └── 唯一区别:音频通过 Twilio 电话线而非 Flutter WebSocket
    ↓ Twilio Media Streams
管理员通过电话与 Agent 语音对话

关键设计决策

  • Pipecat 是唯一的语音对话引擎,无论音频来自 App 还是电话线
  • Twilio 只是电话线路载体(拨号/接听),不负责任何 STT/TTS/IVR 逻辑
  • 无 IVR 菜单:不需要"按 1 按 2"Agent 接通就说话,管理员直接语音回复
  • 所有 STT/TTS 在本地 GPU 运行faster-whisper + Kokoro延迟 < 500ms
  • 支持打断barge-in:用户开始说话时 Pipecat 自动暂停 TTS 播放

3.5.4 Voice Dialogue Servicevoice-service

voice-service 是一个 Python 微服务,基于 Pipecat 框架运行实时语音对话 Pipeline。

voice-service/                    # Python (FastAPI + Pipecat)
├── app/
│   ├── main.py                   # FastAPI 入口
│   ├── config.py                 # 配置模型路径、端口、Claude API Key
│   ├── pipelines/
│   │   ├── base_pipeline.py      # ★ Pipecat Pipeline 定义
│   │   │   # VAD → STT → LLM → TTS → Audio Output
│   │   ├── app_transport.py      # Flutter WebSocket 音频传输
│   │   └── twilio_transport.py   # Twilio Media Streams 音频传输
│   ├── services/
│   │   ├── stt_service.py        # faster-whisper 配置(模型: large-v3, GPU
│   │   ├── tts_service.py        # Kokoro-82M 配置(中/英双语)
│   │   ├── vad_service.py        # Silero VAD 配置
│   │   └── llm_service.py        # Pipecat AnthropicLLMService (Claude API)
│   ├── context/
│   │   ├── session_context.py    # 对话上下文管理(从 agent-service 获取)
│   │   └── tool_handler.py       # Agent tool 调用转发给 agent-service
│   └── api/
│       ├── health.py             # 健康检查
│       ├── session_router.py     # 创建/结束语音对话会话
│       └── twilio_webhook.py     # Twilio Media Streams WebSocket 端点
├── models/                       # 本地模型文件
│   ├── whisper-large-v3/         # faster-whisper 模型
│   ├── kokoro-82m/               # Kokoro TTS 模型
│   └── silero-vad/               # Silero VAD 模型
├── Dockerfile
├── requirements.txt
└── docker-compose.voice.yml
# app/pipelines/base_pipeline.py — Pipecat Pipeline 核心

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.services.anthropic import AnthropicLLMService
from pipecat.services.faster_whisper import FasterWhisperSTTService
from pipecat.services.kokoro import KokoroTTSService
from pipecat.vad.silero import SileroVADAnalyzer

async def create_voice_pipeline(
    transport,           # AppTransport 或 TwilioTransport
    session_context,     # 对话上下文(驻留指令、服务器信息等)
):
    """
    创建 Pipecat 语音对话 Pipeline
    - 音频输入 → VAD → STT → LLM → TTS → 音频输出
    - 支持打断:用户说话时自动停止 TTS
    - 支持 tool_useAgent 可调用 agent-service 的工具
    """
    stt = FasterWhisperSTTService(
        model="large-v3",
        language="zh",       # 中文为主,自动检测英文
        device="cuda",
    )

    llm = AnthropicLLMService(
        model="claude-sonnet-4-5-20250929",
        system_prompt=session_context.system_prompt,
        tools=session_context.available_tools,
    )

    tts = KokoroTTSService(
        model="kokoro-82m",
        voice="zh_female_1",  # 中文女声
    )

    pipeline = Pipeline([
        transport.input(),          # 音频输入
        SileroVADAnalyzer(),        # 语音活动检测
        stt,                        # 语音 → 文字
        llm,                        # 文字 → Agent 回复
        tts,                        # 回复 → 语音
        transport.output(),         # 音频输出
    ])

    return PipelineTask(pipeline, allow_interruptions=True)
# app/pipelines/app_transport.py — Flutter App WebSocket 传输

from pipecat.transports.network.websocket import WebSocketServerTransport

class AppTransport(WebSocketServerTransport):
    """
    Flutter App 通过 WebSocket 发送/接收音频流
    - 输入PCM 16kHz 16bit 单声道Flutter 录音格式)
    - 输出PCM 16kHz 16bit 单声道Flutter 播放格式)
    """
    def __init__(self, websocket):
        super().__init__(
            websocket=websocket,
            params=WebSocketServerParams(
                audio_in_sample_rate=16000,
                audio_out_sample_rate=16000,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
            ),
        )
# app/pipelines/twilio_transport.py — Twilio 电话线兜底传输

from pipecat.transports.services.twilio import TwilioTransport

class TwilioPhoneTransport(TwilioTransport):
    """
    Twilio 电话线传输Pipecat 主动拨号 → 管理员接听 → Agent 直接开口说话
    - Pipecat 调用 Twilio REST API 拨出电话
    - 接通后通过 Media Streams WebSocket 双向传输音频
    - Agent 接通即开始语音汇报,无 IVR 菜单
    - 音频编解码mu-law 8kHz电话标准↔ PCM 16kHzWhisper 输入)
    """
    async def initiate_call(self, phone_number: str, voice_session_id: str):
        """主动拨出电话(由 SmartEscalationService 触发)"""
        # Twilio REST API 创建呼叫,连接到 Media Streams
        pass

Docker 配置

# docker-compose.voice.yml
voice-service:
  build: ./packages/services/voice-service
  ports:
    - "3008:3008"
  environment:
    - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    - AGENT_SERVICE_URL=http://agent-service:3002
    - WHISPER_MODEL=large-v3
    - KOKORO_MODEL=kokoro-82m
    - DEVICE=cuda        # GPU 加速(生产环境)
  volumes:
    - ./models:/app/models   # 模型文件挂载
  deploy:
    resources:
      reservations:
        devices:
          - driver: nvidia
            count: 1
            capabilities: [gpu]
  healthcheck:
    test: ["CMD", "curl", "-f", "http://localhost:3008/health"]
    interval: 30s
    start_period: 60s   # 模型加载时间

3.6 Audit审计日志audit-service

audit-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   └── audit-log.entity.ts
│   │   └── value-objects/
│   │       ├── action-type.vo.ts         # command_executed/approval/login/config_change
│   │       └── audit-level.vo.ts
│   ├── application/
│   │   └── use-cases/
│   │       ├── record-audit.use-case.ts
│   │       └── query-audit-trail.use-case.ts
│   └── infrastructure/
│       └── persistence/
│           └── append-only-repository.ts  # 只追加,不可修改/删除

审计日志规范:

  • 所有 agent 执行的命令、结果、时间戳
  • 所有审批操作的决策人、时间、理由
  • 所有引擎切换操作
  • 只追加Append-Only,不可篡改

3.7 Auth认证授权auth-service

auth-service/
├── src/
│   ├── domain/
│   │   ├── entities/
│   │   │   ├── user.entity.ts
│   │   │   ├── role.entity.ts
│   │   │   └── api-key.entity.ts
│   │   └── value-objects/
│   │       ├── permission.vo.ts          # 细粒度权限
│   │       └── role-type.vo.ts           # admin/operator/viewer
│   └── infrastructure/
│       ├── jwt/
│       │   └── jwt.strategy.ts
│       └── guards/
│           └── rbac.guard.ts

RBAC 权限模型:

角色 查看仪表盘 发起巡检 审批操作 管理服务器 切换引擎
viewer
operator
admin

3.8 ★ 自治运维工作流 — 驻留指令Standing Orders

这是 IT0 的核心运维范式:管理员通过对话定义任务 → Agent 自主执行 → 遇到无法决策的事件智能升级给人类。

3.8.1 核心概念:驻留指令

驻留指令Standing Order 是管理员通过对话与 Agent 协商后生成的持久化自治任务定义。一旦创建Agent 将按照定义自主执行,无需管理员持续在线。

┌────────────────────────────────────────────────────────────────────────┐
│                      IT0 自治运维工作流                                  │
│                                                                        │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────────────┐      │
│  │  Phase 1     │    │  Phase 2     │    │  Phase 3             │      │
│  │  对话定义     │───→│  自治执行     │───→│  智能升级             │      │
│  │              │    │              │    │                      │      │
│  │ 管理员通过    │    │ Agent 按计划  │    │ 遇到无法决策的情况:  │      │
│  │ 文字/语音对话 │    │ 或事件触发    │    │                      │      │
│  │ 与 Agent 沟通 │    │ 自动执行任务  │    │ 致命 → 语音电话       │      │
│  │ 任务需求      │    │              │    │ 严重 → IM 实时消息    │      │
│  │              │    │ 在决策边界内  │    │ 警告 → 短信/推送      │      │
│  │ Agent 提取为  │    │ 自主处理问题  │    │ 信息 → 邮件/日报      │      │
│  │ 结构化指令    │    │              │    │                      │      │
│  │              │    │ 记录所有操作  │    │ 人类响应后继续执行    │      │
│  │ 管理员确认    │    │ 到审计日志    │    │ 或按人类指示调整      │      │
│  └──────────────┘    └──────────────┘    └──────────────────────┘      │
│                                                                        │
│  ── 管理员随时可以修改、暂停、删除驻留指令 ──                            │
└────────────────────────────────────────────────────────────────────────┘

3.8.2 对话 → 指令提取Task Definition Session

管理员与 Agent 的对话中Agent 使用专门的指令提取模式,从自然语言中结构化提取任务定义:

管理员: "以后每天早上8点帮我巡检所有生产服务器检查CPU、内存、磁盘用量。
        如果磁盘超过85%自动清理超过7天的日志。
        如果超过95%,马上电话通知我。
        其他异常发短信就行。"

Agent:  "收到。让我确认一下您的要求:

        📋 驻留指令:每日生产环境巡检
        ⏰ 触发方式:每天 08:00cron: 0 8 * * *
        🎯 目标范围:所有 environment=prod 的服务器

        📌 自治操作Agent 可自主执行):
        ├─ 检查 CPU / 内存 / 磁盘使用率
        └─ 磁盘 > 85%:自动清理 7 天前的日志

        🚨 升级策略:
        ├─ 磁盘 > 95%:立即语音电话通知
        └─ 其他异常:短信通知

        ⚠️ 决策边界Agent 不会自主执行的操作):
        ├─ ❌ 不会重启任何服务
        ├─ ❌ 不会修改任何配置文件
        └─ ❌ 不会删除非日志文件

        是否确认创建?"

管理员: "确认。另外如果CPU持续超过90%超过10分钟也给我打电话。"

Agent:  "已更新。追加条件:
        ├─ CPU > 90% 持续 10 分钟:语音电话通知

        ✅ 驻留指令已创建,将从明天 08:00 开始执行。
        指令ID: SO-20260208-001"

指令提取流程agent-service 内部)

// domain/services/standing-order-extractor.ts

/**
 * 从对话中提取驻留指令的领域服务
 *
 * 流程:
 * 1. 对话进行中Agent 检测到用户意图是「定义持续性任务」
 * 2. Agent 切换到指令提取模式System Prompt 追加指令提取 schema
 * 3. Agent 生成结构化的指令草案JSON
 * 4. 草案经人类确认后持久化为 StandingOrder
 * 5. 后续对话修改可增量更新现有指令
 *
 * 关键Agent 的 System Prompt 中包含 StandingOrder JSON Schema
 * 使 Agent 输出符合格式的指令定义,而非自由文本。
 */
export class StandingOrderExtractorService {
  /**
   * 检测对话意图是否为「定义任务」
   * 关键词:以后、每天、定时、自动、帮我...检查/清理/巡检...
   */
  detectTaskDefinitionIntent(message: string): boolean;

  /**
   * 将 Agent 的结构化输出解析为 StandingOrder 草案
   */
  parseOrderDraft(agentOutput: string): StandingOrderDraft;

  /**
   * 人类确认后,将草案转为正式驻留指令
   */
  confirmOrder(draft: StandingOrderDraft, userId: string): StandingOrder;
}

3.8.3 驻留指令实体(跨 agent-service + ops-service

// ops-service/domain/entities/standing-order.entity.ts

export class StandingOrder {
  id: string;                         // SO-{date}-{seq}
  tenantId: string;
  name: string;                       // "每日生产巡检"
  description: string;                // 自然语言描述

  /** 定义此指令的对话会话ID溯源 */
  definedInSessionId: string;

  /** ── 触发方式 ── */
  trigger: {
    type: 'cron' | 'event' | 'threshold' | 'manual';
    cronExpression?: string;          // "0 8 * * *"
    eventType?: string;               // 'alert_fired' | 'deployment_completed'
    thresholdCondition?: {            // 指标阈值触发
      metricType: string;
      operator: '>' | '<' | '==' | '>=';
      value: number;
      durationSeconds: number;        // 持续时间
    };
  };

  /** ── 目标范围 ── */
  targets: {
    serverIds?: string[];             // 指定服务器
    clusterIds?: string[];            // 指定集群
    allServers?: boolean;             // 所有服务器
    environmentFilter?: string[];     // ['prod'] 只在生产环境
    tagFilter?: Record<string, string>; // 按标签筛选
  };

  /** ── Agent 执行指令 ── */
  agentInstructions: {
    prompt: string;                   // 结构化的执行指令(从对话提取)
    skills?: string[];                // 使用的 Skills
    runbookId?: string;               // 关联的 Runbook可选
    maxRiskLevel: 0 | 1;              // 自治执行允许的最高风险等级0=只读1=低风险写入)
    maxTurns: number;                 // Agent 最大对话轮次
    maxBudgetUsd?: number;            // 单次执行预算上限
  };

  /** ── 决策边界Agent 自治范围)── */
  decisionBoundary: {
    /** 可自动执行的操作白名单 */
    allowedActions: string[];         // ['cleanup_logs', 'report_metrics']
    /** 必须升级给人类的情况 */
    escalateConditions: string[];     // ['disk_full', 'service_down', 'data_loss_risk']
    /** 各阈值的通知方式 */
    escalationRules: Array<{
      condition: string;              // "disk_usage > 95%" 或 "cpu > 90% for 10min"
      channel: 'voice_call' | 'im' | 'sms' | 'push' | 'email';
      priority: 'urgent' | 'high' | 'normal' | 'low';
    }>;
  };

  /** ── 升级策略 ── */
  escalationPolicyId: string;         // 关联的 comm-service 升级策略

  /** ── 生命周期 ── */
  status: 'active' | 'paused' | 'archived';
  validFrom: Date;
  validUntil?: Date;                  // 可选过期时间
  createdBy: string;
  lastModifiedInSessionId?: string;   // 最后修改此指令的对话ID
  createdAt: Date;
  updatedAt: Date;
}

3.8.4 自治执行引擎ops-service + agent-service 协作)

// ops-service/infrastructure/schedulers/standing-order-executor.service.ts

/**
 * 驻留指令执行引擎
 *
 * 职责:
 * 1. 根据触发条件cron/event/threshold启动执行
 * 2. 创建「无人值守会话」headless session
 * 3. 在 Agent 的决策边界内自主运行
 * 4. 记录所有操作到审计日志
 * 5. 遇到边界外情况 → 触发智能升级
 */
@Injectable()
export class StandingOrderExecutorService {
  constructor(
    private readonly orderRepo: StandingOrderRepository,
    private readonly agentClient: AgentClientPort,
    private readonly commClient: CommClientPort,
    private readonly auditLogger: AuditLoggerPort,
  ) {}

  /**
   * Cron 触发Bull Queue 定时任务调度
   * 每分钟扫描所有 active 的 cron 型驻留指令,匹配到期的加入执行队列
   */
  @Cron('* * * * *')
  async scanCronOrders(): Promise<void> {
    const dueOrders = await this.orderRepo.findDueCronOrders();
    for (const order of dueOrders) {
      await this.executionQueue.add('execute-standing-order', {
        orderId: order.id,
        triggerType: 'cron',
        triggeredAt: new Date(),
      });
    }
  }

  /**
   * 事件触发:监听 Redis Streams 事件
   * alert_fired / deployment_completed 等事件匹配 event 型驻留指令
   */
  @OnEvent('alert.fired')
  async onAlertFired(event: AlertEvent): Promise<void> {
    const matchingOrders = await this.orderRepo.findByEventTrigger('alert_fired');
    for (const order of matchingOrders) {
      if (this.matchesTargets(order, event.serverId)) {
        await this.executeOrder(order, { triggerEvent: event });
      }
    }
  }

  /**
   * 核心执行逻辑
   */
  async executeOrder(order: StandingOrder, context: ExecutionContext): Promise<void> {
    // 1. 创建无人值守执行记录
    const execution = await this.createExecution(order);

    // 2. 构建 Agent 任务参数(注入决策边界到 System Prompt
    const taskParams = this.buildTaskParams(order, context);

    // 3. 调用 agent-service 执行gRPC 流式调用)
    try {
      for await (const event of this.agentClient.executeTask(taskParams)) {
        // 记录每一步操作
        await this.auditLogger.log({
          actionType: 'standing_order_step',
          orderId: order.id,
          executionId: execution.id,
          detail: event,
        });

        // 4. 检查是否触发升级条件
        if (event.type === 'approval_required') {
          await this.handleEscalation(order, execution, event);
        }

        // 5. 检查自定义升级规则
        if (this.matchesEscalationRule(order, event)) {
          await this.triggerCustomEscalation(order, execution, event);
        }
      }

      execution.status = 'completed';
    } catch (error) {
      execution.status = 'failed';
      execution.error = error.message;
      // 执行失败也要通知
      await this.notifyExecutionFailure(order, execution, error);
    }

    await this.orderRepo.saveExecution(execution);
  }

  /**
   * 构建 Agent 任务参数
   * 关键:将决策边界注入到 System Prompt 中,约束 Agent 行为
   */
  private buildTaskParams(order: StandingOrder, ctx: ExecutionContext): EngineTaskParams {
    const boundaryPrompt = `
## 驻留指令执行模式

你正在执行驻留指令「${order.name}ID: ${order.id})。
这是一次**无人值守的自治执行**,请严格遵守以下边界:

### 允许的操作(可自主执行)
${order.decisionBoundary.allowedActions.map(a => `- ${a}`).join('\n')}

### 禁止的操作(必须停止并请求人工干预)
${order.decisionBoundary.escalateConditions.map(c => `- 遇到「${c}」→ 立即停止,输出 ESCALATION_REQUIRED`).join('\n')}

### 最高风险等级限制
仅允许执行风险等级 ≤ ${order.agentInstructions.maxRiskLevel} 的命令。
遇到更高风险的命令,输出 ESCALATION_REQUIRED 并说明原因。

### 执行目标
${JSON.stringify(order.targets, null, 2)}
`;

    return {
      sessionId: `so-${order.id}-${Date.now()}`,
      prompt: order.agentInstructions.prompt,
      systemPrompt: boundaryPrompt,
      allowedTools: this.resolveAllowedTools(order),
      maxTurns: order.agentInstructions.maxTurns,
      maxBudgetUsd: order.agentInstructions.maxBudgetUsd,
      context: {
        standingOrderId: order.id,
        triggerContext: ctx,
        isHeadless: true,  // 标记为无人值守模式
      },
    };
  }
}

3.8.5 智能升级Smart Escalation

当 Agent 在自治执行中遇到超出决策边界的情况时,按以下三层递进流程升级:

Agent 执行中检测到异常
    │
    ├── 匹配驻留指令中的 escalationRules
    │   ├── condition: "disk_usage > 95%"  → priority: urgent
    │   ├── condition: "cpu > 90% for 10min"  → priority: urgent
    │   └── condition: "other_anomaly"  → priority: normal
    │
    ├── 如果无具体规则匹配 → 使用驻留指令关联的 escalationPolicy
    │
    └── 三层递进升级
        │
        ├── ⏱ 第一层:唤醒管理员
        │   ├── ① App 推送WebSocket + FCM→ 全屏通知卡片
        │   │   └── 管理员打开 App →「语音对话」→ Pipecat WebSocket 语音对话
        │   │       Agent 语音汇报 + 听取指示(支持打断)
        │   │
        │   ├── ② 2分钟无响应 → Pipecat 通过 Twilio 拨出电话
        │   │   管理员接听 → Agent 直接开口说话(无 IVR 菜单):
        │   │   "您好,我是 IT0 Agent。prod-2 磁盘使用率 97%
        │   │    我建议清理 /var/log 下超过7天的日志。您同意吗"
        │   │   管理员语音回复 → Agent 理解并执行
        │   │   (与 App 内语音对话使用同一 Pipecat 引擎)
        │   │
        │   └── 管理员通过任意渠道回复后 → SmartEscalationService.onHumanResponse()
        │
        ├── ⏱ 第二层扩大通知范围5 分钟内仍无响应)
        │   ├── IM 消息Telegram/企业微信):
        │   │   结构化消息 + 操作按钮(批准/拒绝/查看详情)
        │   ├── 短信(简要描述 + 回复 Y/N 审批)
        │   └── 通知备用联系人(重复第一层流程)
        │
        └── ⏱ 第三层全渠道通知10+ 分钟无响应)
            ├── 邮件:完整报告(日志摘录、指标图表、操作链接)
            └── 通知管理组所有成员
// ops-service/domain/services/smart-escalation.service.ts

export class SmartEscalationService {
  constructor(
    private readonly commClient: CommClientPort,
    private readonly voiceClient: VoiceServiceClientPort,  // ★ voice-service 客户端
    private readonly agentClient: AgentClientPort,
    private readonly orderRepo: StandingOrderRepository,
    private readonly execRepo: StandingOrderExecutionRepository,
  ) {}

  /**
   * 根据驻留指令的升级规则 + 当前事件,决定通知渠道和内容
   */
  async escalate(
    order: StandingOrder,
    execution: StandingOrderExecution,
    event: EngineStreamEvent,
  ): Promise<void> {
    const rule = this.matchRule(order.decisionBoundary.escalationRules, event);
    const notification = this.buildNotification(order, execution, event, rule);

    // 1. 第一层App 推送 + 语音对话预备
    await this.commClient.send({
      tenantId: order.tenantId,
      channel: 'push',                          // 推送到 App
      priority: rule?.priority ?? 'normal',
      content: notification,
      callbackContext: {
        executionId: execution.id,
        orderId: order.id,
        awaitingAction: 'approve_or_reject',
      },
    });

    // 2. 预创建语音对话会话(用户打开 App 时可立即接入)
    const voiceSession = await this.voiceClient.prepareSession({
      executionId: execution.id,
      agentContext: {
        orderId: order.id,
        orderName: order.name,
        eventSummary: event.summary,
        sessionId: execution.sessionId,
      },
    });

    // 3. 启动超时升级调度
    await this.scheduleEscalationChain(execution, rule, voiceSession);

    // 4. 暂停执行,等待人类响应
    execution.status = 'awaiting_human';
    execution.escalatedAt = new Date();
    execution.escalationChannel = 'push';  // 初始渠道
  }

  /**
   * 超时升级链App推送 → 电话唤醒 → IM/短信 → 备用联系人 → 邮件/管理组
   */
  private async scheduleEscalationChain(
    execution: StandingOrderExecution,
    rule: EscalationRule | null,
    voiceSession: VoiceSessionInfo,
  ) {
    const chain = [
      { delay: 2 * 60_000, action: 'pipecat_phone_call', params: { voiceSessionId: voiceSession.id } },
      { delay: 5 * 60_000, action: 'im_and_sms' },
      { delay: 10 * 60_000, action: 'backup_contacts' },
      { delay: 20 * 60_000, action: 'email_management_group' },
    ];

    // 使用 Bull Queue 延迟任务实现,每一步检查是否已有响应
    for (const step of chain) {
      await this.escalationQueue.add('escalation-step', {
        executionId: execution.id,
        ...step,
      }, { delay: step.delay });
    }
  }

  /**
   * 人类响应回调(通过任何渠道回复后触发)
   * - App 内语音对话 → voice-service 回调
   * - Pipecat 电话语音对话 → voice-service 回调(同一引擎)
   * - IM 按钮/文字 → comm-service 回调
   * - 短信回复 → comm-service 回调
   */
  async onHumanResponse(
    executionId: string,
    response: HumanResponse,
  ): Promise<void> {
    const execution = await this.execRepo.findById(executionId);

    // 取消后续升级链(人已响应)
    await this.escalationQueue.removeByExecutionId(executionId);

    if (response.action === 'approve') {
      await this.agentClient.continueSession(execution.sessionId,
        `人工审批已通过。操作者: ${response.respondedBy},渠道: ${response.channel}。继续执行。`
      );
      execution.status = 'running';
    } else if (response.action === 'reject') {
      await this.agentClient.cancelTask(execution.sessionId);
      execution.status = 'aborted_by_human';
    } else if (response.action === 'instruct') {
      // 人类通过语音或文字给出详细指示
      await this.agentClient.continueSession(execution.sessionId,
        `管理员指示(${response.channel}${response.instruction}`
      );
      execution.status = 'running';
    }

    execution.humanResponse = response;
    execution.respondedBy = response.respondedBy;
    execution.respondedAt = new Date();
  }
}

Pipecat 主动拨号voice-service 通过 Twilio REST API 发起电话):

# voice-service — 主动拨出电话
# app/services/outbound_call_service.py

from twilio.rest import Client as TwilioClient

class OutboundCallService:
    """
    当 App 推送 2 分钟无响应时SmartEscalationService 调用此服务
    → Pipecat 通过 Twilio 拨出电话
    → 管理员接听后 Pipecat 直接开始语音对话(无 IVR 菜单)
    """

    async def initiate_call(
        self,
        phone_number: str,
        voice_session_id: str,    # 预创建的语音会话 ID
        agent_context: dict,       # 驻留指令上下文
    ):
        # 1. Twilio 拨出电话,接通后连接到 Pipecat Media Streams
        call = self.twilio_client.calls.create(
            to=phone_number,
            from_=self.twilio_phone_number,
            twiml=f'''
                <Response>
                    <Connect>
                        <Stream url="wss://{self.host}/twilio-stream/{voice_session_id}" />
                    </Connect>
                </Response>
            ''',
        )

        # 2. Pipecat Pipeline 在 Media Streams 连接时自动启动
        #    Agent 接通即开口说话:"您好,我是 IT0 Agent..."
        #    无需任何 IVR 菜单或按键操作
        return call.sid
// comm-service — 触发 Pipecat 拨号
// SmartEscalationService 在 2 分钟超时后调用

async function triggerPipecatCall(
  voiceSessionId: string,
  contact: Contact,
) {
  // 调用 voice-service API让 Pipecat 拨出电话
  await this.voiceClient.initiateCall({
    phoneNumber: contact.phone,
    voiceSessionId,
    // Pipecat 接通后 Agent 直接开口说话
    // 使用与 App 内完全相同的 Pipeline
  });
}

3.8.6 对话修改驻留指令

管理员可以随时通过对话修改已有的驻留指令:

管理员: "把每日巡检的时间改成早上7点另外磁盘清理阈值从85%降到80%。"

Agent:  "已更新驻留指令 SO-20260208-001
        ├─ ⏰ 触发时间08:00 → 07:00cron: 0 7 * * *
        └─ 📌 磁盘清理阈值85% → 80%

        其他配置保持不变。变更已生效。"

管理员: "暂停这个指令,我这周在做服务器迁移。"

Agent:  "已暂停驻留指令 SO-20260208-001「每日生产巡检」。
        ⏸ 状态active → paused
        需要恢复时告诉我即可。"

实现要点

  • Agent System Prompt 中包含当前租户所有活跃的驻留指令列表
  • Agent 检测到修改意图时,调用 update_standing_order tool
  • 所有修改记录关联对话 session ID完整溯源
  • 修改操作写入审计日志

3.8.7 执行结果汇报

┌─────────────────────────────────────────────────────────────────┐
│                   驻留指令执行汇报方式                            │
│                                                                 │
│  ┌───────────────────┐                                         │
│  │ 正常完成           │ → 静默记录审计日志                       │
│  │(一切正常)        │   可在 Web Admin / Flutter 查看          │
│  └───────────────────┘                                         │
│                                                                 │
│  ┌───────────────────┐                                         │
│  │ 有发现但已自治处理 │ → 推送通知 + 执行摘要                    │
│  │(如自动清理了日志) │   "已清理 prod-2 的 3.2GB 旧日志"       │
│  └───────────────────┘                                         │
│                                                                 │
│  ┌───────────────────┐                                         │
│  │ 需人工决策         │ → 按 escalationRules 升级               │
│  │(超出决策边界)    │   电话/IM/短信/邮件                      │
│  └───────────────────┘                                         │
│                                                                 │
│  ┌───────────────────┐                                         │
│  │ 执行失败           │ → 立即推送 + 短信                       │
│  │Agent 异常/超时) │   含错误详情和建议                       │
│  └───────────────────┘                                         │
│                                                                 │
│  ── 日报/周报 ──                                                │
│  每日 20:00 汇总当天所有驻留指令执行情况                         │
│  推送到 Flutter App / 邮件                                      │
└─────────────────────────────────────────────────────────────────┘

3.8.8 相关 Toolsagent-service 注册的工具)

// agent-service 的 tool 定义(供 Agent 调用)

const standingOrderTools = [
  {
    name: 'create_standing_order',
    description: '创建新的驻留指令(当用户要求定义持续性/定时任务时使用)',
    input_schema: { /* StandingOrderDraft JSON Schema */ },
  },
  {
    name: 'update_standing_order',
    description: '修改已有的驻留指令(当用户要求调整任务参数时使用)',
    input_schema: {
      orderId: { type: 'string' },
      changes: { /* 部分更新的字段 */ },
    },
  },
  {
    name: 'pause_standing_order',
    description: '暂停驻留指令(用户要求暂停某个定时任务时使用)',
    input_schema: { orderId: { type: 'string' } },
  },
  {
    name: 'resume_standing_order',
    description: '恢复已暂停的驻留指令',
    input_schema: { orderId: { type: 'string' } },
  },
  {
    name: 'list_standing_orders',
    description: '列出当前所有驻留指令及其状态',
    input_schema: { statusFilter: { type: 'string', enum: ['active', 'paused', 'all'] } },
  },
  {
    name: 'report_escalation',
    description: '无人值守执行中遇到超出决策边界的情况,请求人工干预',
    input_schema: {
      severity: { type: 'string', enum: ['fatal', 'critical', 'warning', 'info'] },
      situation: { type: 'string', description: '当前状况描述' },
      suggestedAction: { type: 'string', description: 'Agent 建议的操作' },
      requiresImmediate: { type: 'boolean' },
    },
  },
];

4. 数据库设计PostgreSQL

多租户说明:根据 §7 的 Schema-per-Tenant 策略,以下业务表存在于每个租户的独立 Schemait0_t_{tenantId})中。 所有表额外包含 tenant_id 列作为防御性纵深——即使 Schema 隔离失败,行级 tenant_id 仍可防止数据泄露。 公共管理表(tenants, plans 等)位于 it0_shared Schema参见 §7.5。

4.1 agent-service — it0_agent schema

-- 智能体会话
CREATE TABLE agent_sessions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,          -- ★ 防御性纵深:冗余租户标识
    engine_type     VARCHAR(20) NOT NULL,         -- 'claude_code_cli' | 'claude_api' | 'custom'
    status          VARCHAR(20) NOT NULL DEFAULT 'active',
    system_prompt   TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    metadata        JSONB DEFAULT '{}'
);

-- 命令执行记录
CREATE TABLE command_records (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    session_id      UUID NOT NULL REFERENCES agent_sessions(id),
    command         TEXT NOT NULL,
    risk_level      SMALLINT NOT NULL DEFAULT 0,  -- 0-3
    status          VARCHAR(20) NOT NULL,          -- 'executed' | 'blocked' | 'approved' | 'rejected'
    output          TEXT,
    error           TEXT,
    approved_by     UUID,                          -- 审批人
    executed_at     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    duration_ms     INTEGER
);

CREATE INDEX idx_command_records_session ON command_records(session_id);
CREATE INDEX idx_command_records_risk ON command_records(risk_level) WHERE risk_level >= 2;

4.2 ops-service — it0_ops schema

-- 运维任务
CREATE TABLE ops_tasks (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    type            VARCHAR(30) NOT NULL,          -- 'inspection' | 'deployment' | 'rollback' | 'scaling' | 'recovery'
    title           VARCHAR(200) NOT NULL,
    description     TEXT,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    priority        SMALLINT NOT NULL DEFAULT 1,
    target_servers  UUID[] NOT NULL,               -- 目标服务器 ID 列表
    runbook_id      UUID,
    agent_session_id UUID,
    created_by      UUID NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ,
    result_summary  TEXT
);

-- 审批请求
CREATE TABLE approval_requests (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    task_id         UUID NOT NULL REFERENCES ops_tasks(id),
    command         TEXT NOT NULL,
    risk_level      SMALLINT NOT NULL,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
    decided_by      UUID,
    decided_at      TIMESTAMPTZ,
    reason          TEXT,
    expires_at      TIMESTAMPTZ NOT NULL,
    notification_channels VARCHAR(20)[]            -- 已通知的渠道
);

-- 运维手册Runbook
CREATE TABLE runbooks (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(200) NOT NULL,
    description     TEXT,
    trigger_type    VARCHAR(30),                   -- 'manual' | 'alert' | 'schedule'
    prompt_template TEXT NOT NULL,                  -- Agent 执行的 prompt 模板
    allowed_tools   VARCHAR(50)[] NOT NULL,
    max_risk_level  SMALLINT NOT NULL DEFAULT 1,   -- 此 Runbook 允许的最高风险等级
    auto_approve    BOOLEAN NOT NULL DEFAULT FALSE,
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- ★ 驻留指令Standing Orders— 自治运维任务定义
CREATE TABLE standing_orders (
    id              VARCHAR(30) PRIMARY KEY,          -- SO-{date}-{seq}
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(200) NOT NULL,
    description     TEXT,
    -- 对话溯源
    defined_in_session_id UUID NOT NULL,               -- 定义此指令的对话ID
    last_modified_in_session_id UUID,                  -- 最后修改的对话ID
    -- 触发方式
    trigger_type    VARCHAR(20) NOT NULL,               -- 'cron' | 'event' | 'threshold' | 'manual'
    trigger_config  JSONB NOT NULL,                     -- { cronExpression, eventType, thresholdCondition }
    -- 目标范围
    targets         JSONB NOT NULL,                     -- { serverIds, clusterIds, allServers, environmentFilter, tagFilter }
    -- Agent 执行参数
    agent_prompt    TEXT NOT NULL,                      -- 从对话提取的结构化执行指令
    agent_skills    VARCHAR(100)[],                     -- 使用的 Skills 名称列表
    runbook_id      UUID REFERENCES runbooks(id),      -- 关联的 Runbook可选
    max_risk_level  SMALLINT NOT NULL DEFAULT 0,       -- 自治执行最高风险等级0=只读1=低风险写入)
    max_turns       INTEGER NOT NULL DEFAULT 20,
    max_budget_usd  NUMERIC(6,2),
    -- 决策边界
    decision_boundary JSONB NOT NULL,                  -- { allowedActions, escalateConditions, escalationRules }
    -- 升级策略
    escalation_policy_id UUID,
    -- 生命周期
    status          VARCHAR(20) NOT NULL DEFAULT 'active', -- 'active' | 'paused' | 'archived'
    valid_from      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    valid_until     TIMESTAMPTZ,
    -- 元信息
    created_by      UUID NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_standing_orders_status ON standing_orders(status) WHERE status = 'active';
CREATE INDEX idx_standing_orders_trigger ON standing_orders(trigger_type);

-- 驻留指令执行记录
CREATE TABLE standing_order_executions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    order_id        VARCHAR(30) NOT NULL REFERENCES standing_orders(id),
    -- 触发信息
    trigger_type    VARCHAR(20) NOT NULL,
    trigger_context JSONB,                              -- 触发上下文告警事件、cron 时间等)
    -- 执行状态
    session_id      VARCHAR(100),                       -- Agent 会话ID
    status          VARCHAR(20) NOT NULL DEFAULT 'running',
                    -- 'running' | 'completed' | 'failed' | 'awaiting_human' | 'aborted_by_human'
    -- 结果
    result_summary  TEXT,                               -- Agent 执行摘要
    actions_taken   JSONB DEFAULT '[]',                 -- 执行的操作列表
    error           TEXT,
    -- 升级信息
    escalated_at    TIMESTAMPTZ,
    escalation_channel VARCHAR(20),                     -- 升级使用的渠道
    human_response  JSONB,                              -- 人类响应内容
    responded_by    UUID,
    responded_at    TIMESTAMPTZ,
    -- 时间
    started_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at    TIMESTAMPTZ,
    duration_ms     INTEGER
);

CREATE INDEX idx_so_executions_order ON standing_order_executions(order_id, started_at DESC);
CREATE INDEX idx_so_executions_status ON standing_order_executions(status) WHERE status IN ('running', 'awaiting_human');

4.3 inventory-service — it0_inventory schema

-- 服务器
CREATE TABLE servers (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(100) NOT NULL UNIQUE,
    host            VARCHAR(255) NOT NULL,         -- IP 或主机名(内网 IP 也可以)
    port            INTEGER NOT NULL DEFAULT 22,
    environment     VARCHAR(20) NOT NULL,           -- 'dev' | 'staging' | 'prod'
    role            VARCHAR(30) NOT NULL,           -- 'web' | 'db' | 'cache' | 'worker' | 'gateway'
    cluster_id      UUID REFERENCES clusters(id),
    ssh_user        VARCHAR(50) NOT NULL,           -- SSH 登录用户名
    credential_id   UUID NOT NULL REFERENCES credentials(id), -- SSH 密钥或密码
    -- ★ SSH ProxyJump 跳板机配置
    network_type    VARCHAR(20) NOT NULL DEFAULT 'public', -- 'public' | 'private'(内网需跳板机)
    jump_server_id  UUID REFERENCES servers(id),    -- 跳板机自引用NULL=直连)
    -- 额外 SSH 选项
    ssh_options     JSONB DEFAULT '{}',             -- 自定义 SSH 参数,如 StrictHostKeyChecking 等
    tags            JSONB DEFAULT '{}',
    status          VARCHAR(20) NOT NULL DEFAULT 'active',
    description     TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_servers_environment ON servers(environment);
CREATE INDEX idx_servers_cluster ON servers(cluster_id);

-- 集群
CREATE TABLE clusters (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(100) NOT NULL UNIQUE,
    type            VARCHAR(20) NOT NULL,           -- 'bare_metal' | 'k8s' | 'docker_swarm'
    environment     VARCHAR(20) NOT NULL,
    description     TEXT,
    kubeconfig      TEXT,                           -- 加密存储
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 凭证(加密存储)
CREATE TABLE credentials (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(100) NOT NULL,
    type            VARCHAR(20) NOT NULL,           -- 'ssh_key' | 'ssh_password' | 'token' | 'kubeconfig'
    encrypted_value BYTEA NOT NULL,                 -- AES-256-GCM 加密
    iv              BYTEA NOT NULL,
    -- 元信息(非敏感)
    fingerprint     VARCHAR(100),                   -- SSH key 指纹(用于展示,不含私钥)
    key_type        VARCHAR(20),                    -- 'rsa' | 'ed25519' | 'ecdsa'(仅 ssh_key 类型)
    created_by      UUID,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at      TIMESTAMPTZ,
    last_used_at    TIMESTAMPTZ
);

/**
 * SSH 连接构建逻辑agent-service 中实现):
 *
 * 1. 查询目标服务器server = getServer(serverId)
 * 2. 查询凭证credential = getCredential(server.credential_id)
 * 3. 判断连接方式:
 *
 *    A) 直连network_type = 'public' 或 jump_server_id = NULL
 *       ssh -i /tmp/key_{uuid} -p {port} {ssh_user}@{host}
 *
 *    B) 跳板机network_type = 'private' 且 jump_server_id != NULL
 *       jumpServer = getServer(server.jump_server_id)
 *       jumpCred = getCredential(jumpServer.credential_id)
 *       ssh -J {jump_user}@{jump_host}:{jump_port} -i /tmp/key_{uuid} -p {port} {ssh_user}@{host}
 *
 *    C) 多级跳板jump_server 自身也有 jump_server_id
 *       递归构建 ProxyJump 链ssh -J jump1,jump2 target
 *
 * 4. 凭证通过临时文件注入TTL=5min用后删除
 * 5. 额外 SSH 选项从 server.ssh_options 读取
 */

4.4 monitor-service — it0_monitor schema

-- 告警规则
CREATE TABLE alert_rules (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(200) NOT NULL,
    metric_type     VARCHAR(30) NOT NULL,          -- 'cpu' | 'memory' | 'disk' | 'network' | 'custom'
    condition       JSONB NOT NULL,                -- { "operator": ">", "threshold": 90, "duration_seconds": 300 }
    severity        VARCHAR(20) NOT NULL,          -- 'info' | 'warning' | 'critical' | 'fatal'
    target_servers  UUID[],                        -- NULL = 所有服务器
    runbook_id      UUID,                          -- 自动修复关联的 Runbook
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 告警事件
CREATE TABLE alert_events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    rule_id         UUID NOT NULL REFERENCES alert_rules(id),
    server_id       UUID NOT NULL,
    severity        VARCHAR(20) NOT NULL,
    message         TEXT NOT NULL,
    metric_value    DOUBLE PRECISION,
    status          VARCHAR(20) NOT NULL DEFAULT 'firing', -- 'firing' | 'resolved' | 'acknowledged'
    fired_at        TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    resolved_at     TIMESTAMPTZ,
    acknowledged_by UUID
);

-- 指标快照(时序数据,定期归档)
CREATE TABLE metric_snapshots (
    id              BIGSERIAL PRIMARY KEY,
    tenant_id       VARCHAR(20) NOT NULL,
    server_id       UUID NOT NULL,
    metric_type     VARCHAR(30) NOT NULL,
    value           DOUBLE PRECISION NOT NULL,
    recorded_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 分区表优化(按月)
-- CREATE TABLE metric_snapshots_2026_02 PARTITION OF metric_snapshots
--   FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');

4.5 comm-service — it0_comm schema

-- 联系人
CREATE TABLE contacts (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    user_id         UUID NOT NULL,
    name            VARCHAR(100) NOT NULL,
    phone           VARCHAR(20),                   -- 短信/电话
    email           VARCHAR(200),
    telegram_id     VARCHAR(100),
    wechat_id       VARCHAR(100),
    preferred_channel VARCHAR(20) NOT NULL DEFAULT 'websocket',
    is_active       BOOLEAN NOT NULL DEFAULT TRUE
);

-- 消息记录
CREATE TABLE messages (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    contact_id      UUID NOT NULL REFERENCES contacts(id),
    channel_type    VARCHAR(20) NOT NULL,
    direction       VARCHAR(10) NOT NULL,          -- 'outbound' | 'inbound'
    content         TEXT NOT NULL,
    priority        VARCHAR(10) NOT NULL DEFAULT 'normal',
    delivery_status VARCHAR(20) NOT NULL DEFAULT 'pending',
    related_task_id UUID,                          -- 关联的运维任务
    sent_at         TIMESTAMPTZ,
    delivered_at    TIMESTAMPTZ,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 升级策略
CREATE TABLE escalation_policies (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id       VARCHAR(20) NOT NULL,
    name            VARCHAR(100) NOT NULL,
    severity        VARCHAR(20) NOT NULL,          -- 匹配告警级别
    steps           JSONB NOT NULL,                -- [{"delay_seconds": 0, "channel": "websocket"}, {"delay_seconds": 120, "channel": "sms"}, ...]
    is_default      BOOLEAN NOT NULL DEFAULT FALSE
);

4.6 audit-service — it0_audit schema

-- 审计日志(只追加,不可修改/删除)
CREATE TABLE audit_logs (
    id              BIGSERIAL PRIMARY KEY,
    tenant_id       VARCHAR(20) NOT NULL,
    action_type     VARCHAR(50) NOT NULL,          -- 'command_executed' | 'approval_decided' | 'engine_switched' | etc.
    actor_type      VARCHAR(20) NOT NULL,          -- 'agent' | 'user' | 'system'
    actor_id        VARCHAR(100),
    resource_type   VARCHAR(50),                   -- 'server' | 'task' | 'session'
    resource_id     VARCHAR(100),
    detail          JSONB NOT NULL,
    ip_address      INET,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 性能索引
CREATE INDEX idx_audit_logs_action ON audit_logs(action_type, created_at);
CREATE INDEX idx_audit_logs_actor ON audit_logs(actor_id, created_at);
CREATE INDEX idx_audit_logs_resource ON audit_logs(resource_type, resource_id);

-- 防篡改REVOKE DELETE, UPDATE ON audit_logs FROM app_user;

5. 服务间通信

5.1 通信矩阵

                  ┌─────────────────────────────────────────────┐
                  │              事件总线 (Redis Streams)         │
                  └──┬────┬────┬────┬────┬────┬────┬────────────┘
                     │    │    │    │    │    │    │
                ┌────┴─┐ ┌┴────┐ ┌─┴──┐ ┌┴───┐ ┌─┴──┐ ┌──┴──┐
                │auth  │ │agent│ │ops │ │inv │ │mon │ │comm │
                └──────┘ └──┬──┘ └─┬──┘ └────┘ └──┬─┘ └─────┘
                            │      │               │
                  gRPC 同步调用:                      │
                  ops → agent (执行任务)              │
                  agent → inv (获取凭证)              │
                  mon → ops (触发自动修复)  ───────────┘
                  ops → comm (通知人类)

5.2 关键事件流

// 事件定义(所有服务共享的 proto/event 包)

// agent-service 发布
interface CommandExecutedEvent {
  sessionId: string;
  command: string;
  riskLevel: number;
  output: string;
  timestamp: Date;
}

interface ApprovalRequiredEvent {
  taskId: string;
  command: string;
  riskLevel: number;
  targetServer: string;
  expiresAt: Date;
}

// monitor-service 发布
interface AlertFiredEvent {
  ruleId: string;
  serverId: string;
  severity: 'info' | 'warning' | 'critical' | 'fatal';
  message: string;
  metricValue: number;
}

// ops-service 发布
interface TaskCompletedEvent {
  taskId: string;
  result: 'success' | 'failure' | 'partial';
  summary: string;
}

6. 安全架构

6.1 三层防御模型

Layer 1: 命令分级agent-service 领域层)
    ├── Level 0: 只读命令 → 自动执行
    ├── Level 1: 低风险写入 → 前端确认
    ├── Level 2: 高风险操作 → 多渠道审批
    └── Level 3: 禁止命令 → 直接拦截

Layer 2: Hook 拦截Claude Code PreToolUse
    └── pre-tool-use-hook.py
        ├── 正则匹配危险模式
        ├── 写入审批队列
        └── 返回 block/allow

Layer 3: 架构隔离(目标服务器侧)
    ├── 受限 SSH 用户 (rbash)
    ├── K8s RBAC (ops-agent ClusterRole)
    ├── 数据库只读账号
    └── 网络隔离 (Security Group)

6.2 凭证安全流程

agent-service 需要 SSH 到 prod-1
    ↓ gRPC 请求
inventory-service 接收请求
    ↓ 验证调用者权限
    ↓ 从 PostgreSQL 读取加密凭证
    ↓ 内存中解密
    ↓ 生成临时凭证TTL=5min
    ↓ 返回
agent-service 使用临时凭证执行 SSH
    ↓ 完成后
临时凭证自动过期

7. 多租户 SaaS 架构

7.1 隔离策略

采用 Schema-per-Tenant(同一 PostgreSQL 实例,每个租户独立 Schema+ 行级隔离混合模式:

┌─────────────────────────────────────────────────────────────┐
│                    PostgreSQL 实例                            │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ it0_shared  │  │ it0_t_001   │  │ it0_t_002   │  ...    │
│  │ (公共表)     │  │ (租户A数据) │  │ (租户B数据) │         │
│  │             │  │             │  │             │         │
│  │ tenants     │  │ servers     │  │ servers     │         │
│  │ plans       │  │ credentials │  │ credentials │         │
│  │ billing     │  │ tasks       │  │ tasks       │         │
│  │ global_cfg  │  │ sessions    │  │ sessions    │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└─────────────────────────────────────────────────────────────┘
表类别 隔离方式 说明
tenants, plans, billing 公共 Schema (it0_shared) 跨租户的 SaaS 管理数据
servers, credentials, tasks, sessions, skills, runbooks, alerts, contacts, audit_logs 租户独立 Schema (it0_t_{tenantId}) 完全隔离,互不可见

7.2 租户上下文传播AsyncLocalStorage

所有微服务共享同一个 TenantContext 模式,基于 Node.js AsyncLocalStorage

// packages/shared/tenant/tenant-context.service.ts

import { AsyncLocalStorage } from 'async_hooks';

export interface TenantInfo {
  tenantId: string;
  tenantName: string;
  plan: 'free' | 'pro' | 'enterprise';
  schemaName: string;  // 'it0_t_{tenantId}'
}

export class TenantContextService {
  private static storage = new AsyncLocalStorage<TenantInfo>();

  /** 设置当前请求的租户上下文 */
  static run<T>(tenant: TenantInfo, fn: () => T): T {
    return this.storage.run(tenant, fn);
  }

  /** 获取当前租户 */
  static getTenant(): TenantInfo {
    const tenant = this.storage.getStore();
    if (!tenant) throw new Error('Tenant context not initialized');
    return tenant;
  }

  static getTenantId(): string {
    return this.getTenant().tenantId;
  }

  static getSchemaName(): string {
    return this.getTenant().schemaName;
  }
}

7.3 请求级租户注入

// packages/shared/tenant/tenant.middleware.ts

/**
 * NestJS 中间件:从 JWT / API Key / 请求头提取 tenantId
 * 注入到 AsyncLocalStorage后续所有数据库操作自动切换 Schema
 */
@Injectable()
export class TenantMiddleware implements NestMiddleware {
  constructor(private readonly tenantRepo: TenantRepository) {}

  async use(req: Request, res: Response, next: NextFunction) {
    // 优先级JWT claim > X-Tenant-Id header > API Key 关联
    const tenantId = this.extractTenantId(req);
    if (!tenantId) throw new UnauthorizedException('Tenant not identified');

    const tenant = await this.tenantRepo.findById(tenantId);
    if (!tenant) throw new NotFoundException('Tenant not found');

    TenantContextService.run({
      tenantId: tenant.id,
      tenantName: tenant.name,
      plan: tenant.plan,
      schemaName: `it0_t_${tenant.id}`,
    }, () => next());
  }
}

7.4 TypeORM 动态 Schema 切换

// packages/shared/tenant/tenant-aware.repository.ts

/**
 * 所有租户隔离的 Repository 基类
 * 自动在查询前切换 PostgreSQL search_path 到租户 Schema
 */
export abstract class TenantAwareRepository<T> {
  constructor(
    protected readonly dataSource: DataSource,
    protected readonly entity: EntityTarget<T>,
  ) {}

  protected async getRepository(): Promise<Repository<T>> {
    const schema = TenantContextService.getSchemaName();
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.query(`SET search_path TO "${schema}", public`);
    return queryRunner.manager.getRepository(this.entity);
  }

  // CRUD 方法都通过 getRepository() 获取租户隔离的仓储
  async findById(id: string): Promise<T | null> {
    const repo = await this.getRepository();
    return repo.findOneBy({ id } as any);
  }

  async save(entity: T): Promise<T> {
    const repo = await this.getRepository();
    return repo.save(entity);
  }
}

7.5 租户生命周期管理

-- it0_shared schema

CREATE TABLE tenants (
    id              VARCHAR(20) PRIMARY KEY,          -- 短ID如 't001'
    name            VARCHAR(200) NOT NULL,
    slug            VARCHAR(50) NOT NULL UNIQUE,      -- URL 友好标识
    plan            VARCHAR(20) NOT NULL DEFAULT 'free', -- 'free' | 'pro' | 'enterprise'
    owner_user_id   UUID NOT NULL,
    status          VARCHAR(20) NOT NULL DEFAULT 'active', -- 'active' | 'suspended' | 'deleted'
    -- 资源配额
    max_servers     INTEGER NOT NULL DEFAULT 5,
    max_skills      INTEGER NOT NULL DEFAULT 10,
    max_users       INTEGER NOT NULL DEFAULT 3,
    max_sessions_per_day INTEGER NOT NULL DEFAULT 50,
    -- 引擎配置
    allowed_engines VARCHAR(20)[] NOT NULL DEFAULT '{claude_code_cli}',
    -- 审计
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 租户创建时自动初始化 Schema
-- 通过 NestJS migration 或 tenant-provisioning.service.ts 执行:
-- CREATE SCHEMA it0_t_{tenantId};
-- 运行租户 Schema 的迁移脚本(创建 servers, credentials, tasks 等表)

7.6 Kong 多租户路由

在 Kong 层面,通过 JWT claim 中的 tenant_id 传递租户标识:

# kong.yml 追加:租户限流(按 tenant 维度)
plugins:
  - name: rate-limiting
    config:
      minute: 120
      policy: redis
      redis_host: redis
      # 按 JWT 中的 tenant_id 限流
      limit_by: credential

7.7 Agent 引擎的多租户隔离

┌─────────────────────────────────────────────────┐
│  agent-service                                   │
│                                                  │
│  租户 A 的任务 ──→ spawn claude -p "..."         │
│                    --system-prompt-file /tmp/a/   │
│                    使用租户 A 的 .claude/skills/   │
│                    SSH 到租户 A 的服务器           │
│                                                  │
│  租户 B 的任务 ──→ spawn claude -p "..."         │
│                    --system-prompt-file /tmp/b/   │
│                    使用租户 B 的 .claude/skills/   │
│                    SSH 到租户 B 的服务器           │
│                                                  │
│  关键隔离点:                                     │
│  1. System Prompt 包含租户专属的服务器清单         │
│  2. Skills 目录按租户隔离                         │
│  3. SSH 凭证从租户 Schema 读取                    │
│  4. Hook 脚本验证命令目标在租户范围内              │
│  5. 并发控制:按租户 plan 限制同时执行的任务数     │
└─────────────────────────────────────────────────┘

8. API Gateway — Kong 声明式配置

8.1 架构选择

使用 Kong DB-less 模式(声明式 YAML 配置),不依赖额外数据库,配置文件即基础设施:

# packages/services/api-gateway/kong.yml

_format_version: "3.0"

services:
  # ===== auth-service =====
  - name: auth-service
    url: http://auth-service:3001
    routes:
      - name: auth-routes
        paths:
          - /api/v1/auth
        strip_path: false

  # ===== agent-service =====
  - name: agent-service
    url: http://agent-service:3002
    routes:
      - name: agent-routes
        paths:
          - /api/v1/agent
        strip_path: false
      - name: agent-ws
        paths:
          - /ws/agent
        strip_path: false
        protocols:
          - http
          - https
          - ws
          - wss

  # ===== ops-service =====
  - name: ops-service
    url: http://ops-service:3003
    routes:
      - name: ops-routes
        paths:
          - /api/v1/ops
        strip_path: false

  # ===== inventory-service =====
  - name: inventory-service
    url: http://inventory-service:3004
    routes:
      - name: inventory-routes
        paths:
          - /api/v1/inventory
        strip_path: false

  # ===== monitor-service =====
  - name: monitor-service
    url: http://monitor-service:3005
    routes:
      - name: monitor-routes
        paths:
          - /api/v1/monitor
        strip_path: false

  # ===== comm-service =====
  - name: comm-service
    url: http://comm-service:3006
    routes:
      - name: comm-routes
        paths:
          - /api/v1/comm
        strip_path: false
      - name: comm-ws
        paths:
          - /ws/comm
        strip_path: false
        protocols:
          - http
          - https
          - ws
          - wss

  # ===== voice-service =====
  - name: voice-service
    url: http://voice-service:3008
    routes:
      - name: voice-ws
        paths:
          - /ws/voice
        strip_path: false
        protocols:
          - ws
          - wss
      - name: voice-api
        paths:
          - /api/v1/voice
        strip_path: false
      - name: twilio-webhook
        paths:
          - /api/v1/twilio
        strip_path: false

  # ===== audit-service =====
  - name: audit-service
    url: http://audit-service:3007
    routes:
      - name: audit-routes
        paths:
          - /api/v1/audit
        strip_path: false

# ===== 全局插件 =====
plugins:
  # JWT 认证(全局)
  - name: jwt
    config:
      key_claim_name: kid
      claims_to_verify:
        - exp
    # 排除登录和健康检查路由
    route: null
    service: null

  # CORS
  - name: cors
    config:
      origins:
        - http://localhost:3000    # Web Admin (Next.js dev)
        - https://admin.it0.your-domain.com
      methods:
        - GET
        - POST
        - PUT
        - DELETE
        - PATCH
        - OPTIONS
      headers:
        - Authorization
        - Content-Type
      credentials: true

  # 限流(全局默认)
  - name: rate-limiting
    config:
      minute: 120
      policy: redis
      redis_host: redis
      redis_port: 6379

  # 请求日志
  - name: file-log
    config:
      path: /dev/stdout
      reopen: true

# ===== 路由级插件覆盖 =====

  # auth 路由不需要 JWT登录端点
  - name: jwt
    route: auth-routes
    enabled: false

  # agent WebSocket 更高限流
  - name: rate-limiting
    route: agent-ws
    config:
      minute: 30
      policy: redis
      redis_host: redis

  # audit 路由只允许 admin 角色
  - name: acl
    route: audit-routes
    config:
      allow:
        - admin

8.2 Kong Docker 配置

# packages/services/api-gateway/Dockerfile

FROM kong:3.7-alpine

# 复制声明式配置
COPY kong.yml /etc/kong/kong.yml

# 环境变量设为 DB-less 模式
ENV KONG_DATABASE=off
ENV KONG_DECLARATIVE_CONFIG=/etc/kong/kong.yml
ENV KONG_PROXY_LISTEN=0.0.0.0:8000
ENV KONG_ADMIN_LISTEN=0.0.0.0:8001
ENV KONG_LOG_LEVEL=info
# deploy/docker-compose.yml 中的 Kong 服务
services:
  api-gateway:
    build: ../packages/services/api-gateway
    ports:
      - "8000:8000"   # Proxy客户端访问
      - "8001:8001"   # Admin API仅内部使用
    depends_on:
      - auth-service
      - agent-service
      - ops-service
      - inventory-service
      - monitor-service
      - comm-service
      - audit-service
      - redis
    healthcheck:
      test: ["CMD", "kong", "health"]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - it0-network

8.3 WebSocket 代理注意事项

Kong 原生支持 WebSocket 代理,关键配置:

  • protocols 包含 wswss
  • 建议为 WebSocket 路由单独设置更宽松的超时:连接超时 60s、读写超时 3600s
  • Agent 流式输出和实时通信都走 WebSocket需要确保 Kong 不会过早断开长连接

9. Claude Skills 支持Anthropic Claude Skills

9.1 Skills 与 Runbook 的关系

Claude Skills                          Runbook
(底层能力单元)                          (高层业务流程)
┌─────────────┐                    ┌──────────────────┐
│ /inspect    │─── 被组合使用 ──→  │ 每日巡检 Runbook  │
│ /cleanup    │─── 被组合使用 ──→  │ (调用多个 Skills) │
│ /diagnose   │                    │                  │
└─────────────┘                    └──────────────────┘

Skills = 原子操作单个技能Claude Code 原生识别)
Runbook = 编排流程(多步骤,包含审批、通知、条件分支)

9.2 Skill 定义规范

每个 Skill 是一个目录,包含 SKILL.mdYAML frontmatter + Markdown 指令):

# .claude/skills/inspect/SKILL.md

---
name: inspect
description: 对指定服务器执行全面健康检查CPU/MEM/DISK/网络/服务状态),返回结构化报告
argument-hint: "[server-name|all]"
allowed-tools: Bash, Read, Grep
disable-model-invocation: false
---

# 服务器健康检查

## 参数
- `$0`: 目标服务器名称,或 `all` 检查所有服务器

## 执行步骤

1. **确认目标**:如果 `$0` 是 `all`,从服务器清单获取所有在线服务器
2. **基础指标采集**(对每台服务器):
   - `ssh {server} 'top -bn1 | head -5'` — CPU 负载
   - `ssh {server} 'free -h'` — 内存使用
   - `ssh {server} 'df -h'` — 磁盘使用
   - `ssh {server} 'ss -tuln'` — 网络端口
   - `ssh {server} 'uptime'` — 运行时间
3. **服务状态**
   - `ssh {server} 'systemctl list-units --failed'` — 失败的服务
   - `ssh {server} 'docker ps --format "table {{.Names}}\t{{.Status}}"'` — 容器状态(如适用)
4. **异常检测**:标记 CPU>80%、MEM>85%、DISK>90% 的指标
5. **输出结构化报告**:按服务器分组,标记异常项

## 输出格式
```markdown
## 巡检报告 — {timestamp}

### {server-name} ● 正常 / ⚠ 告警 / 🔴 异常
| 指标 | 值 | 状态 |
|------|-----|------|
| CPU  | 45% | ✓   |
| MEM  | 62% | ✓   |
| DISK | 92% | ⚠   |
...

```yaml
# .claude/skills/deploy/SKILL.md

---
name: deploy
description: 将指定应用部署到目标环境(支持 Docker 和 K8s
argument-hint: "[app-name] [environment]"
allowed-tools: Bash, Read, Grep, Glob
disable-model-invocation: true
---

# 应用部署

## 参数
- `$0`: 应用名称
- `$1`: 目标环境 (dev/staging/prod)

## 前置检查
1. 确认应用 `$0` 存在于部署配置中
2. 确认目标环境 `$1` 的服务器可达
3. 如果是 prod 环境,明确提示需要审批

## 部署流程
1. 拉取最新镜像 / 构建产物
2. 备份当前版本信息(用于回滚)
3. 执行滚动更新
4. 等待健康检查通过
5. 验证服务正常响应
6. 输出部署摘要(版本号、耗时、服务状态)

## 回滚预案
如果部署后健康检查失败,自动执行 /rollback $0 $1
# .claude/skills/diagnose/SKILL.md

---
name: diagnose
description: 诊断服务器或服务的异常问题,自动收集日志和上下文
argument-hint: "[server-name] [symptom-description]"
allowed-tools: Bash, Read, Grep, Glob
disable-model-invocation: false
---

# 故障诊断

## 参数
- `$0`: 目标服务器
- `$ARGUMENTS`: 症状描述(如 "nginx 502 错误"、"磁盘 IO 高"

## 诊断框架

### 1. 收集上下文
- 系统负载: `uptime`, `top -bn1`, `vmstat 1 5`
- 磁盘 IO: `iostat -x 1 5`
- 网络: `netstat -s`, `ss -s`
- 最近错误日志: `journalctl -p err --since "1 hour ago"`

### 2. 针对症状深入
根据 `$ARGUMENTS` 描述的症状,针对性收集:
- HTTP 错误 → Nginx/Apache 错误日志 + upstream 状态
- 高负载 → 进程列表 + 资源占用排行
- 磁盘满 → 大文件查找 + 日志增长速率
- 内存泄漏 → 进程内存排行 + OOM killer 日志

### 3. 分析与建议
- 总结根因分析
- 提供修复建议(标明风险等级)
- 如果有明确的修复命令,提示是否执行

9.3 Skills 管理在 agent-service 中的实现

agent-service/
├── src/
│   ├── infrastructure/
│   │   ├── engines/
│   │   │   └── claude-code-cli/
│   │   │       ├── claude-code-engine.ts
│   │   │       └── ...
│   │   └── skills/                              # ★ Skills 管理层
│   │       ├── skill-manager.service.ts         #   Skill CRUD读写 .claude/skills/
│   │       ├── skill-sync.service.ts            #   DB ↔ 文件系统双向同步
│   │       ├── skill-template-renderer.ts       #   模板变量渲染($0, $ARGUMENTS 等)
│   │       └── skill-validator.service.ts       #   Skill 格式校验frontmatter + 内容)
// infrastructure/skills/skill-manager.service.ts

@Injectable()
export class SkillManagerService {
  private readonly skillsDir: string;

  constructor(private readonly configService: ConfigService) {
    // Skills 目录:项目根目录下的 .claude/skills/
    this.skillsDir = path.join(
      this.configService.get('PROJECT_ROOT', process.cwd()),
      '.claude', 'skills',
    );
  }

  /** 列出所有已注册的 Skills */
  async listSkills(): Promise<SkillDefinition[]> {
    const dirs = await fs.readdir(this.skillsDir, { withFileTypes: true });
    const skills: SkillDefinition[] = [];

    for (const dir of dirs.filter(d => d.isDirectory())) {
      const skillPath = path.join(this.skillsDir, dir.name, 'SKILL.md');
      if (await this.fileExists(skillPath)) {
        const content = await fs.readFile(skillPath, 'utf-8');
        skills.push(this.parseSkillMd(dir.name, content));
      }
    }
    return skills;
  }

  /** 创建/更新 Skill从 Web Admin 配置写入文件系统) */
  async upsertSkill(skill: CreateSkillDto): Promise<void> {
    const skillDir = path.join(this.skillsDir, skill.name);
    await fs.mkdir(skillDir, { recursive: true });

    const content = this.buildSkillMd(skill);
    await fs.writeFile(path.join(skillDir, 'SKILL.md'), content, 'utf-8');

    // 如果有附带脚本
    if (skill.scripts?.length) {
      const scriptsDir = path.join(skillDir, 'scripts');
      await fs.mkdir(scriptsDir, { recursive: true });
      for (const script of skill.scripts) {
        await fs.writeFile(path.join(scriptsDir, script.name), script.content, 'utf-8');
      }
    }
  }

  /** 删除 Skill */
  async deleteSkill(name: string): Promise<void> {
    const skillDir = path.join(this.skillsDir, name);
    await fs.rm(skillDir, { recursive: true, force: true });
  }

  /** 通过 Claude Code CLI 调用 Skill */
  invokeSkillPrompt(skillName: string, args: string): string {
    // 返回格式化的 prompt让 Claude Code 识别为 Skill 调用
    return `/${skillName} ${args}`;
  }

  /** 解析 SKILL.md (YAML frontmatter + Markdown body) */
  private parseSkillMd(dirName: string, content: string): SkillDefinition {
    const frontmatterMatch = content.match(/^---\n([\s\S]*?)\n---\n([\s\S]*)$/);
    if (!frontmatterMatch) {
      return { name: dirName, description: '', content, frontmatter: {} };
    }
    const frontmatter = yaml.parse(frontmatterMatch[1]);
    const body = frontmatterMatch[2];
    return {
      name: frontmatter.name || dirName,
      description: frontmatter.description || '',
      argumentHint: frontmatter['argument-hint'],
      allowedTools: frontmatter['allowed-tools'],
      disableModelInvocation: frontmatter['disable-model-invocation'] ?? false,
      content: body,
      frontmatter,
    };
  }

  /** 构建 SKILL.md 内容 */
  private buildSkillMd(skill: CreateSkillDto): string {
    const frontmatter = yaml.stringify({
      name: skill.name,
      description: skill.description,
      'argument-hint': skill.argumentHint,
      'allowed-tools': skill.allowedTools,
      'disable-model-invocation': skill.disableModelInvocation ?? false,
    });
    return `---\n${frontmatter}---\n\n${skill.content}`;
  }
}

9.4 Skill 与引擎的集成

// Claude Code CLI 引擎中调用 Skill 的两种方式:

// 方式 1: 直接在 prompt 中使用 /skill-nameClaude Code 自动识别)
const args = ['-p', `/inspect all`, '--output-format', 'stream-json', ...];

// 方式 2: Skill 内容已通过 .claude/skills/ 目录自动加载到 Claude Code 上下文中
// Claude Code 启动时读取所有 Skills 的 description自主决定何时使用
// 当用户说"检查一下服务器"时Claude Code 会自动匹配 /inspect skill

// 方式 3: Claude API 引擎 — 手动注入 Skill 内容到 system prompt
// 当切换到 Claude API 引擎时SkillManager 读取 Skill 内容,拼接到 system prompt 中

9.5 Skill 数据库表agent-service

-- 追加到 it0_agent schema

-- Skills 注册表(与文件系统双向同步)
CREATE TABLE skills (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name            VARCHAR(64) NOT NULL UNIQUE,      -- kebab-case 名称
    description     TEXT NOT NULL,
    argument_hint   VARCHAR(200),
    allowed_tools   VARCHAR(50)[],
    disable_model_invocation BOOLEAN NOT NULL DEFAULT FALSE,
    content         TEXT NOT NULL,                     -- Markdown 指令体
    frontmatter     JSONB NOT NULL DEFAULT '{}',       -- 完整 frontmatter
    version         INTEGER NOT NULL DEFAULT 1,
    is_active       BOOLEAN NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Skill 执行记录
CREATE TABLE skill_executions (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    skill_name      VARCHAR(64) NOT NULL,
    session_id      UUID REFERENCES agent_sessions(id),
    arguments       TEXT,
    engine_type     VARCHAR(20) NOT NULL,
    status          VARCHAR(20) NOT NULL,              -- 'success' | 'failure' | 'cancelled'
    duration_ms     INTEGER,
    executed_at     TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_skill_executions_name ON skill_executions(skill_name, executed_at);

9.6 引擎兼容性Skills 在三种引擎下的行为

引擎 Skill 加载方式 Skill 调用方式
Claude Code CLI .claude/skills/ 目录自动发现 /skill-name args(原生支持)
Claude API SkillManager 读取 → 拼接到 system prompt prompt 中包含 Skill 指令文本
Custom Agent SkillManager 读取 → 注入到自研引擎上下文 自定义调用机制

当切换引擎时,SkillManager 确保 Skills 的跨引擎可移植性:

  • Claude Code CLI 原生读 .claude/skills/ 目录
  • 其他引擎通过 SkillManager.getSkillContent(name) 获取 Skill 内容,手动注入

10. 运维专用 System Prompt 模板

// infrastructure/engines/claude-code-cli/system-prompt-builder.ts

export class SystemPromptBuilder {
  build(context: OpsContext): string {
    return `
你是 IT0 运维智能体,负责管理以下服务器集群。

## 你管理的环境
${context.servers.map(s => `- ${s.name} (${s.host}) [${s.environment}/${s.role}]`).join('\n')}

## 集群信息
${context.clusters.map(c => `- ${c.name} (${c.type}) — ${c.description}`).join('\n')}

## 操作规范

### 权限分级
- Level 0自动执行查看类命令df, free, top, kubectl get, docker ps, cat/grep 日志)
- Level 1需确认服务重启、日志清理、配置查看
- Level 2需审批部署、扩缩容、数据库操作、配置修改
- Level 3禁止rm -rf /, DROP DATABASE, shutdown, reboot, 格式化磁盘

### 执行原则
1. 每次操作前先说明你要做什么、为什么
2. 执行破坏性操作前,先展示完整命令
3. 每次操作后验证结果(检查返回码、确认服务状态)
4. 发现异常时自动收集诊断信息(日志末尾、资源使用、进程列表、网络状态)
5. 如果不确定,宁可多查一步也不要盲目执行
6. 操作完成后给出简洁总结

### 禁止行为
- 绝不猜测密码或凭证
- 绝不修改 SSH 配置或防火墙规则(除非明确指示)
- 绝不安装未经授权的软件包
- 绝不在生产环境执行未经验证的脚本

### SSH 访问
使用预配置的 SSH 别名ssh ${context.sshUser}@{server_host}
`.trim();
  }
}

11. 项目初始化与目录结构

11.1 Monorepo 结构

IT0/
├── docs/                                    # 文档
│   ├── backend-guide.md                     # 本文档
│   ├── flutter-guide.md                     # Flutter 前端指导
│   └── web-admin-guide.md                   # PC Web 管理前端指导
│
├── packages/
│   ├── shared/                              # 共享包
│   │   ├── proto/                           # gRPC Protobuf 定义
│   │   ├── events/                          # 事件类型定义
│   │   └── utils/                           # 公共工具
│   │
│   └── services/                            # 微服务
│       ├── api-gateway/                     # Kong 声明式配置
│       │   ├── kong.yml                     #   路由/服务/插件声明
│       │   ├── Dockerfile                   #   Kong 自定义镜像
│       │   └── plugins/                     #   自定义插件(如需)
│       ├── auth-service/
│       ├── agent-service/                   # ★ 核心智能体服务
│       ├── ops-service/
│       ├── inventory-service/
│       ├── monitor-service/
│       ├── comm-service/
│       ├── voice-service/                  # ★ Pipecat 语音对话引擎 (Python)
│       └── audit-service/
│
├── .claude/                                 # ★ Claude Code Skills 目录
│   ├── skills/                              #   运维技能定义
│   │   ├── inspect/SKILL.md                 #   全面巡检 Skill
│   │   ├── deploy/SKILL.md                  #   部署 Skill
│   │   ├── rollback/SKILL.md                #   回滚 Skill
│   │   ├── cleanup/SKILL.md                 #   清理 Skill
│   │   ├── diagnose/SKILL.md                #   故障诊断 Skill
│   │   ├── scale/SKILL.md                   #   扩缩容 Skill
│   │   ├── backup-check/SKILL.md            #   备份验证 Skill
│   │   └── security-audit/SKILL.md          #   安全审计 Skill
│   ├── commands/                            #   兼容旧版 slash commands
│   └── settings.json                        #   Hook + 权限配置
│
├── ops-config/                              # 运维配置
│   ├── system-prompts/                      # System Prompt 模板
│   ├── hooks/                               # Claude Code Hook 脚本
│   │   └── pre-tool-use-hook.py
│   ├── runbooks/                            # Runbook 模板(高级,多步骤)
│   └── risk-patterns.json                   # 危险命令模式库
│
├── deploy/
│   ├── docker-compose.yml                   # 开发环境
│   ├── docker-compose.prod.yml              # 生产环境
│   └── k8s/                                 # Kubernetes 部署清单
│
├── package.json                             # Monorepo root (pnpm workspace)
├── pnpm-workspace.yaml
├── tsconfig.base.json
└── .env.example

11.2 单个微服务标准结构Clean Architecture

{service-name}/
├── src/
│   ├── domain/                      # 领域层 — 零外部依赖
│   │   ├── entities/                #   聚合根 & 实体
│   │   ├── value-objects/           #   值对象
│   │   ├── ports/                   #   端口(接口定义)
│   │   │   ├── inbound/             #     入站端口(用例接口)
│   │   │   └── outbound/            #     出站端口(依赖接口)
│   │   ├── events/                  #   领域事件
│   │   └── services/                #   领域服务
│   │
│   ├── application/                 # 应用层 — 用例编排
│   │   ├── use-cases/               #   用例实现
│   │   └── dto/                     #   数据传输对象
│   │
│   ├── infrastructure/              # 基础设施层 — 适配器实现
│   │   ├── persistence/             #   TypeORM 实体 & 仓储
│   │   ├── messaging/               #   消息队列
│   │   ├── clients/                 #   外部服务客户端
│   │   └── config/                  #   配置
│   │
│   ├── interfaces/                  # 接口层 — 控制器
│   │   ├── rest/                    #   REST API
│   │   ├── grpc/                    #   gRPC 服务
│   │   └── ws/                      #   WebSocket 网关
│   │
│   ├── app.module.ts
│   └── main.ts
│
├── test/
│   ├── unit/
│   ├── integration/
│   └── e2e/
│
├── Dockerfile
├── package.json
└── tsconfig.json

12. 开发路线图

Phase 1: 基础骨架Week 1-2

  • Monorepo 初始化pnpm workspace + NestJS CLI
  • auth-service: JWT 认证 + RBAC
  • agent-service: Claude Code CLI 引擎 + 流式输出
  • 命令风险分级 + Hook 拦截脚本
  • WebSocket 实时推送

Phase 2: 运维核心Week 3-4

  • inventory-service: 服务器/凭证管理
  • ops-service: 任务创建 + 审批工作流
  • 端到端流程:前端发起任务 → Agent 执行 → 审批 → 完成
  • Runbook 模板系统

Phase 3: 监控告警Week 5-6

  • monitor-service: 健康检查 + 指标采集
  • 告警规则 + 自动触发 Runbook
  • 告警 → 通知升级链路

Phase 4: 多渠道通信 + 语音对话Week 7-9

  • comm-service: 推送 + 短信 + 邮件 + IMTelegram/企业微信)
  • voice-service: Pipecat + faster-whisper + Kokoro-82M 部署
  • Flutter App WebSocket 音频流 ←→ voice-service 对接
  • Pipecat 主动拨号Twilio 电话线路)+ Media Streams 语音对话
  • 三层递进升级链路端到端测试

Phase 5: 引擎扩展Week 9-10

  • Claude API 引擎实现(备选方案)
  • 引擎运行时切换 + 性能对比
  • audit-service: 完整审计链

13. 环境变量配置

# .env.example

# ===== 全局 =====
NODE_ENV=development
LOG_LEVEL=debug

# ===== PostgreSQL =====
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=it0
POSTGRES_PASSWORD=changeme
POSTGRES_DB=it0

# ===== Redis =====
REDIS_HOST=localhost
REDIS_PORT=6379

# ===== Agent Engine =====
AGENT_ENGINE_TYPE=claude_code_cli        # claude_code_cli | claude_api | custom
CLAUDE_CODE_PATH=/usr/local/bin/claude   # Claude Code CLI 路径
ANTHROPIC_API_KEY=sk-ant-xxx             # Claude API 备选方案

# ===== 安全 =====
VAULT_MASTER_KEY=your-256-bit-key        # 凭证加密密钥
JWT_SECRET=your-jwt-secret
JWT_EXPIRES_IN=24h

# ===== 通信渠道comm-service=====
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=xxx
SMTP_PASSWORD=xxx
TELEGRAM_BOT_TOKEN=xxx
WECHAT_CORP_ID=xxx
WECHAT_CORP_SECRET=xxx

# ===== voice-servicePipecat 语音对话引擎)=====
VOICE_SERVICE_URL=http://voice-service:3008  # voice-service 内部地址
WHISPER_MODEL=large-v3                       # faster-whisper 模型 (base/small/medium/large-v3)
KOKORO_MODEL=kokoro-82m                      # Kokoro TTS 模型
VOICE_DEVICE=cuda                            # STT/TTS 推理设备 (cuda/cpu)

# ===== Twilio电话线路载体Pipecat 通过它拨号)=====
TWILIO_ACCOUNT_SID=ACxxx
TWILIO_AUTH_TOKEN=xxx
TWILIO_PHONE_NUMBER=+1234567890
TWILIO_MEDIA_STREAMS_URL=wss://voice-service:3008/twilio-stream  # Media Streams 转接地址

14. 关键设计决策备忘

决策 选择 理由
引擎抽象层 Strategy + Adapter 运行时切换,不需要重启服务
服务间同步通信 gRPC 类型安全、高性能、双向流
服务间异步通信 Redis Streams 轻量、可靠、不需要额外中间件
数据库 PostgreSQL每服务独立 schema 同一个 PG 实例多 schema开发简单生产可拆分
凭证存储 AES-256-GCM + PostgreSQL 不引入 HashiCorp Vault 的复杂度,够用
审计日志 Append-Only + 数据库级权限控制 防篡改,满足合规需求
定时任务 Bull Queue (Redis-backed) NestJS 原生支持,可靠的延迟/重试机制
监控指标存储 PostgreSQL 分区表 初期够用;后期可迁移 TimescaleDB
前后端通信 REST + WebSocket REST 用于 CRUDWebSocket 用于实时流
API 网关 Kong DB-less 声明式 无需额外 DBYAML 配置即基础设施,原生支持 WebSocket/JWT/限流
AI Skills Anthropic Claude Skills (.claude/skills/) 原生 Claude Code 支持 + DB 同步实现跨引擎可移植
Skills vs Runbook Skills=原子技能Runbook=编排流程 Skills 是底层能力单元Runbook 组合 Skills 实现多步骤业务流程
多租户隔离 Schema-per-Tenant + 行级 tenant_id Schema 级隔离安全性高tenant_id 列防御纵深;避免 Database-per-Tenant 的运维复杂度
租户上下文 AsyncLocalStorage 无侵入式传播不需要每个方法显式传参NestJS 中间件统一注入
租户 Schema 管理 TypeORM SET search_path 动态切换,无需维护多 DataSourceQueryRunner 级别隔离
自治运维范式 驻留指令Standing Orders 对话定义 → 结构化提取 → 自主执行 → 智能升级;避免 24/7 人工在线
驻留指令执行 Bull Queue + cron 调度 可靠的定时触发、重试、并发控制;执行记录完整审计
智能升级通道 三层递进(推送→电话唤醒→扩大通知) App 推送优先2分钟无响应→电话唤醒5分钟→IM/短信10分钟→备用联系人/邮件
语音对话引擎 Pipecat (Python, 10.1k stars) 原生 Claude 集成 + 自部署 STT/TTS + 打断支持 + < 500ms 延迟;不依赖外部语音 API
STT 选型 faster-whisper (large-v3) 20.6k stars4x 快于原版 Whisper支持 GPU + 8bit 量化,中英文准确率高
TTS 选型 Kokoro-82M Apache 开源82M 参数质量媲美大模型中英双语CPU/GPU 均可运行
Twilio 定位 纯电话线路载体 Pipecat 通过 Twilio REST API 拨号 + Media Streams 传输音频;无 IVRAgent 接通即说话