"""Load an AI employee's customer database from Elasticsearch and serve it.

The module connects to Elasticsearch, provides small helpers to store and
fetch documents by ``_id``, then (at import/run time) loads one document from
the ``customer`` index, hands its ``'customer database'`` field to ``app`` and
starts the web application on port 5001.
"""

import json
import logging
import threading
import uuid
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any, Optional

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError

import app

es = Elasticsearch(
    ["https://es.jellydropsllc.com"],
    verify_certs=True,
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',  # verify against ISRG Root X1
)


def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Store ``data[key]`` in *index*, using its ``'aiemployee'`` value as ID.

    Args:
        key: Key inside *data* whose value is the document to store.
        data: Mapping that holds the document under *key*.
        index: Name of the Elasticsearch index to write to.

    Returns:
        True if the document was indexed; False if *key* is missing from
        *data*, the document has no truthy ``'aiemployee'`` value, or the
        Elasticsearch call raised.
    """
    # A missing key or a non-dict value is treated like a missing
    # 'aiemployee' field instead of raising KeyError/AttributeError.
    document = data.get(key)
    aiemployee_id = document.get('aiemployee') if isinstance(document, dict) else None
    if not aiemployee_id:
        logging.info(".....................保存时 Data must contain an 'aiemployee' key.")
        return False

    try:
        # index() creates or overwrites the document with this ID;
        # refresh=True is passed so the write is refreshed immediately.
        response = es.index(index=index, id=aiemployee_id, body=document, refresh=True)
    except Exception as e:
        # Best-effort save: report the failure to the caller via the return
        # value. Logged at ERROR level, matching search_es.
        logging.error(f".............保存到ES出错!...............{e}..")
        return False

    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True


def search_es(key: str, index: str) -> Optional[Dict]:
    """Return the ``_source`` of the document whose ``_id`` equals *key*.

    Args:
        key: Document ID to look up.
        index: Name of the Elasticsearch index to search.

    Returns:
        The ``_source`` of the first hit, or None when there is no hit or the
        search raised (the error is logged).
    """
    # Term query on the document's _id.
    query = {
        "query": {
            "term": {
                "_id": key
            }
        }
    }

    try:
        response = es.search(index=index, body=query)
    except Exception as e:
        logging.error(f"Error searching ES: {e}")
        return None

    hits = response.get('hits', {}).get('hits', [])
    if hits:
        return hits[0]['_source']  # content of the first matching document
    return None


def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch document *key* from *index* and cache it in ``buf[key]``.

    Args:
        key: Document ID to look up; also the key written into *buf*.
        index: Name of the Elasticsearch index to search.
        buf: Dictionary updated in place with the found document.

    Returns:
        The document's ``_source`` if one was found, otherwise None. A falsy
        result (including an empty ``_source``) leaves *buf* untouched.
    """
    result = search_es(key, index)
    if not result:
        return None
    buf[key] = result
    return result


# Sample customer data containing multiple email_addresses
# customer_data = [
#     {
#         "jellydropsllc.com": {
#             "email_addresses": [
#                 {
#                     "owner_name": "John Doe",
#                     "email_address": "john@jellydropsllc.com",
#                     "category": "lead",
#                     "promotion_history": ["2024-09-15T09:45:30.123456"]
#                 },
#                 {
#                     "owner_name": "Jane Doe",
#                     "email_address": "jane@jellydropsllc.com",
#                     "category": "contact",
#                     "promotion_history": ["2024-09-16T10:30:15.654321"]
#                 }
#             ],
#             "customer_industry": "legal industry"
#         }
#     },
#     {
#         "singhahluwalia.com": {
#             "email_addresses": [
#                 {
#                     "owner_name": "Alice Smith",
#                     "email_address": "alice@singhahluwalia.com",
#                     "category": "cold lead",
#                     "promotion_history": [
#                         "2024-10-04T15:31:03.385288"
#                     ]
#                 }
#             ],
#             "customer_industry": "legal industry"
#         }
#     }
# ]

# Pass the data to the app and start the Flask application.
# store_to_es(key, buf, 'customer')
buf = {}
key = "ZbIF9jKqzDyChwiB"
ret = load_indices_from_es(key, 'customer', buf)
if not ret:
    # Same message and exit status as the builtin exit(), without relying on
    # the `site` module having injected it.
    raise SystemExit("客服AI员工不能拥有数据库。")

# NOTE(review): assumes the stored document has a 'customer database' field;
# a document without it raises KeyError here — confirm against the writer.
customer_data = buf[key]['customer database']
app.set_customer_data(customer_data)
app.app.run(port=5001)