330 lines
8.9 KiB
Go
330 lines
8.9 KiB
Go
package services
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"intent-system/internal/licensecheck"
|
||
cainit "intent-system/pkg/cache"
|
||
"intent-system/pkg/config"
|
||
"intent-system/pkg/controllers"
|
||
"intent-system/pkg/itypes"
|
||
"intent-system/pkg/middleware"
|
||
"intent-system/pkg/routers"
|
||
"intent-system/pkg/utils"
|
||
"net/http"
|
||
"os"
|
||
"strings"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"intent-system/pkg/dal/core" // ✅ 用于调用 getSupabaseURL 和 DeployFromTar
|
||
|
||
"github.com/civet148/log"
|
||
"github.com/docker/docker/api/types"
|
||
"github.com/docker/docker/client"
|
||
"github.com/gin-gonic/gin"
|
||
)
|
||
|
||
/* ---------- Auto-deploy throttling ---------- */

// Tuning knobs for the auto-deploy throttle.
var (
	maxFails int32         = 3               // consecutive failed deploys that trigger a cooldown
	cooldown time.Duration = 5 * time.Minute // how long auto-deploy stays suppressed after maxFails failures
)

// Runtime state of the auto-deploy throttle, shared between the monitor loop
// and its deploy goroutines (intended to be accessed via sync/atomic).
var (
	deploying   int32 // 0 = idle, 1 = a deploy is in progress
	failStreak  int32 // number of consecutive failed deploys
	nextAllowed int64 // UnixNano timestamp before which auto-deploy is skipped (end of cooldown)
)
|
||
|
||
type Manager struct {
|
||
*controllers.Controller
|
||
cfg *config.Config
|
||
router *gin.Engine
|
||
routerRPC *gin.Engine
|
||
}
|
||
|
||
func NewManager(cfg *config.Config) *Manager {
|
||
m := &Manager{
|
||
cfg: cfg,
|
||
router: gin.New(),
|
||
routerRPC: gin.New(),
|
||
Controller: controllers.NewController(cfg),
|
||
}
|
||
return m
|
||
}
|
||
|
||
func (m *Manager) monitorHealthAndRedeploy(ctx context.Context) {
|
||
|
||
/* ====== 开机延时 60 s,再自检 ====== */
|
||
select {
|
||
case <-time.After(60 * time.Second): // 冷启动缓冲
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
ensureCradleReady(ctx) // 自检 + 强制部署
|
||
|
||
// ③ 开启 UDP 广播(9876 端口,2 s 间隔)
|
||
utils.StartBroadcast(ctx, 9876, 2, "PlugAI ZeroStack AI Server")
|
||
|
||
ticker := time.NewTicker(75 * time.Second) //这个75秒没有特殊意思,是作者拍脑袋定的。
|
||
defer ticker.Stop()
|
||
|
||
var (
|
||
lastHealthIP string
|
||
changeSince time.Time
|
||
ipChanging bool
|
||
)
|
||
|
||
client := &http.Client{Timeout: 5 * time.Second}
|
||
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
|
||
case <-ticker.C:
|
||
// ① 本机 IP
|
||
supURL := core.GetSupabaseURL()
|
||
localIP, err := utils.ExtractIP(supURL)
|
||
if err != nil {
|
||
log.Warnf("解析本机 IP 失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
// ② health 接口
|
||
resp, err := client.Get("http://localhost:3008/api/health")
|
||
if err != nil {
|
||
log.Warnf("请求 health 失败: %v", err)
|
||
continue
|
||
}
|
||
var res struct {
|
||
Ip string `json:"ip"`
|
||
}
|
||
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
|
||
log.Warnf("解析 health 响应失败: %v", err)
|
||
resp.Body.Close()
|
||
continue
|
||
}
|
||
resp.Body.Close()
|
||
|
||
// 打印一次对比
|
||
log.Infof("health API 返回 IP=%s,localIP=%s", res.Ip, localIP)
|
||
|
||
/* ---------- ③ 首次采样 ---------- */
|
||
if lastHealthIP == "" {
|
||
lastHealthIP = res.Ip
|
||
// 若开机就不一致,立即进入计时
|
||
if res.Ip != localIP {
|
||
ipChanging = true
|
||
changeSince = time.Now()
|
||
}
|
||
continue
|
||
}
|
||
|
||
/* ---------- ④ IP 变化 ---------- */
|
||
if res.Ip != lastHealthIP {
|
||
log.Infof("health IP 变化: %s → %s", lastHealthIP, res.Ip)
|
||
lastHealthIP = res.Ip
|
||
changeSince = time.Now()
|
||
ipChanging = true
|
||
}
|
||
|
||
/* ---------- ⑤ 满足触发条件 ---------- */
|
||
if !(ipChanging && time.Since(changeSince) > 10*time.Second && res.Ip != localIP) {
|
||
continue
|
||
}
|
||
|
||
/* ---------- ⑥ 节流 & 冷静期 ---------- */
|
||
if ts := atomic.LoadInt64(&nextAllowed); ts > time.Now().UnixNano() {
|
||
log.Warn("处于冷却期,跳过自动部署")
|
||
continue
|
||
}
|
||
if !atomic.CompareAndSwapInt32(&deploying, 0, 1) {
|
||
continue // 正在部署
|
||
}
|
||
|
||
// ⑥ 节流 & 冷静期 之后,替换 goroutine 内部逻辑
|
||
go func() {
|
||
defer atomic.StoreInt32(&deploying, 0)
|
||
|
||
// 调用返回 (id, err)
|
||
id, err := core.DeployCradleTar(ctx)
|
||
if err != nil {
|
||
log.Errorf("自动部署失败: %v", err)
|
||
|
||
if atomic.AddInt32(&failStreak, 1) >= maxFails {
|
||
atomic.StoreInt64(&nextAllowed, time.Now().Add(cooldown).UnixNano())
|
||
log.Errorf("连续失败 %d 次,进入冷却 %v", maxFails, cooldown)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 部署成功
|
||
failStreak = 0
|
||
lastHealthIP = localIP
|
||
ipChanging = false
|
||
log.Infof("✅ 自动部署成功,containerID=%s", id)
|
||
}()
|
||
}
|
||
}
|
||
}
|
||
|
||
func (m *Manager) Run() (err error) {
|
||
|
||
if err = licensecheck.Validate("/etc/license.dat"); err != nil {
|
||
log.Errorf("License invalid/expired: %v — exiting immediately", err)
|
||
os.Exit(1)
|
||
} else {
|
||
log.Infof("License check passed")
|
||
}
|
||
|
||
// ✅ 自动启动 IP 变化监控和部署逻辑
|
||
// ctx := context.Background()
|
||
// go m.monitorHealthAndRedeploy(ctx)
|
||
|
||
_ = m.runManager(func() error {
|
||
//save config to local storage
|
||
cainit.InitBigCache()
|
||
//start up web service, if success this routine will be blocked
|
||
if err = m.startWebService(); err != nil {
|
||
m.Close()
|
||
log.Errorf("start web service error [%s]", err)
|
||
return err
|
||
}
|
||
return err
|
||
})
|
||
return
|
||
}
|
||
|
||
func (m *Manager) Close() {
|
||
|
||
}
|
||
|
||
func (m *Manager) initRouterMgr() (r *gin.Engine) {
|
||
|
||
m.router.Use(gin.Logger())
|
||
m.router.Use(gin.Recovery())
|
||
m.router.Use(middleware.Cors())
|
||
//m.router.Static("/", m.cfg.Static)
|
||
routers.InitRouterGroupCommon(m.router, m) //通用接口
|
||
routers.InitRouterGroupPlatform(m.router, m) //管理系统
|
||
routers.InitRouterGroupCustomer(m.router, m) //客户管理
|
||
routers.InitRouterGateway(m.router, m) //网关接口
|
||
routers.InitRouterGroupBiz(m.router, m) //业务接口
|
||
routers.InitRouterGroupDeploy(m.router, m) //部署有关接口
|
||
routers.InitRouterGroupDeployWs(m.router, m) //websocket 推送部署进度信息
|
||
return m.router
|
||
}
|
||
|
||
func (m *Manager) runManager(run func() error) (err error) {
|
||
return run()
|
||
}
|
||
|
||
func (m *Manager) startWebService() (err error) {
|
||
if !m.createImagesDir() {
|
||
log.Panic("create images dir failed")
|
||
}
|
||
routerMgr := m.initRouterMgr()
|
||
strHttpAddr := m.cfg.HttpAddr
|
||
log.Infof("starting http server on %s \n", strHttpAddr)
|
||
//Web manager service
|
||
if err = http.ListenAndServe(strHttpAddr, routerMgr); err != nil { //if everything is fine, it will block this routine
|
||
log.Panic("listen http server [%s] error [%s]\n", strHttpAddr, err.Error())
|
||
}
|
||
return
|
||
}
|
||
|
||
func (m *Manager) createImagesDir() bool {
|
||
var strUrl = m.cfg.ImagePrefix
|
||
var strImages string
|
||
if err := os.MkdirAll(m.cfg.ImagePath, os.ModePerm); err != nil {
|
||
log.Errorf("make dir [%s] error [%s]", m.cfg.ImagePath, err.Error())
|
||
return false
|
||
}
|
||
log.Infof("make dir [%s] ok", m.cfg.ImagePath)
|
||
strUrl = strings.ToLower(strUrl)
|
||
strUrl = strings.TrimPrefix(strUrl, "http://")
|
||
strUrl = strings.TrimPrefix(strUrl, "https://")
|
||
idx := strings.Index(strUrl, "/")
|
||
if idx < 0 {
|
||
log.Panic("url [%s] images path not invalid ", strUrl)
|
||
}
|
||
strImages = strUrl[idx:]
|
||
if m.cfg.Static == "" {
|
||
m.cfg.Static = itypes.DefaultStaticHome
|
||
}
|
||
strLink := m.cfg.Static + strImages
|
||
idx = strings.LastIndex(strLink, "/")
|
||
strPrefixDir := strLink[:idx]
|
||
if err := os.MkdirAll(strPrefixDir, os.ModePerm); err != nil {
|
||
log.Errorf("make dir [%s] error [%s]", strPrefixDir, err.Error())
|
||
return false
|
||
}
|
||
log.Infof("make dir [%s] ok", strPrefixDir)
|
||
|
||
if err := os.Symlink(m.cfg.ImagePath, strLink); err != nil {
|
||
if !strings.Contains(err.Error(), "exists") {
|
||
log.Errorf("make link [%s] error [%s]", strLink, err.Error())
|
||
return false
|
||
}
|
||
}
|
||
log.Infof("make link [%s] to [%s] ok", strLink, m.cfg.ImagePath)
|
||
return true
|
||
}
|
||
|
||
// ensureCradleReady 确保系统里有一个正常运行的 cradle_amd64 容器;
|
||
// 不存在或状态异常则先清理再重新部署。
|
||
func ensureCradleReady(ctx context.Context) {
|
||
cli, err := client.NewClientWithOpts(client.FromEnv)
|
||
if err != nil {
|
||
log.Warnf("docker client err: %v", err)
|
||
return
|
||
}
|
||
defer cli.Close()
|
||
|
||
const imageTag = "cradle:latest" // 镜像名
|
||
const containerNameKeyword = "Cradle" // 容器名关键字,区分大小写(如需不区分,可用 strings.ToLower)
|
||
|
||
var (
|
||
needDeploy = true
|
||
imagesToDelete = map[string]struct{}{}
|
||
)
|
||
|
||
// ① 遍历所有容器
|
||
cs, _ := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
|
||
for _, c := range cs {
|
||
// c.Names 是 []string,通常只有一个,以 "/" 开头
|
||
name := strings.TrimPrefix(c.Names[0], "/")
|
||
if !strings.Contains(name, containerNameKeyword) {
|
||
continue
|
||
}
|
||
|
||
imagesToDelete[c.Image] = struct{}{} // 记录同名镜像
|
||
|
||
if c.State == "running" {
|
||
needDeploy = false // 已有正常实例
|
||
continue
|
||
}
|
||
|
||
// 状态异常:stop + rm
|
||
timeout := 10 * time.Second
|
||
_ = cli.ContainerStop(ctx, c.ID, &timeout)
|
||
_ = cli.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true})
|
||
log.Warnf("清理异常容器 %s (state=%s)", name, c.State)
|
||
}
|
||
|
||
// ② 删除残留镜像(若需要)
|
||
if needDeploy {
|
||
for img := range imagesToDelete {
|
||
_, _ = cli.ImageRemove(ctx, img, types.ImageRemoveOptions{Force: true, PruneChildren: true})
|
||
log.Warnf("删除旧镜像 %s", img)
|
||
}
|
||
// ③ 重新部署
|
||
if id, err := core.DeployCradleTar(ctx); err != nil {
|
||
log.Errorf("开机强制部署失败: %v", err)
|
||
} else {
|
||
log.Infof("✅ 开机强制部署完成,containerID=%s", id)
|
||
}
|
||
} else {
|
||
log.Infof("检测到正在运行的 cradle 容器,跳过开机部署")
|
||
}
|
||
}
|