119 lines
3.5 KiB
Python
119 lines
3.5 KiB
Python
import app
|
||
import json
|
||
from elasticsearch import Elasticsearch
|
||
from elasticsearch.exceptions import ApiError, TransportError
|
||
from dataclasses import dataclass, asdict, field
|
||
from typing import List, Dict, Any, Optional
|
||
import uuid
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
import threading
|
||
from datetime import datetime, timezone
|
||
|
||
|
||
# Shared Elasticsearch client used by every helper in this module.
# TLS verification is enabled and uses the ISRG Root X1 certificate file.
es = Elasticsearch(
    hosts=["https://es.jellydropsllc.com"],
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
    verify_certs=True,
)
|
||
|
||
|
||
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Store ``data[key]`` in Elasticsearch, using its 'aiemployee' value as the document ID.

    Args:
        key: Key in ``data`` whose value is the document to store.
        data: Mapping that holds the document under ``key``.
        index: Name of the Elasticsearch index to write to.

    Returns:
        True when the index call completed without raising. False when
        ``data`` has no usable entry for ``key``, when the document has no
        truthy 'aiemployee' value, or when the index call raised.
    """
    # Use .get() so a missing key is reported as a failed save instead of
    # escaping as a KeyError from a function that otherwise returns a bool.
    document = data.get(key)
    aiemployee_id = document.get('aiemployee') if document is not None else None
    if not aiemployee_id:
        # Logged above INFO so the failure is visible with the default logging level.
        logging.warning(".....................保存时 Data must contain an 'aiemployee' key.")
        return False

    try:
        # index() stores the document in the given index with 'aiemployee' as the
        # document ID; refresh=True is passed through to the client unchanged.
        response = es.index(index=index, id=aiemployee_id, body=document, refresh=True)
    except Exception as e:
        # Best-effort save: report the failure to the caller through the return value,
        # but log it at ERROR level so it is not silently dropped.
        logging.error(f".............保存到ES出错!...............{e}..")
        return False
    else:
        logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
        return True
|
||
|
||
|
||
def search_es(key: str, index: str) -> Optional[Dict]:
    """Look up the document in ``index`` whose ``_id`` equals ``key``.

    Args:
        key: Document ID to match with a term query on ``_id``.
        index: Name of the Elasticsearch index to search.

    Returns:
        The ``_source`` of the first hit, or None when the search raised
        or produced no hits.
    """
    # Term query against the document's _id field
    id_lookup = {"query": {"term": {"_id": key}}}

    try:
        result = es.search(index=index, body=id_lookup)
    except Exception as err:
        logging.error(f"Error searching ES: {err}")
        return None

    matches = result.get('hits', {}).get('hits', [])
    # Only the content of the first matching document is returned
    return matches[0]['_source'] if matches else None
|
||
|
||
|
||
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch the document for ``key`` from ``index`` and record it in ``buf``.

    Args:
        key: Document ID passed to ``search_es``; also the key written into ``buf``.
        index: Name of the Elasticsearch index to search.
        buf: Dictionary updated in place with ``buf[key] = <document>`` on success.

    Returns:
        The document returned by ``search_es``, or None when that result is
        falsy (in which case ``buf`` is left untouched).
    """
    found = search_es(key, index)
    if not found:
        return None

    # Record the document in the caller's buffer before handing it back
    buf[key] = found
    return found
|
||
|
||
|
||
|
||
# Sample customer data containing multiple email_addresses
|
||
# customer_data = [
|
||
# {
|
||
# "jellydropsllc.com": {
|
||
# "email_addresses": [
|
||
# {
|
||
# "owner_name": "John Doe",
|
||
# "email_address": "john@jellydropsllc.com",
|
||
# "category": "lead",
|
||
# "promotion_history": ["2024-09-15T09:45:30.123456"]
|
||
# },
|
||
# {
|
||
# "owner_name": "Jane Doe",
|
||
# "email_address": "jane@jellydropsllc.com",
|
||
# "category": "contact",
|
||
# "promotion_history": ["2024-09-16T10:30:15.654321"]
|
||
# }
|
||
# ],
|
||
# "customer_industry": "legal industry"
|
||
# }
|
||
# },
|
||
# {
|
||
# "singhahluwalia.com": {
|
||
# "email_addresses": [
|
||
# {
|
||
# "owner_name": "Alice Smith",
|
||
# "email_address": "alice@singhahluwalia.com",
|
||
# "category": "cold lead",
|
||
# "promotion_history": [
|
||
# "2024-10-04T15:31:03.385288"
|
||
# ]
|
||
# }
|
||
# ],
|
||
# "customer_industry": "legal industry"
|
||
# }
|
||
# }
|
||
# ]
|
||
|
||
# Pass the data to the app and start the Flask application
|
||
# store_to_es(key, buf, 'customer')
|
||
|
||
# Startup sequence: load the stored document for a fixed ID from the
# 'customer' index, hand its 'customer' entry to the app module, then run it.
buf = {}
key = "ZbIF9jKqzDyChwiB"
ret = load_indices_from_es(key, 'customer', buf)
if not ret:
    # Raise SystemExit directly instead of calling exit(): exit() is a helper
    # installed by the site module for interactive use and is not available
    # when the interpreter starts without site (e.g. `python -S`).
    # The message and the resulting exit status are the same.
    raise SystemExit("客服AI员工不能拥有数据库。")

# load_indices_from_es stored the fetched document under buf[key]
customer_data = buf[key]['customer']
app.set_customer_data(customer_data)
app.app.run()
|