package services

import (
	"context"
	"encoding/json"
	"net/http"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"intent-system/internal/licensecheck"
	cainit "intent-system/pkg/cache"
	"intent-system/pkg/config"
	"intent-system/pkg/controllers"
	"intent-system/pkg/dal/core" // provides GetSupabaseURL and DeployCradleTar
	"intent-system/pkg/itypes"
	"intent-system/pkg/middleware"
	"intent-system/pkg/routers"
	"intent-system/pkg/utils"

	"github.com/civet148/log"
	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"github.com/gin-gonic/gin"
)

// Throttling state for automatic redeployment. The int32/int64 values are
// shared between the monitor loop and the deployment goroutine and must only
// be accessed through sync/atomic.
var (
	// deploying is 0 when idle and 1 while a deployment is in flight.
	deploying int32

	// failStreak counts consecutive failed deployments.
	failStreak int32

	// maxFails is the failure count at which the cool-down starts.
	maxFails int32 = 3

	// cooldown is how long automatic deployment is suspended after maxFails
	// consecutive failures.
	cooldown = 5 * time.Minute

	// nextAllowed is the earliest time (UnixNano) at which the next automatic
	// deployment may start; zero means no cool-down is active.
	nextAllowed int64
)

// Manager bundles the controller, the configuration and the gin engines that
// make up the management service.
type Manager struct {
	*controllers.Controller
	cfg       *config.Config
	router    *gin.Engine
	routerRPC *gin.Engine
}

// NewManager builds a Manager for cfg with fresh gin engines and a controller
// created from the same configuration.
func NewManager(cfg *config.Config) *Manager {
	return &Manager{
		cfg:        cfg,
		router:     gin.New(),
		routerRPC:  gin.New(),
		Controller: controllers.NewController(cfg),
	}
}

// monitorHealthAndRedeploy blocks until ctx is cancelled. After a 60 s
// cold-start delay it makes sure a cradle container is running, starts the UDP
// broadcast, and then polls the local health endpoint on every tick. When the
// IP reported by the health endpoint has changed, has been stable for more
// than 10 s and differs from the IP extracted from the Supabase URL, it starts
// a background redeployment, subject to the package-level throttle state.
//
// It is meant to run in its own goroutine.
func (m *Manager) monitorHealthAndRedeploy(ctx context.Context) {
	// Cold-start buffer: wait 60 s before the first self-check.
	select {
	case <-time.After(60 * time.Second):
	case <-ctx.Done():
		return
	}

	ensureCradleReady(ctx) // self-check; deploys when no cradle container is running

	// UDP broadcast (port 9876; the original note gives a 2 s interval).
	utils.StartBroadcast(ctx, 9876, 2, "PlugAI ZeroStack AI Server")

	const healthURL = "http://localhost:3008/api/health"

	// The 75 s period has no special meaning; it was picked arbitrarily.
	ticker := time.NewTicker(75 * time.Second)
	defer ticker.Stop()

	// Loop-owned state. Only this goroutine may touch these variables; the
	// deployment goroutine reports success through deployedIP instead of
	// writing them directly, which would be a data race.
	var (
		lastHealthIP string
		changeSince  time.Time
		ipChanging   bool
	)

	// deployedIP carries the local IP of a successful deployment back to the
	// loop. The deploying flag allows one deployment at a time and the loop
	// drains the channel before it can start another, so a buffer of one
	// guarantees that the send never blocks.
	deployedIP := make(chan string, 1)

	// Named httpClient so the imported docker "client" package is not shadowed.
	httpClient := &http.Client{Timeout: 5 * time.Second}

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		// Apply the outcome of a deployment that finished since the last tick.
		select {
		case ip := <-deployedIP:
			lastHealthIP = ip
			ipChanging = false
		default:
		}

		// 1. Local IP, taken from the Supabase URL.
		localIP, err := utils.ExtractIP(core.GetSupabaseURL())
		if err != nil {
			log.Warnf("解析本机 IP 失败: %v", err)
			continue
		}

		// 2. Health endpoint. The request is bound to ctx so that shutdown
		// cancels an in-flight probe.
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
		if err != nil {
			log.Warnf("请求 health 失败: %v", err)
			continue
		}
		resp, err := httpClient.Do(req)
		if err != nil {
			log.Warnf("请求 health 失败: %v", err)
			continue
		}
		var res struct {
			IP string `json:"ip"`
		}
		err = json.NewDecoder(resp.Body).Decode(&res)
		resp.Body.Close()
		if err != nil {
			log.Warnf("解析 health 响应失败: %v", err)
			continue
		}

		log.Infof("health API 返回 IP=%s,localIP=%s", res.IP, localIP)

		// 3. First sample: remember it, and start the timer right away if the
		// two IPs already disagree.
		if lastHealthIP == "" {
			lastHealthIP = res.IP
			if res.IP != localIP {
				ipChanging = true
				changeSince = time.Now()
			}
			continue
		}

		// 4. The health IP changed since the previous sample.
		if res.IP != lastHealthIP {
			log.Infof("health IP 变化: %s → %s", lastHealthIP, res.IP)
			lastHealthIP = res.IP
			changeSince = time.Now()
			ipChanging = true
		}

		// 5. Trigger only for a change that is older than 10 s and still
		// disagrees with the local IP.
		shouldDeploy := ipChanging && time.Since(changeSince) > 10*time.Second && res.IP != localIP
		if !shouldDeploy {
			continue
		}

		// 6. Throttle: honour the cool-down and allow one deployment at a time.
		if atomic.LoadInt64(&nextAllowed) > time.Now().UnixNano() {
			log.Warn("处于冷却期,跳过自动部署")
			continue
		}
		if !atomic.CompareAndSwapInt32(&deploying, 0, 1) {
			continue // a deployment is already running
		}

		go func(targetIP string) {
			defer atomic.StoreInt32(&deploying, 0)

			id, err := core.DeployCradleTar(ctx)
			if err != nil {
				log.Errorf("自动部署失败: %v", err)
				if atomic.AddInt32(&failStreak, 1) >= maxFails {
					atomic.StoreInt64(&nextAllowed, time.Now().Add(cooldown).UnixNano())
					log.Errorf("连续失败 %d 次,进入冷却 %v", maxFails, cooldown)
				}
				return
			}

			// Success: reset the streak atomically and hand the IP back to the
			// loop, which owns lastHealthIP and ipChanging.
			atomic.StoreInt32(&failStreak, 0)
			log.Infof("✅ 自动部署成功,containerID=%s", id)
			deployedIP <- targetIP
		}(localIP)
	}
}

