// Source path: plugai_updsrv/pkg/dal/dao/news.go
//
// NOTE: the hosting UI flagged this file as containing ambiguous Unicode
// characters, i.e. characters that might be confused with other characters.
// If that is intentional the warning can be safely ignored.

package dao
import (
"encoding/json"
"errors"
"fmt"
"intent-system/pkg/dal/models"
"strings"
"github.com/civet148/log"
"github.com/civet148/sqlca/v2"
)
// OrgDigestPair is the local state reported by a client: an org_id together
// with the digest the client currently holds for it.
type OrgDigestPair struct {
	// OrgId is the org_id value the client refers to.
	OrgId int64
	// Digest is the digest held by the client. QueryAsyncBatch returns the
	// row whenever this is empty or differs from the stored digest.
	Digest string
}
// NewsAsyncCondition is the input of NewsDAO.QueryAsync.
type NewsAsyncCondition struct {
	// Org_Id is the org_id to look up; QueryAsync rejects the value 0.
	Org_Id int64
	// Digest is the client-side digest. When empty, QueryAsync skips the
	// digest comparison and returns the record directly.
	Digest string
}
// type NewsPullNewCondition struct {
// OrgIDs []int64 `json:"org_ids"`
// }
// NewsPullNewCondition is the input of NewsDAO.QueryPullNew.
type NewsPullNewCondition struct {
	// Pairs lists what the client already has. QueryPullNew reads only the
	// OrgId of each pair and ignores its Digest.
	Pairs []OrgDigestPair
}
// Values compared against the news state column (see QueryNotPushed).
const (
	// NewsState_NotPublish marks a row that has not been published.
	NewsState_NotPublish = 0
	// NewsState_Published marks a row published to the subscription list.
	NewsState_Published = 1
	// NewsState_Pushed marks a row already pushed to subscribers.
	NewsState_Pushed = 2
)
// NewsCondition carries the paging, filtering and ordering options read by
// NewsDAO.QueryList.
type NewsCondition struct {
	// PageNo and PageSize are handed to the query's Page call.
	PageNo, PageSize int
	// Id, when non-zero, restricts the result to that id.
	Id int64
	// Tag, when non-empty, must be contained in the row's tags JSON array.
	Tag string
	// All:          when false, only rows with the is-overwritten column = 0.
	// IsDeleted:    true selects deleted rows, false selects non-deleted rows.
	// ContainExtra: when true, the extra-data column is selected as well.
	// Asc:          true sorts ascending, false descending (updated time, id).
	All, IsDeleted, ContainExtra, Asc bool
	// Search, when non-empty, is used as a LIKE filter on the main title.
	// Language, when non-empty, must equal the row's language column.
	Search, Language string
}
// NewsDAO groups the reads and writes issued against the news table
// (models.TableNameNews).
type NewsDAO struct {
	// db is the sqlca engine every method builds its statements on.
	db *sqlca.Engine
}
// NewNewsDAO returns a NewsDAO that runs its statements through db.
func NewNewsDAO(db *sqlca.Engine) *NewsDAO {
	d := new(NewsDAO)
	d.db = db
	return d
}
// Insert writes do into the news table.
//
// It returns exactly what the engine's Insert reports: the last insert id
// and any error.
func (dao *NewsDAO) Insert(do *models.NewsDO) (lastInsertId int64, err error) {
	stmt := dao.db.Model(&do).Table(models.TableNameNews)
	lastInsertId, err = stmt.Insert()
	return lastInsertId, err
}
// Upsert inserts do into the news table, or updates the given columns when
// the insert hits a duplicate key.
//
// columns is forwarded to Select; the result is whatever the engine's Upsert
// reports.
func (dao *NewsDAO) Upsert(do *models.NewsDO, columns ...string) (lastInsertId int64, err error) {
	stmt := dao.db.Model(&do).Table(models.TableNameNews)
	lastInsertId, err = stmt.Select(columns...).Upsert()
	return lastInsertId, err
}
// Update writes the given columns of do back to the news table.
//
// No explicit WHERE condition is added here; per the original note the row is
// matched by id (presumably taken from the model by sqlca — confirm).
// It returns the affected row count and any error reported by the engine.
func (dao *NewsDAO) Update(do *models.NewsDO, columns ...string) (rows int64, err error) {
	stmt := dao.db.Model(&do).Table(models.TableNameNews)
	rows, err = stmt.Select(columns...).Update()
	return rows, err
}
// UpdateByOrgId writes the given columns of do to the news rows whose org_id
// column equals do.OrgId.
//
// It returns the affected row count and any error reported by the engine.
func (dao *NewsDAO) UpdateByOrgId(do *models.NewsDO, columns ...string) (rows int64, err error) {
	stmt := dao.db.Model(&do).Table(models.TableNameNews)
	stmt = stmt.Eq(models.NEWS_COLUMN_ORG_ID, do.OrgId)
	rows, err = stmt.Select(columns...).Update()
	return rows, err
}
// QueryById loads the news row whose id equals id.
//
// columns is forwarded to Select. On failure the error is logged and
// returned, and do is nil.
// NOTE(review): what do holds when no row matches depends on sqlca — confirm.
func (dao *NewsDAO) QueryById(id int64, columns ...string) (do *models.NewsDO, err error) {
	_, err = dao.db.Model(&do).Table(models.TableNameNews).Id(id).Select(columns...).Query()
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// inside the driver error would otherwise be read as a formatting verb.
		return nil, log.Errorf("%s", err.Error())
	}
	return do, nil
}
// QueryByUrl loads a news row whose url column equals url.
//
// The url is bound as a query parameter, not concatenated into the SQL.
// columns is forwarded to Select. On failure the error is logged and
// returned, and do is nil.
// NOTE(review): what do holds when no row matches depends on sqlca — confirm.
func (dao *NewsDAO) QueryByUrl(url string, columns ...string) (do *models.NewsDO, err error) {
	_, err = dao.db.Model(&do).
		Table(models.TableNameNews).
		Where("url = ?", url).
		Select(columns...).
		Query()
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// inside the driver error would otherwise be read as a formatting verb.
		return nil, log.Errorf("%s", err.Error())
	}
	return do, nil
}
// QueryAllByUrl returns every news row whose url column equals fullPath.
//
// The slice filled by the query is returned together with the query error,
// even when that error is non-nil.
func (dao *NewsDAO) QueryAllByUrl(fullPath string) ([]*models.NewsDO, error) {
	var matches []*models.NewsDO
	q := dao.db.Model(&matches).Table(models.TableNameNews)
	// fullPath is bound as a parameter rather than spliced into the SQL text.
	q = q.Where("url = ?", fullPath)
	if _, err := q.Query(); err != nil {
		return matches, err
	}
	return matches, nil
}
// QueryOriginalNews loads the news row for orgId and lang whose is-replicate
// column is 0 and whose is-overwritten column is 1.
//
// columns is forwarded to Select. On failure the error is logged and
// returned, and do is nil.
// NOTE(review): what do holds when no row matches depends on sqlca — confirm.
func (dao *NewsDAO) QueryOriginalNews(orgId int64, lang string, columns ...string) (do *models.NewsDO, err error) {
	_, err = dao.db.Model(&do).
		Table(models.TableNameNews).
		Eq(models.NEWS_COLUMN_ORG_ID, orgId).
		Eq(models.NEWS_COLUMN_LANGUAGE, lang).
		Eq(models.NEWS_COLUMN_IS_REPLICATE, 0).
		Eq(models.NEWS_COLUMN_IS_OVERWRITTEN, 1).
		Select(columns...).
		Query()
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// inside the driver error would otherwise be read as a formatting verb.
		return nil, log.Errorf("%s", err.Error())
	}
	return do, nil
}
// QueryByCondition returns the news rows matching every column = value pair
// in conditions.
//
// An empty or nil conditions map is rejected with an error instead of
// querying without any filter. columns is forwarded to Select.
func (dao *NewsDAO) QueryByCondition(conditions map[string]interface{}, columns ...string) (dos []*models.NewsDO, err error) {
	if len(conditions) == 0 {
		return nil, fmt.Errorf("condition must not be empty")
	}

	query := dao.db.Model(&dos).Table(models.TableNameNews).Select(columns...)
	// Each map entry becomes one equality filter on the query.
	for column, value := range conditions {
		query.Eq(column, value)
	}

	_, err = query.Query()
	if err != nil {
		return nil, err
	}
	return dos, nil
}
// QueryMaxSyncId returns the maximum value of the org_id column among the
// news rows whose language column equals lang.
//
// On query failure it returns 0 and the error.
func (dao *NewsDAO) QueryMaxSyncId(lang models.LanguageType) (lastId int64, err error) {
	q := dao.db.Model(&lastId).Table(models.TableNameNews)
	q = q.Max(models.NEWS_COLUMN_ORG_ID).Eq(models.NEWS_COLUMN_LANGUAGE, lang)
	if _, err = q.Query(); err != nil {
		return 0, err
	}
	return lastId, nil
}
// QueryNotPushed returns one page of non-deleted news rows whose state is
// NewsState_NotPublish, newest update first, plus the total reported by
// QueryEx.
//
// pageNo and pageSize are handed to Page. On failure the error is logged and
// returned with a nil slice and a zero total.
func (dao *NewsDAO) QueryNotPushed(pageNo, pageSize int) (dos []*models.NewsDO, total int64, err error) {
	e := dao.db.Model(&dos).
		Table(models.TableNameNews).
		Eq(models.NEWS_COLUMN_IS_DELETED, 0).
		Eq(models.NEWS_COLUMN_STATE, NewsState_NotPublish).
		Page(pageNo, pageSize).
		// Order by the news table's own column constant; the original borrowed
		// the question/answer table's constant here.
		Desc(models.NEWS_COLUMN_UPDATED_TIME)

	_, total, err = e.QueryEx()
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// inside the driver error would otherwise be read as a formatting verb.
		return nil, 0, log.Errorf("%s", err.Error())
	}
	return dos, total, nil
}
// QueryList returns one page of news rows matching cond, plus the total
// reported by QueryEx.
//
// Filters taken from cond:
//   - Id: when non-zero, NEWS_COLUMN_ID must equal it.
//   - Tag: when non-empty, the NEWS_COLUMN_TAGS JSON array must contain it.
//   - All: when false, only rows with NEWS_COLUMN_IS_OVERWRITTEN = 0.
//   - IsDeleted: NEWS_COLUMN_IS_DELETED = 1 when true, = 0 otherwise.
//   - Language: when non-empty, NEWS_COLUMN_LANGUAGE must equal it.
//   - Search: when non-empty, a LIKE filter on NEWS_COLUMN_MAIN_TITLE.
//
// ContainExtra additionally selects NEWS_COLUMN_EXTRA_DATA. Rows are ordered
// by updated time and then id, ascending when cond.Asc is set and descending
// otherwise. cond must not be nil.
//
// On failure the error is logged and returned with a nil slice and a zero
// total.
func (dao *NewsDAO) QueryList(cond *NewsCondition) (dos []*models.NewsDO, total int64, err error) {
	e := dao.db.Model(&dos).
		Table(models.TableNameNews).
		Select(
			models.NEWS_COLUMN_ID,
			models.NEWS_COLUMN_ORG_ID,
			models.NEWS_COLUMN_SPIDER_ID,
			models.NEWS_COLUMN_PNAME_ID,
			models.NEWS_COLUMN_TAG,
			models.NEWS_COLUMN_CATEGORY,
			models.NEWS_COLUMN_MAIN_TITLE,
			models.NEWS_COLUMN_SUB_TITLE,
			models.NEWS_COLUMN_SUMMARY,
			models.NEWS_COLUMN_KEYWORDS,
			models.NEWS_COLUMN_SEO_KEYWORDS,
			models.NEWS_COLUMN_TAGS,
			models.NEWS_COLUMN_URL,
			models.NEWS_COLUMN_IMAGE_URL,
			models.NEWS_COLUMN_LOGO_URL,
			models.NEWS_COLUMN_MODEL_PARAMETER,
			models.NEWS_COLUMN_CONTENT,
			models.NEWS_COLUMN_IS_HOTSPOT,
			models.NEWS_COLUMN_IS_OVERWRITTEN,
			models.NEWS_COLUMN_IS_DELETED,
			models.NEWS_COLUMN_IS_REPLICATE,
			models.NEWS_COLUMN_STATE,
			models.NEWS_COLUMN_CREATED_TIME,
			models.NEWS_COLUMN_UPDATED_TIME,
		).
		Page(cond.PageNo, cond.PageSize)

	if cond.Id != 0 {
		e.Eq(models.NEWS_COLUMN_ID, cond.Id)
	}
	if cond.Tag != "" {
		e.JsonContainArray(models.NEWS_COLUMN_TAGS, cond.Tag)
	}
	if !cond.All {
		e.Eq(models.NEWS_COLUMN_IS_OVERWRITTEN, 0)
	}
	if cond.ContainExtra {
		// NOTE(review): relies on a second Select call adding to the column
		// list built above rather than replacing it — confirm against sqlca.
		e.Select(models.NEWS_COLUMN_EXTRA_DATA)
	}
	if cond.IsDeleted {
		e.Eq(models.NEWS_COLUMN_IS_DELETED, 1)
	} else {
		e.Eq(models.NEWS_COLUMN_IS_DELETED, 0)
	}
	if cond.Language != "" {
		e.Eq(models.NEWS_COLUMN_LANGUAGE, cond.Language)
	}
	if cond.Search != "" {
		e.Like(models.NEWS_COLUMN_MAIN_TITLE, cond.Search)
	}
	if cond.Asc {
		e.Asc(models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_ID)
	} else {
		e.Desc(models.NEWS_COLUMN_UPDATED_TIME, models.NEWS_COLUMN_ID)
	}

	_, total, err = e.QueryEx()
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// inside the driver error would otherwise be read as a formatting verb.
		return nil, 0, log.Errorf("%s", err.Error())
	}
	return dos, total, nil
}
// func (dao *NewsDAO) QueryAsync(cond *NewsAsyncCondition) (dos []*models.NewsDO, total int64, err error) {
// log.Infof("[QueryAsync] >>> 调用开始org_id=%d, digest='%s'", cond.Org_Id, cond.Digest)
// if cond.Org_Id == 0 {
// log.Warnf("[QueryAsync] org_id 为空,非法请求")
// return nil, 0, errors.New("org_id is required")
// }
// e := dao.db.Model(&dos).
// Table(models.TableNameNews).
// Select(
// models.NEWS_COLUMN_ID,
// models.NEWS_COLUMN_ORG_ID,
// models.NEWS_COLUMN_SPIDER_ID,
// models.NEWS_COLUMN_PNAME_ID,
// models.NEWS_COLUMN_TAG,
// models.NEWS_COLUMN_CATEGORY,
// models.NEWS_COLUMN_MAIN_TITLE,
// models.NEWS_COLUMN_SUB_TITLE,
// models.NEWS_COLUMN_SUMMARY,
// models.NEWS_COLUMN_KEYWORDS,
// models.NEWS_COLUMN_SEO_KEYWORDS,
// models.NEWS_COLUMN_TAGS,
// models.NEWS_COLUMN_URL,
// models.NEWS_COLUMN_IMAGE_URL,
// models.NEWS_COLUMN_LOGO_URL,
// models.NEWS_COLUMN_MODEL_PARAMETER,
// models.NEWS_COLUMN_CONTENT,
// models.NEWS_COLUMN_IS_HOTSPOT,
// models.NEWS_COLUMN_IS_OVERWRITTEN,
// models.NEWS_COLUMN_IS_DELETED,
// models.NEWS_COLUMN_IS_REPLICATE,
// models.NEWS_COLUMN_STATE,
// models.NEWS_COLUMN_CREATED_TIME,
// models.NEWS_COLUMN_UPDATED_TIME,
// models.NEWS_COLUMN_EXTRA_DATA,
// ).
// Eq(models.NEWS_COLUMN_ORG_ID, cond.Org_Id)
// _, total, err = e.QueryEx()
// if err != nil {
// log.Errorf("[QueryAsync] 查询失败: %v", err)
// return nil, 0, err
// }
// if total == 0 {
// log.Warnf("[QueryAsync] org_id=%d 无匹配记录", cond.Org_Id)
// return nil, 0, errors.New("未找到对应 org_id 的记录")
// }
// if total > 1 {
// log.Errorf("[QueryAsync] org_id=%d 数据异常,记录不唯一", cond.Org_Id)
// return nil, 0, errors.New("org_id 不唯一,数据异常")
// }
// record := dos[0]
// if cond.Digest == "" {
// log.Infof("[QueryAsync] org_id=%d digest 为空,直接返回记录", cond.Org_Id)
// return []*models.NewsDO{record}, 1, nil
// }
// // 客户端传了 digest进行比较
// log.Infof("[QueryAsync] org_id=%d 开始比较 digest客户端='%s'", cond.Org_Id, cond.Digest)
// if record.ExtraData != nil {
// if val, ok := record.ExtraData["digest"]; ok {
// log.Infof("[QueryAsync] org_id=%d 数据库 digest='%v'", cond.Org_Id, val)
// if ds, ok := val.(string); ok && ds == cond.Digest {
// log.Infof("[QueryAsync] org_id=%d digest 相同,跳过返回", cond.Org_Id)
// return nil, 0, nil
// }
// } else {
// log.Infof("[QueryAsync] org_id=%d ExtraData 中无 digest 字段", cond.Org_Id)
// }
// } else {
// log.Infof("[QueryAsync] org_id=%d ExtraData 为空", cond.Org_Id)
// }
// log.Infof("[QueryAsync] org_id=%d digest 不同,返回记录", cond.Org_Id)
// return []*models.NewsDO{record}, 1, nil
// }
// QueryAsync looks up the single news row for cond.Org_Id and decides whether
// the client needs it, based on the digest stored under the "digest" key of
// the row's extra data.
//
// Results:
//   - cond.Org_Id == 0, no matching row, or more than one matching row:
//     (nil, 0, error).
//   - cond.Digest is empty: the row is returned as a one-element slice.
//   - the stored digest is a string equal to cond.Digest: (nil, 0, nil),
//     meaning the client is already up to date.
//   - otherwise (digest differs, is missing, or extra data is absent or
//     unparsable): the row is returned as a one-element slice.
//
// cond must not be nil.
func (dao *NewsDAO) QueryAsync(cond *NewsAsyncCondition) (dos []*models.NewsDO, total int64, err error) {
	if cond.Org_Id == 0 {
		log.Warnf("[QueryAsync] org_id 为空,非法请求")
		return nil, 0, errors.New("org_id is required")
	}

	e := dao.db.Model(&dos).
		Table(models.TableNameNews).
		Select(
			models.NEWS_COLUMN_ID,
			models.NEWS_COLUMN_ORG_ID,
			models.NEWS_COLUMN_SPIDER_ID,
			models.NEWS_COLUMN_PNAME_ID,
			models.NEWS_COLUMN_TAG,
			models.NEWS_COLUMN_CATEGORY,
			models.NEWS_COLUMN_MAIN_TITLE,
			models.NEWS_COLUMN_SUB_TITLE,
			models.NEWS_COLUMN_SUMMARY,
			models.NEWS_COLUMN_KEYWORDS,
			models.NEWS_COLUMN_SEO_KEYWORDS,
			models.NEWS_COLUMN_TAGS,
			models.NEWS_COLUMN_URL,
			models.NEWS_COLUMN_IMAGE_URL,
			models.NEWS_COLUMN_LOGO_URL,
			models.NEWS_COLUMN_MODEL_PARAMETER,
			models.NEWS_COLUMN_CONTENT,
			models.NEWS_COLUMN_IS_HOTSPOT,
			models.NEWS_COLUMN_IS_OVERWRITTEN,
			models.NEWS_COLUMN_IS_DELETED,
			models.NEWS_COLUMN_IS_REPLICATE,
			models.NEWS_COLUMN_STATE,
			models.NEWS_COLUMN_CREATED_TIME,
			models.NEWS_COLUMN_UPDATED_TIME,
			models.NEWS_COLUMN_EXTRA_DATA,
		).
		Eq(models.NEWS_COLUMN_ORG_ID, cond.Org_Id)

	_, total, err = e.QueryEx()
	if err != nil {
		log.Errorf("[QueryAsync] 查询失败: %v", err)
		return nil, 0, err
	}
	if total == 0 {
		log.Warnf("[QueryAsync] org_id=%d 无匹配记录", cond.Org_Id)
		return nil, 0, errors.New("未找到对应 org_id 的记录")
	}
	if total > 1 {
		log.Errorf("[QueryAsync] org_id=%d 数据异常,记录不唯一", cond.Org_Id)
		return nil, 0, errors.New("org_id 不唯一,数据异常")
	}
	// total and dos are separate outputs of QueryEx; never index dos[0] on the
	// strength of total alone. An empty slice is reported as "not found".
	if len(dos) == 0 {
		log.Warnf("[QueryAsync] org_id=%d 无匹配记录", cond.Org_Id)
		return nil, 0, errors.New("未找到对应 org_id 的记录")
	}

	record := dos[0]

	// Decode the raw extra-data JSON into record.ExtraData. A decode failure
	// is only logged; the record is then handled as having no usable digest.
	if record.ExtraDataRaw != "" {
		if err := json.Unmarshal([]byte(record.ExtraDataRaw), &record.ExtraData); err != nil {
			log.Warnf("[QueryAsync] org_id=%d ExtraData 解析失败: %v", record.OrgId, err)
		} else {
			log.Infof("[QueryAsync] org_id=%d ExtraData 成功解析: %+v", record.OrgId, record.ExtraData)
		}
	} else {
		log.Infof("[QueryAsync] org_id=%d ExtraDataRaw 为空", record.OrgId)
	}

	// No client digest: nothing to compare, hand the record back.
	if cond.Digest == "" {
		log.Infof("[QueryAsync] org_id=%d digest 为空,直接返回记录", cond.Org_Id)
		return []*models.NewsDO{record}, 1, nil
	}

	// The client sent a digest: compare it with the stored one.
	log.Infof("[QueryAsync] org_id=%d 开始比较 digest客户端='%s'", cond.Org_Id, cond.Digest)
	if record.ExtraData != nil {
		if val, ok := record.ExtraData["digest"]; ok {
			log.Infof("[QueryAsync] org_id=%d 数据库 digest='%v'", cond.Org_Id, val)
			if ds, ok := val.(string); ok && ds == cond.Digest {
				// Same digest: the client is up to date, return nothing.
				log.Infof("[QueryAsync] org_id=%d digest 相同,跳过返回", cond.Org_Id)
				return nil, 0, nil
			}
		} else {
			log.Infof("[QueryAsync] org_id=%d ExtraData 中无 digest 字段", cond.Org_Id)
		}
	} else {
		log.Infof("[QueryAsync] org_id=%d ExtraData 为空", cond.Org_Id)
	}

	log.Infof("[QueryAsync] org_id=%d digest 不同,返回记录", cond.Org_Id)
	return []*models.NewsDO{record}, 1, nil
}
// QueryAsyncBatch is the batch form of the digest check: for every
// (org_id, digest) pair sent by the client it returns the stored rows the
// client still needs.
//
// A row is returned when the client's digest for its org_id is empty or
// differs from the string stored under the "digest" key of the row's extra
// data (a missing or non-string stored digest counts as ""). total is the
// number of rows returned. When an org_id occurs in several pairs, the digest
// of the last pair is the one compared.
//
// An empty pairs slice yields an empty, non-nil slice and no query.
func (dao *NewsDAO) QueryAsyncBatch(pairs []OrgDigestPair) (dos []*models.NewsDO, total int64, err error) {
	if len(pairs) == 0 {
		return []*models.NewsDO{}, 0, nil
	}

	// Step 1: one pass builds both the org_id -> client digest lookup and the
	// list of distinct ids for the IN clause.
	clientDigest := make(map[int64]string, len(pairs))
	idList := make([]string, 0, len(pairs))
	for _, p := range pairs {
		if _, seen := clientDigest[p.OrgId]; !seen {
			idList = append(idList, fmt.Sprintf("%d", p.OrgId))
		}
		clientDigest[p.OrgId] = p.Digest
	}

	// Step 2: the ids are int64 values rendered with %d, so splicing them into
	// the SQL text cannot inject anything.
	inClause := fmt.Sprintf("org_id IN (%s)", strings.Join(idList, ","))

	var rows []*models.NewsDO
	_, err = dao.db.
		Model(&rows).
		Table(models.TableNameNews).
		Where(inClause).
		Query()
	if err != nil {
		return nil, 0, err
	}

	// Step 3: compare digests in Go, decoding the raw extra data first.
	for _, row := range rows {
		if row.ExtraData == nil && row.ExtraDataRaw != "" {
			var extra map[string]interface{}
			if uerr := json.Unmarshal([]byte(row.ExtraDataRaw), &extra); uerr != nil {
				// Best effort: log instead of failing the batch. The row is then
				// treated as having no stored digest.
				log.Warnf("[QueryAsyncBatch] org_id=%d ExtraData 解析失败: %v", row.OrgId, uerr)
			} else {
				row.ExtraData = extra
			}
		}

		var sDigest string
		if d, ok := row.ExtraData["digest"].(string); ok {
			sDigest = d
		}

		cDigest := clientDigest[row.OrgId]
		if cDigest == "" || cDigest != sDigest {
			dos = append(dos, row)
		}
	}
	return dos, int64(len(dos)), nil
}
// QueryPullNew returns the news rows whose org_id is NOT among the org ids
// listed in cond.Pairs, plus the total reported by QueryEx.
//
// Only the OrgId of each pair is used; the digests are ignored. With a nil
// cond or an empty Pairs slice no org_id filter is applied.
//
// On failure the error is logged and returned with a nil slice and a zero
// total.
func (dao *NewsDAO) QueryPullNew(cond *NewsPullNewCondition) (dos []*models.NewsDO, total int64, err error) {
	e := dao.db.Model(&dos).
		Table(models.TableNameNews).
		Select(
			models.NEWS_COLUMN_ID,
			models.NEWS_COLUMN_ORG_ID,
			models.NEWS_COLUMN_SPIDER_ID,
			models.NEWS_COLUMN_PNAME_ID,
			models.NEWS_COLUMN_TAG,
			models.NEWS_COLUMN_CATEGORY,
			models.NEWS_COLUMN_MAIN_TITLE,
			models.NEWS_COLUMN_SUB_TITLE,
			models.NEWS_COLUMN_SUMMARY,
			models.NEWS_COLUMN_KEYWORDS,
			models.NEWS_COLUMN_SEO_KEYWORDS,
			models.NEWS_COLUMN_TAGS,
			models.NEWS_COLUMN_URL,
			models.NEWS_COLUMN_IMAGE_URL,
			models.NEWS_COLUMN_LOGO_URL,
			models.NEWS_COLUMN_MODEL_PARAMETER,
			models.NEWS_COLUMN_CONTENT,
			models.NEWS_COLUMN_IS_HOTSPOT,
			models.NEWS_COLUMN_IS_OVERWRITTEN,
			models.NEWS_COLUMN_IS_DELETED,
			models.NEWS_COLUMN_IS_REPLICATE,
			models.NEWS_COLUMN_STATE,
			models.NEWS_COLUMN_CREATED_TIME,
			models.NEWS_COLUMN_UPDATED_TIME,
			models.NEWS_COLUMN_EXTRA_DATA,
		)

	// Collect only the org ids; the digests play no role here. A nil cond is
	// handled like an empty one instead of panicking.
	if cond != nil && len(cond.Pairs) > 0 {
		orgIDs := make([]int64, 0, len(cond.Pairs))
		for _, p := range cond.Pairs {
			orgIDs = append(orgIDs, p.OrgId)
		}
		e.Where(fmt.Sprintf("%s NOT IN (?)", models.NEWS_COLUMN_ORG_ID), orgIDs)
	}

	_, total, err = e.QueryEx()
	if err != nil {
		log.Errorf("QueryPullNew failed: %v", err)
		return nil, 0, err
	}

	// Log counts only: formatting a []*NewsDO with %+v prints pointer
	// addresses, not row contents.
	log.Infof("[QueryPullNew] rows=%d total=%d", len(dos), total)
	return dos, total, nil
}