package dao import ( "encoding/json" "errors" "fmt" "intent-system/pkg/dal/models" "strings" "github.com/civet148/log" "github.com/civet148/sqlca/v2" ) // 客户端带来的本地状态 (org_id, digest) type OrgDigestPair struct { OrgId int64 Digest string } type NewsAsyncCondition struct { Org_Id int64 Digest string } // type NewsPullNewCondition struct { // OrgIDs []int64 `json:"org_ids"` // } type NewsPullNewCondition struct { Pairs []OrgDigestPair } const ( NewsState_NotPublish = 0 //未发布 NewsState_Published = 1 //已发布到订阅列表 NewsState_Pushed = 2 //已推送订阅 ) type NewsCondition struct { PageNo int PageSize int Id int64 Tag string All bool IsDeleted bool ContainExtra bool Asc bool Search string Language string } type NewsDAO struct { db *sqlca.Engine } func NewNewsDAO(db *sqlca.Engine) *NewsDAO { return &NewsDAO{ db: db, } } // insert into table by data model func (dao *NewsDAO) Insert(do *models.NewsDO) (lastInsertId int64, err error) { return dao.db.Model(&do).Table(models.TableNameNews).Insert() } // insert if not exist or update columns on duplicate key... func (dao *NewsDAO) Upsert(do *models.NewsDO, columns ...string) (lastInsertId int64, err error) { return dao.db.Model(&do).Table(models.TableNameNews).Select(columns...).Upsert() } // update table set columns where id=xxx func (dao *NewsDAO) Update(do *models.NewsDO, columns ...string) (rows int64, err error) { return dao.db.Model(&do).Table(models.TableNameNews).Select(columns...).Update() } func (dao *NewsDAO) UpdateByOrgId(do *models.NewsDO, columns ...string) (rows int64, err error) { return dao.db.Model(&do).Table(models.TableNameNews).Eq(models.NEWS_COLUMN_ORG_ID, do.OrgId).Select(columns...).Update() } // query records by id func (dao *NewsDAO) QueryById(id int64, columns ...string) (do *models.NewsDO, err error) { if _, err = dao.db.Model(&do).Table(models.TableNameNews).Id(id).Select(columns...).Query(); err != nil { return nil, log.Errorf(err.Error()) } return } // query records by url func (dao *NewsDAO) QueryByUrl(url string, columns ...string) (do *models.NewsDO, err error) { if _, err = dao.db.Model(&do). Table(models.TableNameNews). Where("url = ?", url). Select(columns...). Query(); err != nil { return nil, log.Errorf(err.Error()) } return } func (dao *NewsDAO) QueryAllByUrl(fullPath string) ([]*models.NewsDO, error) { var list []*models.NewsDO _, err := dao.db.Model(&list). Table(models.TableNameNews). Where("url = ?", fullPath). // ✅ 参数绑定,不拼接 Query() return list, err } func (dao *NewsDAO) QueryOriginalNews(orgId int64, lang string, columns ...string) (do *models.NewsDO, err error) { if _, err = dao.db.Model(&do). Table(models.TableNameNews). Eq(models.NEWS_COLUMN_ORG_ID, orgId). Eq(models.NEWS_COLUMN_LANGUAGE, lang). Eq(models.NEWS_COLUMN_IS_REPLICATE, 0). Eq(models.NEWS_COLUMN_IS_OVERWRITTEN, 1). Select(columns...). Query(); err != nil { return nil, log.Errorf(err.Error()) } return } // query records by conditions func (dao *NewsDAO) QueryByCondition(conditions map[string]interface{}, columns ...string) (dos []*models.NewsDO, err error) { if len(conditions) == 0 { return nil, fmt.Errorf("condition must not be empty") } e := dao.db.Model(&dos).Table(models.TableNameNews).Select(columns...) for k, v := range conditions { e.Eq(k, v) } if _, err = e.Query(); err != nil { return nil, err } return } // query max news id func (dao *NewsDAO) QueryMaxSyncId(lang models.LanguageType) (lastId int64, err error) { if _, err = dao.db.Model(&lastId). Table(models.TableNameNews). Max(models.NEWS_COLUMN_ORG_ID). Eq(models.NEWS_COLUMN_LANGUAGE, lang). Query(); err != nil { return 0, err } return } func (dao *NewsDAO) QueryNotPushed(pageNo, pageSize int) (dos []*models.NewsDO, total int64, err error) { e := dao.db.Model(&dos). Table(models.TableNameNews). Eq(models.NEWS_COLUMN_IS_DELETED, 0). Eq(models.NEWS_COLUMN_STATE, NewsState_NotPublish). Page(pageNo, pageSize). Desc(models.QUESTION_ANSWER_COLUMN_UPDATED_TIME) _, total, err = e.QueryEx() if err != nil { return nil, 0, log.Errorf(err.Error()) } return } func (dao *NewsDAO) QueryList(cond *NewsCondition) (dos []*models.NewsDO, total int64, err error) { e := dao.db.Model(&dos). Table(models.TableNameNews). Select( models.NEWS_COLUMN_ID, models.NEWS_COLUMN_ORG_ID, models.NEWS_COLUMN_SPIDER_ID, models.NEWS_COLUMN_PNAME_ID, models.NEWS_COLUMN_TAG, models.NEWS_COLUMN_CATEGORY, models.NEWS_COLUMN_MAIN_TITLE, models.NEWS_COLUMN_SUB_TITLE, models.NEWS_COLUMN_SUMMARY, models.NEWS_COLUMN_KEYWORDS, models.NEWS_COLUMN_SEO_KEYWORDS, models.NEWS_COLUMN_TAGS, models.NEWS_COLUMN_URL, models.NEWS_COLUMN_IMAGE_URL, models.NEWS_COLUMN_LOGO_URL, models.NEWS_COLUMN_MODEL_PARAMETER, models.NEWS_COLUMN_CONTENT, models.NEWS_COLUMN_IS_HOTSPOT, models.NEWS_COLUMN_IS_OVERWRITTEN, models.NEWS_COLUMN_IS_DELETED, models.NEWS_COLUMN_IS_REPLICATE, models.NEWS_COLUMN_STATE, models.NEWS_COLUMN_CREATED_TIME, models.NEWS_COLUMN_UPDATED_TIME, ). Page(cond.PageNo, cond.PageSize) if cond.Id != 0 { e.Eq(models.NEWS_COLUMN_ID, cond.Id) } if cond.Tag != "" { e.JsonContainArray(models.NEWS_COLUMN_TAGS, cond.Tag) } if !cond.All { e.Eq(models.NEWS_COLUMN_IS_OVERWRITTEN, 0) } if cond.ContainExtra { e.Select(models.NEWS_COLUMN_EXTRA_DATA) } if cond.IsDeleted { e.Eq(models.NEWS_COLUMN_IS_DELETED, 1) } else { e.Eq(models.NEWS_COLUMN_IS_DELETED, 0) } if cond.Language != "" { e.Eq(models.NEWS_COLUMN_LANGUAGE, cond.Language) } if cond.Search != "" { e.Like(models.NEWS_COLUMN_MAIN_TITLE, cond.Search) } if cond.Asc { e.Asc(models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_ID) } else { e.Desc(models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_ID) } _, total, err = e.QueryEx() if err != nil { return nil, 0, log.Errorf(err.Error()) } return } // func (dao *NewsDAO) QueryAsync(cond *NewsAsyncCondition) (dos []*models.NewsDO, total int64, err error) { // log.Infof("[QueryAsync] >>> 调用开始,org_id=%d, digest='%s'", cond.Org_Id, cond.Digest) // if cond.Org_Id == 0 { // log.Warnf("[QueryAsync] org_id 为空,非法请求") // return nil, 0, errors.New("org_id is required") // } // e := dao.db.Model(&dos). // Table(models.TableNameNews). // Select( // models.NEWS_COLUMN_ID, // models.NEWS_COLUMN_ORG_ID, // models.NEWS_COLUMN_SPIDER_ID, // models.NEWS_COLUMN_PNAME_ID, // models.NEWS_COLUMN_TAG, // models.NEWS_COLUMN_CATEGORY, // models.NEWS_COLUMN_MAIN_TITLE, // models.NEWS_COLUMN_SUB_TITLE, // models.NEWS_COLUMN_SUMMARY, // models.NEWS_COLUMN_KEYWORDS, // models.NEWS_COLUMN_SEO_KEYWORDS, // models.NEWS_COLUMN_TAGS, // models.NEWS_COLUMN_URL, // models.NEWS_COLUMN_IMAGE_URL, // models.NEWS_COLUMN_LOGO_URL, // models.NEWS_COLUMN_MODEL_PARAMETER, // models.NEWS_COLUMN_CONTENT, // models.NEWS_COLUMN_IS_HOTSPOT, // models.NEWS_COLUMN_IS_OVERWRITTEN, // models.NEWS_COLUMN_IS_DELETED, // models.NEWS_COLUMN_IS_REPLICATE, // models.NEWS_COLUMN_STATE, // models.NEWS_COLUMN_CREATED_TIME, // models.NEWS_COLUMN_UPDATED_TIME, // models.NEWS_COLUMN_EXTRA_DATA, // ). // Eq(models.NEWS_COLUMN_ORG_ID, cond.Org_Id) // _, total, err = e.QueryEx() // if err != nil { // log.Errorf("[QueryAsync] 查询失败: %v", err) // return nil, 0, err // } // if total == 0 { // log.Warnf("[QueryAsync] org_id=%d 无匹配记录", cond.Org_Id) // return nil, 0, errors.New("未找到对应 org_id 的记录") // } // if total > 1 { // log.Errorf("[QueryAsync] org_id=%d 数据异常,记录不唯一", cond.Org_Id) // return nil, 0, errors.New("org_id 不唯一,数据异常") // } // record := dos[0] // if cond.Digest == "" { // log.Infof("[QueryAsync] org_id=%d digest 为空,直接返回记录", cond.Org_Id) // return []*models.NewsDO{record}, 1, nil // } // // 客户端传了 digest,进行比较 // log.Infof("[QueryAsync] org_id=%d 开始比较 digest,客户端='%s'", cond.Org_Id, cond.Digest) // if record.ExtraData != nil { // if val, ok := record.ExtraData["digest"]; ok { // log.Infof("[QueryAsync] org_id=%d 数据库 digest='%v'", cond.Org_Id, val) // if ds, ok := val.(string); ok && ds == cond.Digest { // log.Infof("[QueryAsync] org_id=%d digest 相同,跳过返回", cond.Org_Id) // return nil, 0, nil // } // } else { // log.Infof("[QueryAsync] org_id=%d ExtraData 中无 digest 字段", cond.Org_Id) // } // } else { // log.Infof("[QueryAsync] org_id=%d ExtraData 为空", cond.Org_Id) // } // log.Infof("[QueryAsync] org_id=%d digest 不同,返回记录", cond.Org_Id) // return []*models.NewsDO{record}, 1, nil // } func (dao *NewsDAO) QueryAsync(cond *NewsAsyncCondition) (dos []*models.NewsDO, total int64, err error) { //log.Infof("[QueryAsync] >>> 调用开始,org_id=%d, digest='%s'", cond.Org_Id, cond.Digest) if cond.Org_Id == 0 { log.Warnf("[QueryAsync] org_id 为空,非法请求") return nil, 0, errors.New("org_id is required") } e := dao.db.Model(&dos). Table(models.TableNameNews). Select( models.NEWS_COLUMN_ID, models.NEWS_COLUMN_ORG_ID, models.NEWS_COLUMN_SPIDER_ID, models.NEWS_COLUMN_PNAME_ID, models.NEWS_COLUMN_TAG, models.NEWS_COLUMN_CATEGORY, models.NEWS_COLUMN_MAIN_TITLE, models.NEWS_COLUMN_SUB_TITLE, models.NEWS_COLUMN_SUMMARY, models.NEWS_COLUMN_KEYWORDS, models.NEWS_COLUMN_SEO_KEYWORDS, models.NEWS_COLUMN_TAGS, models.NEWS_COLUMN_URL, models.NEWS_COLUMN_IMAGE_URL, models.NEWS_COLUMN_LOGO_URL, models.NEWS_COLUMN_MODEL_PARAMETER, models.NEWS_COLUMN_CONTENT, models.NEWS_COLUMN_IS_HOTSPOT, models.NEWS_COLUMN_IS_OVERWRITTEN, models.NEWS_COLUMN_IS_DELETED, models.NEWS_COLUMN_IS_REPLICATE, models.NEWS_COLUMN_STATE, models.NEWS_COLUMN_CREATED_TIME, models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_EXTRA_DATA, ). Eq(models.NEWS_COLUMN_ORG_ID, cond.Org_Id) _, total, err = e.QueryEx() if err != nil { log.Errorf("[QueryAsync] 查询失败: %v", err) return nil, 0, err } if total == 0 { log.Warnf("[QueryAsync] org_id=%d 无匹配记录", cond.Org_Id) return nil, 0, errors.New("未找到对应 org_id 的记录") } if total > 1 { log.Errorf("[QueryAsync] org_id=%d 数据异常,记录不唯一", cond.Org_Id) return nil, 0, errors.New("org_id 不唯一,数据异常") } record := dos[0] // ✅ 解析 ExtraDataRaw → ExtraData if record.ExtraDataRaw != "" { if err := json.Unmarshal([]byte(record.ExtraDataRaw), &record.ExtraData); err != nil { log.Warnf("[QueryAsync] org_id=%d ExtraData 解析失败: %v", record.OrgId, err) } else { log.Infof("[QueryAsync] org_id=%d ExtraData 成功解析: %+v", record.OrgId, record.ExtraData) } } else { log.Infof("[QueryAsync] org_id=%d ExtraDataRaw 为空", record.OrgId) } if cond.Digest == "" { log.Infof("[QueryAsync] org_id=%d digest 为空,直接返回记录", cond.Org_Id) return []*models.NewsDO{record}, 1, nil } // 客户端传了 digest,进行比较 log.Infof("[QueryAsync] org_id=%d 开始比较 digest,客户端='%s'", cond.Org_Id, cond.Digest) if record.ExtraData != nil { if val, ok := record.ExtraData["digest"]; ok { log.Infof("[QueryAsync] org_id=%d 数据库 digest='%v'", cond.Org_Id, val) if ds, ok := val.(string); ok && ds == cond.Digest { log.Infof("[QueryAsync] org_id=%d digest 相同,跳过返回", cond.Org_Id) return nil, 0, nil } } else { log.Infof("[QueryAsync] org_id=%d ExtraData 中无 digest 字段", cond.Org_Id) } } else { log.Infof("[QueryAsync] org_id=%d ExtraData 为空", cond.Org_Id) } log.Infof("[QueryAsync] org_id=%d digest 不同,返回记录", cond.Org_Id) return []*models.NewsDO{record}, 1, nil } func (dao *NewsDAO) QueryAsyncBatch(pairs []OrgDigestPair) (dos []*models.NewsDO, total int64, err error) { if len(pairs) == 0 { return []*models.NewsDO{}, 0, nil } // ① 提取 org_id 列表,同时建映射表 clientDigest[org_id] = digest orgIDs := make([]int64, 0, len(pairs)) clientDigest := make(map[int64]string, len(pairs)) for _, p := range pairs { orgIDs = append(orgIDs, p.OrgId) clientDigest[p.OrgId] = p.Digest } // ② 构造 IN (1,2,3...) 的 SQL placeholders := make([]string, 0, len(orgIDs)) for _, id := range orgIDs { placeholders = append(placeholders, fmt.Sprintf("%d", id)) } inClause := fmt.Sprintf("org_id IN (%s)", strings.Join(placeholders, ",")) var rows []*models.NewsDO _, err = dao.db. Model(&rows). Table(models.TableNameNews). Where(inClause). Query() if err != nil { return nil, 0, err } // ③ Go 层比对 digest(先解析 ExtraDataRaw) for _, row := range rows { // ✅ 解析 ExtraDataRaw → ExtraData if row.ExtraData == nil && row.ExtraDataRaw != "" { var extra map[string]interface{} if err := json.Unmarshal([]byte(row.ExtraDataRaw), &extra); err == nil { row.ExtraData = extra } } cDigest := clientDigest[row.OrgId] var sDigest string if d, ok := row.ExtraData["digest"].(string); ok { sDigest = d } if cDigest == "" || cDigest != sDigest { dos = append(dos, row) } } return dos, int64(len(dos)), nil } // func (dao *NewsDAO) QueryPullNew(cond *NewsPullNewCondition) (dos []*models.NewsDO, total int64, err error) { // e := dao.db.Model(&dos). // Table(models.TableNameNews). // Select( // models.NEWS_COLUMN_ID, // models.NEWS_COLUMN_ORG_ID, // models.NEWS_COLUMN_SPIDER_ID, // models.NEWS_COLUMN_PNAME_ID, // models.NEWS_COLUMN_TAG, // models.NEWS_COLUMN_CATEGORY, // models.NEWS_COLUMN_MAIN_TITLE, // models.NEWS_COLUMN_SUB_TITLE, // models.NEWS_COLUMN_SUMMARY, // models.NEWS_COLUMN_KEYWORDS, // models.NEWS_COLUMN_SEO_KEYWORDS, // models.NEWS_COLUMN_TAGS, // models.NEWS_COLUMN_URL, // models.NEWS_COLUMN_IMAGE_URL, // models.NEWS_COLUMN_LOGO_URL, // models.NEWS_COLUMN_MODEL_PARAMETER, // models.NEWS_COLUMN_CONTENT, // models.NEWS_COLUMN_IS_HOTSPOT, // models.NEWS_COLUMN_IS_OVERWRITTEN, // models.NEWS_COLUMN_IS_DELETED, // models.NEWS_COLUMN_IS_REPLICATE, // models.NEWS_COLUMN_STATE, // models.NEWS_COLUMN_CREATED_TIME, // models.NEWS_COLUMN_UPDATED_TIME, // models.NEWS_COLUMN_EXTRA_DATA, // ) // // 🔧 仅提取 org_id,忽略 digest // if len(cond.Pairs) > 0 { // orgIDs := make([]int64, 0, len(cond.Pairs)) // for _, p := range cond.Pairs { // orgIDs = append(orgIDs, p.OrgId) // } // e.Where(fmt.Sprintf("%s NOT IN (?)", models.NEWS_COLUMN_ORG_ID), orgIDs) // } // _, total, err = e.QueryEx() // if err != nil { // log.Errorf("QueryPullNew failed: %v", err) // return nil, 0, err // } // log.Infof(".................[DEBUG] QueryPullNew 返回数据: %+v", dos) // return // } func (dao *NewsDAO) QueryPullNew(cond *NewsPullNewCondition) (dos []*models.NewsDO, total int64, err error) { e := dao.db.Model(&dos). Table(models.TableNameNews). Select( models.NEWS_COLUMN_ID, models.NEWS_COLUMN_ORG_ID, models.NEWS_COLUMN_SPIDER_ID, models.NEWS_COLUMN_PNAME_ID, models.NEWS_COLUMN_TAG, models.NEWS_COLUMN_CATEGORY, models.NEWS_COLUMN_MAIN_TITLE, models.NEWS_COLUMN_SUB_TITLE, models.NEWS_COLUMN_SUMMARY, models.NEWS_COLUMN_KEYWORDS, models.NEWS_COLUMN_SEO_KEYWORDS, models.NEWS_COLUMN_TAGS, models.NEWS_COLUMN_URL, models.NEWS_COLUMN_IMAGE_URL, models.NEWS_COLUMN_LOGO_URL, models.NEWS_COLUMN_MODEL_PARAMETER, models.NEWS_COLUMN_CONTENT, models.NEWS_COLUMN_IS_HOTSPOT, models.NEWS_COLUMN_IS_OVERWRITTEN, models.NEWS_COLUMN_IS_DELETED, models.NEWS_COLUMN_IS_REPLICATE, models.NEWS_COLUMN_STATE, models.NEWS_COLUMN_CREATED_TIME, models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_EXTRA_DATA, ) if cond != nil && len(cond.Pairs) > 0 { orgIDs := make([]string, 0, len(cond.Pairs)) for _, p := range cond.Pairs { orgIDs = append(orgIDs, fmt.Sprintf("%d", p.OrgId)) } notInClause := fmt.Sprintf("org_id NOT IN (%s)", strings.Join(orgIDs, ",")) e = e.Where(notInClause) } _, total, err = e.QueryEx() if err != nil { log.Errorf("QueryPullNew failed: %v", err) return nil, 0, err } log.Infof(".................[DEBUG] QueryPullNew 返回数据: %+v", dos) return }