diff --git a/__pycache__/app.cpython-39.pyc b/__pycache__/app.cpython-39.pyc index 094639c..db0e371 100644 Binary files a/__pycache__/app.cpython-39.pyc and b/__pycache__/app.cpython-39.pyc differ diff --git a/app.py b/app.py index 1f5a318..27f5bac 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,24 @@ from flask import Flask, render_template, request, redirect, url_for +import uuid +import logging +import json +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import ApiError, TransportError +from dataclasses import dataclass, asdict, field +from typing import List, Dict, Any, Optional + + +# 配置日志级别、格式和输出位置 +logging.basicConfig( + level=logging.INFO, # 设置日志级别为 INFO + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler("app.log"), # 将日志记录到文件 + logging.StreamHandler() # 同时输出到控制台 + ] +) + + app = Flask(__name__) @@ -7,6 +27,61 @@ customer_data = [] buf = {} key = "ZbIF9jKqzDyChwiB" +es = Elasticsearch( + ["https://es.jellydropsllc.com"], + verify_certs=True, + ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 +) + + +def store_to_es(key, data: Dict[str, Any], index: str) -> bool: + aiemployee_id = data[key].get('aiemployee') + if not aiemployee_id: + logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") + return False + try: + # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID + response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) + + logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") + return True + except Exception as e: + logging.info(f".............保存到ES出错!...............{e}..") + #print(e) + return False + + +def search_es(key: str, index: str) -> Optional[Dict]: + # 使用term查询来根据文档的_id搜索 + query = { + "query": { + "term": { "_id": key } + } + } + try: + response = es.search(index=index, body=query) + except Exception as e: + logging.error(f"Error searching ES: {e}") + return None + + hits = response.get('hits', {}).get('hits', []) + if hits: + return hits[0]['_source'] # 返回找到的第一个文档的内容 + return None + + +def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: + # 在Elasticsearch的指定索引中搜索 + result = search_es(key, index) + if result: + # 更新 buf 字典 + buf[key] = result + return result + else: + return None + + + @app.route('/') def index(): @@ -60,6 +135,9 @@ def update_customers(index): email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0] email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ') + # 将更新后的数据保存到 ES 数据库 + store_to_es(key, buf, 'customer') + return redirect(url_for('view_customer', index=index)) def set_customer_data(data): diff --git a/caller.py b/caller.py index 12f762d..3e7a402 100644 --- a/caller.py +++ b/caller.py @@ -1,110 +1,9 @@ import app -from app import key, buf -import json -from elasticsearch import Elasticsearch -from elasticsearch.exceptions import ApiError, TransportError -from dataclasses import dataclass, asdict, field -from typing import List, Dict, Any, Optional -import uuid -import logging +from app import key, buf, load_indices_from_es from datetime import datetime, timedelta import threading from datetime import datetime, timezone - -es = Elasticsearch( - ["https://es.jellydropsllc.com"], - verify_certs=True, - ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 -) - - -def store_to_es(key, data: Dict[str, Any], index: str) -> bool: - aiemployee_id = data[key].get('aiemployee') - if not aiemployee_id: - logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") - return False - try: - # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID - response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) - - logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") - return True - except Exception as e: - logging.info(f".............保存到ES出错!...............{e}..") - #print(e) - return False - - -def search_es(key: str, index: str) -> Optional[Dict]: - # 使用term查询来根据文档的_id搜索 - query = { - "query": { - "term": { "_id": key } - } - } - try: - response = es.search(index=index, body=query) - except Exception as e: - logging.error(f"Error searching ES: {e}") - return None - - hits = response.get('hits', {}).get('hits', []) - if hits: - return hits[0]['_source'] # 返回找到的第一个文档的内容 - return None - - -def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: - # 在Elasticsearch的指定索引中搜索 - result = search_es(key, index) - if result: - # 更新 buf 字典 - buf[key] = result - return result - else: - return None - - - -# 示例客户数据,包含多个 email_addresses -# customer_data = [ -# { -# "jellydropsllc.com": { -# "email_addresses": [ -# { -# "owner_name": "John Doe", -# "email_address": "john@jellydropsllc.com", -# "category": "lead", -# "promotion_history": ["2024-09-15T09:45:30.123456"] -# }, -# { -# "owner_name": "Jane Doe", -# "email_address": "jane@jellydropsllc.com", -# "category": "contact", -# "promotion_history": ["2024-09-16T10:30:15.654321"] -# } -# ], -# "customer_industry": "legal industry" -# } -# }, -# { -# "singhahluwalia.com": { -# "email_addresses": [ -# { -# "owner_name": "Alice Smith", -# "email_address": "alice@singhahluwalia.com", -# "category": "cold lead", -# "promotion_history": [ -# "2024-10-04T15:31:03.385288" -# ] -# } -# ], -# "customer_industry": "legal industry" -# } -# } -# ] - # 传递数据并启动 Flask 应用 # store_to_es(key, buf, 'customer')