This commit is contained in:
hailin 2024-10-25 15:18:10 +08:00
parent 449dd04ee5
commit b7829fd879
3 changed files with 79 additions and 102 deletions

Binary file not shown.

78
app.py
View File

@ -1,4 +1,24 @@
from flask import Flask, render_template, request, redirect, url_for from flask import Flask, render_template, request, redirect, url_for
import uuid
import logging
import json
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
# Configure logging: level, record format, and output destinations.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,  # record INFO and above
    handlers=[
        logging.FileHandler("app.log"),  # write records to a file
        logging.StreamHandler(),  # and also echo them to the console
    ],
)
app = Flask(__name__) app = Flask(__name__)
@ -7,6 +27,61 @@ customer_data = []
buf = {} buf = {}
key = "ZbIF9jKqzDyChwiB" key = "ZbIF9jKqzDyChwiB"
# Module-level Elasticsearch client. Server certificates are verified
# against the ISRG Root X1 CA file.
es = Elasticsearch(
    hosts=["https://es.jellydropsllc.com"],
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
    verify_certs=True,
)
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Save the record held under ``data[key]`` to Elasticsearch.

    The record's ``'aiemployee'`` value is used as the document ID, so
    saving a record with the same ID again replaces the stored document.

    Args:
        key: Key in ``data`` whose value is the record (a dict) to save.
        data: Mapping that holds the record under ``key``.
        index: Name of the Elasticsearch index to write to.

    Returns:
        True if the index request completed without raising. False if
        ``data`` has no usable record under ``key``, the record has no
        ``'aiemployee'`` value, or the request raised.
    """
    # A missing or empty entry (e.g. a buffer that was never loaded) is
    # handled like a record without an ID instead of raising KeyError.
    record = data.get(key) or {}
    aiemployee_id = record.get('aiemployee')
    if not aiemployee_id:
        logging.warning(".....................保存时 Data must contain an 'aiemployee' key.")
        return False
    try:
        # index() stores the record in the given index, using the
        # aiemployee value as the document ID.
        response = es.index(index=index, id=aiemployee_id, body=record, refresh=True)
    except Exception as e:
        # Best-effort save: report the failure (with traceback) at ERROR
        # level and let the caller decide what to do with False.
        logging.exception(".............保存到ES出错...............%s..", e)
        return False
    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True
def search_es(key: str, index: str) -> Optional[Dict]:
    """Look up one document in ``index`` by its ``_id``.

    Args:
        key: Document ID to match.
        index: Name of the Elasticsearch index to search.

    Returns:
        The ``_source`` of the first hit, or None when there are no hits
        or the search raises.
    """
    # Term query that matches on the document's _id.
    id_query = {"query": {"term": {"_id": key}}}
    try:
        result = es.search(index=index, body=id_query)
    except Exception as e:
        logging.error(f"Error searching ES: {e}")
        return None
    matches = result.get('hits', {}).get('hits', [])
    # Only the first matching document's content is returned.
    return matches[0]['_source'] if matches else None
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch the document with ID ``key`` from ``index`` and cache it in ``buf``.

    Args:
        key: Document ID to look up; also the key written into ``buf``.
        index: Name of the Elasticsearch index to search.
        buf: Dictionary updated in place with ``buf[key] = document``
            when a non-empty document is found.

    Returns:
        The document found, or None when ``search_es`` returns nothing
        (in which case ``buf`` is left unchanged).
    """
    document = search_es(key, index)
    if not document:
        return None
    buf[key] = document
    return document
@app.route('/') @app.route('/')
def index(): def index():
@ -60,6 +135,9 @@ def update_customers(index):
email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0] email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0]
email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ') email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ')
# 将更新后的数据保存到 ES 数据库
store_to_es(key, buf, 'customer')
return redirect(url_for('view_customer', index=index)) return redirect(url_for('view_customer', index=index))
def set_customer_data(data): def set_customer_data(data):

103
caller.py
View File