// Run validates the license at /etc/license.dat, exiting the process with
// status 1 when validation fails, then initialises the cache and starts the
// web service. It blocks for as long as the web service is running and returns
// the error reported by it.
func (m *Manager) Run() (err error) {
	if err = licensecheck.Validate("/etc/license.dat"); err != nil {
		log.Errorf("License invalid/expired: %v — exiting immediately", err)
		os.Exit(1)
	}
	log.Infof("License check passed")

	// The IP-change monitor is currently disabled. To enable it:
	// ctx := context.Background()
	// go m.monitorHealthAndRedeploy(ctx)

	return m.runManager(func() error {
		// Set up the cache before the service starts handling requests.
		cainit.InitBigCache()

		// On success this call blocks the current goroutine.
		if err := m.startWebService(); err != nil {
			m.Close()
			log.Errorf("start web service error [%s]", err)
			return err
		}
		return nil
	})
}

// Close releases the resources held by the Manager. It is currently a no-op.
func (m *Manager) Close() {
}

// initRouterMgr installs the middleware and all route groups on the manager's
// HTTP engine and returns that engine.
func (m *Manager) initRouterMgr() (r *gin.Engine) {
	m.router.Use(gin.Logger())
	m.router.Use(gin.Recovery())
	m.router.Use(middleware.Cors())
	//m.router.Static("/", m.cfg.Static)

	routers.InitRouterGroupCommon(m.router, m)   // common endpoints
	routers.InitRouterGroupPlatform(m.router, m) // management system
	routers.InitRouterGroupCustomer(m.router, m) // customer management
	routers.InitRouterGateway(m.router, m)       // gateway endpoints
	routers.InitRouterGroupBiz(m.router, m)      // business endpoints
	routers.InitRouterGroupDeploy(m.router, m)   // deployment endpoints
	routers.InitRouterGroupDeployWs(m.router, m) // websocket push of deployment progress
	return m.router
}

// runManager invokes run and returns its result.
func (m *Manager) runManager(run func() error) (err error) {
	return run()
}

// startWebService prepares the images directory, wires the routes and serves
// HTTP on cfg.HttpAddr. It blocks while the server is running and calls
// log.Panic when the directory cannot be prepared or the listener fails.
func (m *Manager) startWebService() (err error) {
	if !m.createImagesDir() {
		log.Panic("create images dir failed")
	}
	routerMgr := m.initRouterMgr()
	strHttpAddr := m.cfg.HttpAddr
	log.Infof("starting http server on %s \n", strHttpAddr)

	// Web manager service; blocks this goroutine while everything is fine.
	if err = http.ListenAndServe(strHttpAddr, routerMgr); err != nil {
		log.Panic("listen http server [%s] error [%s]\n", strHttpAddr, err.Error())
	}
	return
}

