cm/caller.py

120 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import app
import json
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
import uuid
import logging
from datetime import datetime, timedelta
import threading
from datetime import datetime, timezone
# Module-wide Elasticsearch client. Server certificates are verified
# against the ISRG Root X1 CA certificate.
es = Elasticsearch(
    ["https://es.jellydropsllc.com"],
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
    verify_certs=True,
)
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Index ``data[key]`` into Elasticsearch under its 'aiemployee' ID.

    Args:
        key: Key in ``data`` whose value is the document to store.
        data: Mapping holding the document under ``key``.
        index: Name of the Elasticsearch index to write to.

    Returns:
        True if the index call succeeded. False if ``data[key]`` is missing
        or malformed, has no truthy 'aiemployee' value, or the index call
        raised (the failure is logged).
    """
    try:
        document = data[key]
        aiemployee_id = document.get('aiemployee')
    except (KeyError, TypeError, AttributeError):
        # A missing or malformed entry is reported like a missing ID rather
        # than escaping as an uncaught exception from a bool-returning API.
        document = None
        aiemployee_id = None
    if not aiemployee_id:
        logging.info(".....................保存时 Data must contain an 'aiemployee' key.")
        return False
    try:
        # es.index stores the document in the given index, using the
        # aiemployee value as the document ID.
        response = es.index(index=index, id=aiemployee_id, body=document, refresh=True)
    except Exception as e:
        # Best-effort write: any client failure is logged at error level
        # (matching search_es) and reported to the caller as False.
        logging.error(f".............保存到ES出错...............{e}..")
        return False
    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True
def search_es(key: str, index: str) -> Optional[Dict]:
    """Look up the document in ``index`` whose ``_id`` equals ``key``.

    Returns:
        The ``_source`` of the first hit, or None when there are no hits or
        the search call raised (the error is logged).
    """
    # Term query that matches on the document _id.
    lookup = {"query": {"term": {"_id": key}}}
    try:
        result = es.search(index=index, body=lookup)
    except Exception as err:
        logging.error(f"Error searching ES: {err}")
        return None
    matches = result.get('hits', {}).get('hits', [])
    # Only the content of the first matching document is returned.
    return matches[0]['_source'] if matches else None
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch the document with ID ``key`` from ``index`` and store it in ``buf``.

    Returns:
        The search result after storing it as ``buf[key]`` when it is
        truthy; otherwise None, leaving ``buf`` unchanged.
    """
    found = search_es(key, index)
    if not found:
        return None
    # Record the result in the caller-supplied dict.
    buf[key] = found
    return found
# Sample customer data containing multiple email_addresses
# customer_data = [
# {
# "jellydropsllc.com": {
# "email_addresses": [
# {
# "owner_name": "John Doe",
# "email_address": "john@jellydropsllc.com",
# "category": "lead",
# "promotion_history": ["2024-09-15T09:45:30.123456"]
# },
# {
# "owner_name": "Jane Doe",
# "email_address": "jane@jellydropsllc.com",
# "category": "contact",
# "promotion_history": ["2024-09-16T10:30:15.654321"]
# }
# ],
# "customer_industry": "legal industry"
# }
# },
# {
# "singhahluwalia.com": {
# "email_addresses": [
# {
# "owner_name": "Alice Smith",
# "email_address": "alice@singhahluwalia.com",
# "category": "cold lead",
# "promotion_history": [
# "2024-10-04T15:31:03.385288"
# ]
# }
# ],
# "customer_industry": "legal industry"
# }
# }
# ]
# Pass the data in and start the Flask app
# store_to_es(key, buf, 'customer')
# Load the record stored under the hard-coded document ID from the
# 'customer' index, hand its 'customer database' entry to the app module,
# then start the web server.
buf = {}
key = "ZbIF9jKqzDyChwiB"
ret = load_indices_from_es(key, 'customer', buf)
if not ret:
    # Raise SystemExit directly instead of calling exit(): that helper is
    # injected by the `site` module and is not guaranteed to exist
    # (e.g. under `python -S` or in frozen/embedded interpreters).
    raise SystemExit("客服AI员工不能拥有数据库。")
customer_data = buf[key]['customer database']
print(f"..........customer_data={customer_data}")
app.set_customer_data(customer_data)
# Listens on all interfaces, port 5001.
app.app.run(host="0.0.0.0", port=5001)