@ -1,110 +1,9 @@
import app import app
from app import key, buf from app import key, buf, load_indices_from_es
import json
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
import uuid
import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
import threading import threading
from datetime import datetime, timezone from datetime import datetime, timezone
# Module-level Elasticsearch client. Server certificates are verified
# against the ISRG Root X1 CA file.
es = Elasticsearch(
    hosts=["https://es.jellydropsllc.com"],
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
    verify_certs=True,
)
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Save the record held under ``data[key]`` to Elasticsearch.

    The record's ``'aiemployee'`` value is used as the document ID, so
    saving a record with the same ID again replaces the stored document.

    Args:
        key: Key in ``data`` whose value is the record (a dict) to save.
        data: Mapping that holds the record under ``key``.
        index: Name of the Elasticsearch index to write to.

    Returns:
        True if the index request completed without raising. False if
        ``data`` has no usable record under ``key``, the record has no
        ``'aiemployee'`` value, or the request raised.
    """
    # A missing or empty entry (e.g. a buffer that was never loaded) is
    # handled like a record without an ID instead of raising KeyError.
    record = data.get(key) or {}
    aiemployee_id = record.get('aiemployee')
    if not aiemployee_id:
        logging.warning(".....................保存时 Data must contain an 'aiemployee' key.")
        return False
    try:
        # index() stores the record in the given index, using the
        # aiemployee value as the document ID.
        response = es.index(index=index, id=aiemployee_id, body=record, refresh=True)
    except Exception as e:
        # Best-effort save: report the failure (with traceback) at ERROR
        # level and let the caller decide what to do with False.
        logging.exception(".............保存到ES出错...............%s..", e)
        return False
    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True
def search_es(key: str, index: str) -> Optional[Dict]:
    """Look up one document in ``index`` by its ``_id``.

    Args:
        key: Document ID to match.
        index: Name of the Elasticsearch index to search.

    Returns:
        The ``_source`` of the first hit, or None when there are no hits
        or the search raises.
    """
    # Term query that matches on the document's _id.
    id_query = {"query": {"term": {"_id": key}}}
    try:
        result = es.search(index=index, body=id_query)
    except Exception as e:
        logging.error(f"Error searching ES: {e}")
        return None
    matches = result.get('hits', {}).get('hits', [])
    # Only the first matching document's content is returned.
    return matches[0]['_source'] if matches else None
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch the document with ID ``key`` from ``index`` and cache it in ``buf``.

    Args:
        key: Document ID to look up; also the key written into ``buf``.
        index: Name of the Elasticsearch index to search.
        buf: Dictionary updated in place with ``buf[key] = document``
            when a non-empty document is found.

    Returns:
        The document found, or None when ``search_es`` returns nothing
        (in which case ``buf`` is left unchanged).
    """
    document = search_es(key, index)
    if not document:
        return None
    buf[key] = document
    return document
# 示例客户数据,包含多个 email_addresses
# customer_data = [
# {
# "jellydropsllc.com": {
# "email_addresses": [
# {
# "owner_name": "John Doe",
# "email_address": "john@jellydropsllc.com",
# "category": "lead",
# "promotion_history": ["2024-09-15T09:45:30.123456"]
# },
# {
# "owner_name": "Jane Doe",
# "email_address": "jane@jellydropsllc.com",
# "category": "contact",
# "promotion_history": ["2024-09-16T10:30:15.654321"]
# }
# ],
# "customer_industry": "legal industry"
# }
# },
# {
# "singhahluwalia.com": {
# "email_addresses": [
# {
# "owner_name": "Alice Smith",
# "email_address": "alice@singhahluwalia.com",
# "category": "cold lead",
# "promotion_history": [
# "2024-10-04T15:31:03.385288"
# ]
# }
# ],
# "customer_industry": "legal industry"
# }
# }
# ]
# 传递数据并启动 Flask 应用 # 传递数据并启动 Flask 应用
# store_to_es(key, buf, 'customer') # store_to_es(key, buf, 'customer')