// createImagesDir creates cfg.ImagePath and exposes it under the static root
// through a symlink whose location is the path component of cfg.ImagePrefix.
// It reports whether everything is in place; a symlink that already exists
// counts as success.
func (m *Manager) createImagesDir() bool {
	if err := os.MkdirAll(m.cfg.ImagePath, os.ModePerm); err != nil {
		log.Errorf("make dir [%s] error [%s]", m.cfg.ImagePath, err.Error())
		return false
	}
	log.Infof("make dir [%s] ok", m.cfg.ImagePath)

	// Strip the scheme and keep everything from the first "/" onwards, i.e.
	// the path part of the image URL prefix. The prefix is lower-cased first,
	// so the resulting path is lower-case as well.
	strUrl := strings.ToLower(m.cfg.ImagePrefix)
	strUrl = strings.TrimPrefix(strUrl, "http://")
	strUrl = strings.TrimPrefix(strUrl, "https://")
	idx := strings.Index(strUrl, "/")
	if idx < 0 {
		log.Panic("url [%s] images path not invalid ", strUrl)
		return false // defensive: never slice with a negative index if log.Panic returns
	}
	strImages := strUrl[idx:]

	if m.cfg.Static == "" {
		m.cfg.Static = itypes.DefaultStaticHome
	}
	strLink := m.cfg.Static + strImages

	// The parent directory of the link has to exist before the link is made.
	// strImages starts with "/", so LastIndex always finds a separator.
	strPrefixDir := strLink[:strings.LastIndex(strLink, "/")]
	if err := os.MkdirAll(strPrefixDir, os.ModePerm); err != nil {
		log.Errorf("make dir [%s] error [%s]", strPrefixDir, err.Error())
		return false
	}
	log.Infof("make dir [%s] ok", strPrefixDir)

	// A link left over from a previous run is fine; anything else is fatal.
	if err := os.Symlink(m.cfg.ImagePath, strLink); err != nil && !os.IsExist(err) {
		log.Errorf("make link [%s] error [%s]", strLink, err.Error())
		return false
	}
	log.Infof("make link [%s] to [%s] ok", strLink, m.cfg.ImagePath)
	return true
}

// ensureCradleReady makes sure a cradle container is running. It inspects
// every container whose name contains "Cradle" (case-sensitive): containers
// that are not in the "running" state are stopped and removed. When none is
// running, the images those containers used are removed and
// core.DeployCradleTar is called to deploy a fresh one.
//
// When the Docker client cannot be created or the containers cannot be
// listed, the function logs a warning and changes nothing, because the
// current state is unknown.
func ensureCradleReady(ctx context.Context) {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Warnf("docker client err: %v", err)
		return
	}
	defer cli.Close()

	// Keyword matched against container names; the match is case-sensitive.
	const containerNameKeyword = "Cradle"

	var (
		needDeploy     = true
		imagesToDelete = map[string]struct{}{}
	)

	// 1. Walk all containers, including stopped ones.
	containers, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
	if err != nil {
		// Without a listing there is no way to tell whether a healthy
		// container exists, so do not remove or redeploy anything.
		log.Warnf("docker container list err: %v", err)
		return
	}
	for _, c := range containers {
		// Names normally holds a single entry that starts with "/".
		if len(c.Names) == 0 {
			continue
		}
		name := strings.TrimPrefix(c.Names[0], "/")
		if !strings.Contains(name, containerNameKeyword) {
			continue
		}
		imagesToDelete[c.Image] = struct{}{} // remember the image it was created from

		if c.State == "running" {
			needDeploy = false // a healthy instance already exists
			continue
		}

		// Unhealthy state: stop, then force-remove.
		timeout := 10 * time.Second
		if err := cli.ContainerStop(ctx, c.ID, &timeout); err != nil {
			log.Warnf("stop container %s err: %v", name, err)
		}
		if err := cli.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true}); err != nil {
			log.Warnf("remove container %s err: %v", name, err)
			continue
		}
		log.Warnf("清理异常容器 %s (state=%s)", name, c.State)
	}

	if !needDeploy {
		log.Infof("检测到正在运行的 cradle 容器,跳过开机部署")
		return
	}

	// 2. Remove the leftover images.
	for img := range imagesToDelete {
		if _, err := cli.ImageRemove(ctx, img, types.ImageRemoveOptions{Force: true, PruneChildren: true}); err != nil {
			log.Warnf("remove image %s err: %v", img, err)
			continue
		}
		log.Warnf("删除旧镜像 %s", img)
	}

	// 3. Deploy again.
	id, err := core.DeployCradleTar(ctx)
	if err != nil {
		log.Errorf("开机强制部署失败: %v", err)
		return
	}
	log.Infof("✅ 开机强制部署完成,containerID=%s", id)
}