plugai_updsrv/pkg/services/service_manager.go

330 lines
8.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"encoding/json"
"intent-system/internal/licensecheck"
cainit "intent-system/pkg/cache"
"intent-system/pkg/config"
"intent-system/pkg/controllers"
"intent-system/pkg/itypes"
"intent-system/pkg/middleware"
"intent-system/pkg/routers"
"intent-system/pkg/utils"
"net/http"
"os"
"strings"
"sync/atomic"
"time"
"intent-system/pkg/dal/core" // ✅ 用于调用 getSupabaseURL 和 DeployFromTar
"github.com/civet148/log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"github.com/gin-gonic/gin"
)
/* ---------- Auto-deploy throttling ---------- */

// Auto-deploy tunables.
var (
	// maxFails is the number of consecutive deploy failures that puts
	// auto-deploy into cooldown.
	maxFails int32 = 3
	// cooldown is how long auto-deploy is suppressed once maxFails is reached.
	cooldown = 5 * time.Minute
)

// Auto-deploy shared state; intended to be accessed via sync/atomic.
var (
	deploying   int32 // 0 = idle, 1 = a deploy is in flight
	failStreak  int32 // number of consecutive failed deploys
	nextAllowed int64 // UnixNano instant before which auto-deploy is skipped
)
// Manager wires the configuration, the gin engines and the embedded
// controller together. Manager itself is passed to the routers.InitRouter*
// functions; presumably the embedded Controller supplies the handlers they
// register — TODO confirm.
type Manager struct {
	*controllers.Controller // embedded: its methods are promoted onto Manager

	cfg               *config.Config // service configuration
	router, routerRPC *gin.Engine    // HTTP engine; routerRPC is not used in this file
}
// NewManager builds a Manager for cfg with two fresh gin engines (router and
// routerRPC) and a controller created from the same configuration.
func NewManager(cfg *config.Config) *Manager {
	return &Manager{
		cfg:        cfg,
		router:     gin.New(),
		routerRPC:  gin.New(),
		Controller: controllers.NewController(cfg),
	}
}
// monitorHealthAndRedeploy watches the IP reported by the local health
// endpoint and redeploys the cradle tarball when it diverges from the local
// IP derived from the Supabase URL. It returns when ctx is cancelled.
//
// Sequence:
//   - wait 60 s (cold-start buffer), then run ensureCradleReady once;
//   - start a UDP broadcast on port 9876 with a 2 s interval;
//   - every 75 s poll http://localhost:3008/api/health and compare its "ip"
//     field with the local IP.
//
// A redeploy is triggered once the health IP has been "changing" for more
// than 10 s and still differs from the local IP. Deploys are throttled by the
// package-level deploying/failStreak/nextAllowed state: at most one deploy
// runs at a time, and after maxFails consecutive failures further deploys are
// skipped until cooldown has elapsed.
func (m *Manager) monitorHealthAndRedeploy(ctx context.Context) {
	const healthURL = "http://localhost:3008/api/health"

	/* ====== Cold-start buffer: wait 60 s before the first self-check ====== */
	select {
	case <-time.After(60 * time.Second):
	case <-ctx.Done():
		return
	}
	ensureCradleReady(ctx) // self-check + forced deploy when needed

	// Start the UDP broadcast on port 9876 (2 s interval).
	utils.StartBroadcast(ctx, 9876, 2, "PlugAI ZeroStack AI Server")

	// 75 s has no special meaning; the author picked it arbitrarily.
	ticker := time.NewTicker(75 * time.Second)
	defer ticker.Stop()

	// Change-tracking state. Only this goroutine touches these variables;
	// writing them from the deploy goroutine would race with this loop, so a
	// successful deploy reports back through the deployed channel instead.
	var (
		lastHealthIP string
		changeSince  time.Time
		ipChanging   bool
	)
	// The deploying flag allows at most one deploy in flight, so a buffer of
	// one is sufficient.
	deployed := make(chan string, 1)

	// Named httpClient so it does not shadow the imported docker client package.
	httpClient := &http.Client{Timeout: 5 * time.Second}

	// fetchHealthIP queries the health endpoint and returns its "ip" field.
	// On a transport error, a non-200 status or an undecodable body it logs
	// and returns ok=false. The request is bound to ctx so shutdown aborts an
	// in-flight poll, and the body is always closed.
	fetchHealthIP := func() (ip string, ok bool) {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
		if err != nil {
			log.Warnf("请求 health 失败: %v", err)
			return "", false
		}
		resp, err := httpClient.Do(req)
		if err != nil {
			log.Warnf("请求 health 失败: %v", err)
			return "", false
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			log.Warnf("health 返回异常状态码: %d", resp.StatusCode)
			return "", false
		}
		var res struct {
			IP string `json:"ip"`
		}
		if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
			log.Warnf("解析 health 响应失败: %v", err)
			return "", false
		}
		return res.IP, true
	}

	for {
		select {
		case <-ctx.Done():
			return

		case ip := <-deployed:
			// A deploy succeeded: adopt the local IP as the new baseline.
			lastHealthIP = ip
			ipChanging = false

		case <-ticker.C:
			// ① Local IP, derived from the Supabase URL.
			localIP, err := utils.ExtractIP(core.GetSupabaseURL())
			if err != nil {
				log.Warnf("解析本机 IP 失败: %v", err)
				continue
			}
			// ② IP reported by the health endpoint.
			healthIP, ok := fetchHealthIP()
			if !ok {
				continue
			}
			log.Infof("health API 返回 IP=%s, localIP=%s", healthIP, localIP)

			// ③ First sample: record the baseline; if it already differs from
			// the local IP, start the change timer immediately.
			if lastHealthIP == "" {
				lastHealthIP = healthIP
				if healthIP != localIP {
					ipChanging = true
					changeSince = time.Now()
				}
				continue
			}
			// ④ Health IP changed since the previous sample: restart the timer.
			if healthIP != lastHealthIP {
				log.Infof("health IP 变化: %s → %s", lastHealthIP, healthIP)
				lastHealthIP = healthIP
				changeSince = time.Now()
				ipChanging = true
			}
			// ⑤ Trigger only when the change has persisted for more than 10 s
			// and the health IP still differs from the local IP.
			if !ipChanging || time.Since(changeSince) <= 10*time.Second || healthIP == localIP {
				continue
			}
			// ⑥ Throttling: honour the cooldown window and allow a single
			// concurrent deploy.
			if atomic.LoadInt64(&nextAllowed) > time.Now().UnixNano() {
				log.Warn("处于冷却期,跳过自动部署")
				continue
			}
			if !atomic.CompareAndSwapInt32(&deploying, 0, 1) {
				continue // a deploy is already running
			}
			go func(localIP string) {
				defer atomic.StoreInt32(&deploying, 0)
				id, err := core.DeployCradleTar(ctx)
				if err != nil {
					log.Errorf("自动部署失败: %v", err)
					if atomic.AddInt32(&failStreak, 1) >= maxFails {
						atomic.StoreInt64(&nextAllowed, time.Now().Add(cooldown).UnixNano())
						log.Errorf("连续失败 %d 次,进入冷却 %v", maxFails, cooldown)
					}
					return
				}
				atomic.StoreInt32(&failStreak, 0)
				log.Infof("✅ 自动部署成功containerID=%s", id)
				// Hand the new baseline back to the monitor loop; give up if
				// the monitor is shutting down.
				select {
				case deployed <- localIP:
				case <-ctx.Done():
				}
			}(localIP)
		}
	}
}
// Run validates the license, initialises the big cache and starts the HTTP
// service, blocking while the service runs.
//
// If /etc/license.dat fails validation the process exits with status 1
// immediately. Otherwise the returned error is the one produced by
// startWebService (after m.Close has been called), or nil.
func (m *Manager) Run() (err error) {
	if err = licensecheck.Validate("/etc/license.dat"); err != nil {
		log.Errorf("License invalid/expired: %v — exiting immediately", err)
		os.Exit(1)
	}
	log.Infof("License check passed")

	// The IP-change monitor / auto-redeploy loop is currently disabled. To
	// enable it:
	//   go m.monitorHealthAndRedeploy(context.Background())

	err = m.runManager(func() error {
		cainit.InitBigCache()
		// On success startWebService blocks for the lifetime of the server.
		startErr := m.startWebService()
		if startErr == nil {
			return nil
		}
		m.Close()
		log.Errorf("start web service error [%s]", startErr)
		return startErr
	})
	return err
}
// Close releases resources held by the Manager. It is currently a no-op;
// Run calls it when the web service fails to start.
func (m *Manager) Close() {
	// Nothing to release yet.
}
// initRouterMgr installs the global middleware (gin request logger, gin panic
// recovery, CORS) on m.router, registers every route group on it, and returns
// m.router.
func (m *Manager) initRouterMgr() (r *gin.Engine) {
	r = m.router
	r.Use(gin.Logger(), gin.Recovery(), middleware.Cors())
	//r.Static("/", m.cfg.Static)

	// Route groups, in registration order.
	routers.InitRouterGroupCommon(r, m)   // common endpoints
	routers.InitRouterGroupPlatform(r, m) // platform management
	routers.InitRouterGroupCustomer(r, m) // customer management
	routers.InitRouterGateway(r, m)       // gateway endpoints
	routers.InitRouterGroupBiz(r, m)      // business endpoints
	routers.InitRouterGroupDeploy(r, m)   // deployment endpoints
	routers.InitRouterGroupDeployWs(r, m) // websocket push of deployment progress
	return r
}
// runManager calls run and returns its error unchanged.
func (m *Manager) runManager(run func() error) (err error) {
	err = run()
	return err
}
// startWebService prepares the images directory, builds the router and serves
// HTTP on m.cfg.HttpAddr, blocking for the lifetime of the server.
//
// http.Server.ListenAndServe always returns a non-nil error, so any exit from
// the serve loop is escalated through log.Panic (assumed to panic — TODO
// confirm for github.com/civet148/log). A failure to create the images
// directory is escalated the same way.
func (m *Manager) startWebService() (err error) {
	// Upper bound on how long a client may take to send its request headers.
	const readHeaderTimeout = 10 * time.Second

	if !m.createImagesDir() {
		log.Panic("create images dir failed")
	}
	routerMgr := m.initRouterMgr()
	strHttpAddr := m.cfg.HttpAddr
	log.Infof("starting http server on %s \n", strHttpAddr)

	// http.ListenAndServe uses a server with no timeouts at all, so a slow
	// client can hold a connection open indefinitely while sending headers.
	// Only ReadHeaderTimeout is set: full Read/Write timeouts could interfere
	// with the long-lived websocket connections registered by
	// routers.InitRouterGroupDeployWs.
	srv := &http.Server{
		Addr:              strHttpAddr,
		Handler:           routerMgr,
		ReadHeaderTimeout: readHeaderTimeout,
	}
	// Blocks while the server is running.
	if err = srv.ListenAndServe(); err != nil {
		log.Panic("listen http server [%s] error [%s]\n", strHttpAddr, err.Error())
	}
	return
}
// createImagesDir makes sure the on-disk image directory exists and is linked
// into the static web root.
//
// Steps:
//  1. create m.cfg.ImagePath;
//  2. take the URL path of m.cfg.ImagePrefix (everything from the first "/"
//     after an optional http:// or https:// scheme);
//  3. default m.cfg.Static to itypes.DefaultStaticHome when empty (this
//     mutates the shared config);
//  4. create the parent directory of m.cfg.Static+<url path> and symlink that
//     path to m.cfg.ImagePath.
//
// It returns false if a directory or the symlink cannot be created. If the
// link path already exists it is accepted as-is; it is not verified to point
// at m.cfg.ImagePath. It calls log.Panic when ImagePrefix has no path
// component.
func (m *Manager) createImagesDir() bool {
	if err := os.MkdirAll(m.cfg.ImagePath, os.ModePerm); err != nil {
		log.Errorf("make dir [%s] error [%s]", m.cfg.ImagePath, err.Error())
		return false
	}
	log.Infof("make dir [%s] ok", m.cfg.ImagePath)

	// NOTE(review): the whole prefix is lower-cased, not just the scheme, so
	// a mixed-case path in ImagePrefix produces a lower-case link path.
	strUrl := strings.ToLower(m.cfg.ImagePrefix)
	strUrl = strings.TrimPrefix(strUrl, "http://")
	strUrl = strings.TrimPrefix(strUrl, "https://")
	idx := strings.Index(strUrl, "/")
	if idx < 0 {
		log.Panic("url [%s] images path invalid ", strUrl)
	}
	strImages := strUrl[idx:]

	if m.cfg.Static == "" {
		m.cfg.Static = itypes.DefaultStaticHome
	}
	strLink := m.cfg.Static + strImages
	// strImages starts with "/", so strLink always contains a "/".
	strPrefixDir := strLink[:strings.LastIndex(strLink, "/")]
	if err := os.MkdirAll(strPrefixDir, os.ModePerm); err != nil {
		log.Errorf("make dir [%s] error [%s]", strPrefixDir, err.Error())
		return false
	}
	log.Infof("make dir [%s] ok", strPrefixDir)

	// An existing link (e.g. from a previous run) is fine. Detect that case
	// with os.IsExist, which unwraps *os.LinkError, rather than matching on
	// the error text.
	if err := os.Symlink(m.cfg.ImagePath, strLink); err != nil && !os.IsExist(err) {
		log.Errorf("make link [%s] error [%s]", strLink, err.Error())
		return false
	}
	log.Infof("make link [%s] to [%s] ok", strLink, m.cfg.ImagePath)
	return true
}
// ensureCradleReady makes sure a container whose name contains "Cradle" is
// running, cleaning up broken instances and redeploying when none is.
//
// Steps:
//  1. list all containers (running or not) and select those whose first name
//     contains "Cradle" (case-sensitive);
//  2. stop and force-remove every selected container that is not "running";
//  3. if no selected container is running, force-remove the images used by
//     the selected containers and deploy via core.DeployCradleTar.
//
// Failures are logged, never returned. If the container list cannot be read,
// nothing is cleaned up or deployed. Without the list there is no way to tell
// whether a healthy instance already exists, and deploying blindly could
// start a duplicate.
func ensureCradleReady(ctx context.Context) {
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Warnf("docker client err: %v", err)
		return
	}
	defer cli.Close()

	// Container-name keyword; matching is case-sensitive.
	const containerNameKeyword = "Cradle"
	// Timeout passed to ContainerStop for non-running containers.
	stopTimeout := 10 * time.Second

	needDeploy := true
	imagesToDelete := map[string]struct{}{}

	// ① Walk every container, including stopped ones.
	cs, err := cli.ContainerList(ctx, types.ContainerListOptions{All: true})
	if err != nil {
		log.Errorf("列出容器失败,跳过开机自检: %v", err)
		return
	}
	for _, c := range cs {
		if len(c.Names) == 0 {
			continue // nothing to match against; avoids indexing an empty slice
		}
		// Names are reported with a leading "/"; only the first one is used.
		name := strings.TrimPrefix(c.Names[0], "/")
		if !strings.Contains(name, containerNameKeyword) {
			continue
		}
		imagesToDelete[c.Image] = struct{}{} // remember the image of every match
		if c.State == "running" {
			needDeploy = false // a running instance already exists
			continue
		}
		// Not running: stop and remove it.
		if err := cli.ContainerStop(ctx, c.ID, &stopTimeout); err != nil {
			log.Warnf("停止容器 %s 失败: %v", name, err)
		}
		if err := cli.ContainerRemove(ctx, c.ID, types.ContainerRemoveOptions{Force: true}); err != nil {
			log.Errorf("删除容器 %s 失败: %v", name, err)
			continue
		}
		log.Warnf("清理异常容器 %s (state=%s)", name, c.State)
	}

	if !needDeploy {
		log.Infof("检测到正在运行的 cradle 容器,跳过开机部署")
		return
	}

	// ② Remove leftover images of the selected containers.
	for img := range imagesToDelete {
		if _, err := cli.ImageRemove(ctx, img, types.ImageRemoveOptions{Force: true, PruneChildren: true}); err != nil {
			log.Warnf("删除旧镜像 %s 失败: %v", img, err)
			continue
		}
		log.Warnf("删除旧镜像 %s", img)
	}

	// ③ Redeploy.
	id, err := core.DeployCradleTar(ctx)
	if err != nil {
		log.Errorf("开机强制部署失败: %v", err)
		return
	}
	log.Infof("✅ 开机强制部署完成containerID=%s", id)
}