From b7829fd87942e4119530c918f0fd6efc71e0b4f6 Mon Sep 17 00:00:00 2001 From: hailin Date: Fri, 25 Oct 2024 15:18:10 +0800 Subject: [PATCH] . --- __pycache__/app.cpython-39.pyc | Bin 2226 -> 2343 bytes app.py | 78 +++++++++++++++++++++++++ caller.py | 103 +-------------------------------- 3 files changed, 79 insertions(+), 102 deletions(-) diff --git a/__pycache__/app.cpython-39.pyc b/__pycache__/app.cpython-39.pyc index 094639c81cc3c4f0e387963fa4085874c9986751..db0e371354c9273b6dc4dec462e7869062e172bd 100644 GIT binary patch delta 667 zcmX|9O;6N77@l{gotAdHA86!jO*V^J11RBu2LmK3tBEms&=^hCYzkAbu-n3P0BO4M zu$)XV+04JTvddJee<*Ps5JqI2KatZ*yY(UEjG3 zZ~8Mcj|1cmG3;8tabz=c3`}GC0qTKU7${UJ1~UhW;~N_`8)DWvF#8a*0&@m>kri3# z0J$YreuAbd{iB6px9qOmzjt%v!FJNzUw*w8-0A;123vOF7M#c*F#q=XOWeg>A_&t` z`~iK$JpiPbo{}@hXi9bwpA%Y27#%}G?_tH1nzJ4>(W|nmqILCDOLb-}qV3)7xfC(H ziMupK0))E7%u%GcAq+N@LW)wF8qA`Hx|-FNFYw)KNoDqYv8dw(;ZjmMS7w87En50h z^3_4RCO_kclV;|u?!+SM_`JnD;bo-lhnXG-zY}Nr77s!pE2KSMR&K|Q+;gIVMlOPe zO;Uy;6u(OH9qgoXR;DJO>Wzh@5bcFNzZ%yOnyfJP3=jES)8!HQ9D{Y7r)< z^jdM5gC6Zk!J8fg|ArUYAE3WL=+SqRlLPbcJP-4JJj|z+udN^s0-xadeWN?#d2j^| z^R2B%iTAN0={ZOyol`Q0&|f$%I$U}SrC6)9w81kyjFvMOr}za|4&KP6!x8_&pQ zGyi-$|9cL?zK6T;!Ty4s@$dzm(HV9&>@`u_ni=G4L_2ee%3Z2C+$S%a_|loV#xbsR zhvX?yU~7Ama#x^?*|~K=x#_Zga8I30;*Q g>qM15t3(O?5(f5<=(ksp_Ao342H(NEUI+iuKQSkOQUCw| diff --git a/app.py b/app.py index 1f5a318..27f5bac 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,24 @@ from flask import Flask, render_template, request, redirect, url_for +import uuid +import logging +import json +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import ApiError, TransportError +from dataclasses import dataclass, asdict, field +from typing import List, Dict, Any, Optional + + +# 配置日志级别、格式和输出位置 +logging.basicConfig( + level=logging.INFO, # 设置日志级别为 INFO + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.FileHandler("app.log"), # 将日志记录到文件 + logging.StreamHandler() # 同时输出到控制台 + ] +) + + app = Flask(__name__) @@ -7,6 +27,61 @@ customer_data = [] buf = {} key = "ZbIF9jKqzDyChwiB" +es = Elasticsearch( + ["https://es.jellydropsllc.com"], + verify_certs=True, + ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 +) + + +def store_to_es(key, data: Dict[str, Any], index: str) -> bool: + aiemployee_id = data[key].get('aiemployee') + if not aiemployee_id: + logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") + return False + try: + # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID + response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) + + logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") + return True + except Exception as e: + logging.info(f".............保存到ES出错!...............{e}..") + #print(e) + return False + + +def search_es(key: str, index: str) -> Optional[Dict]: + # 使用term查询来根据文档的_id搜索 + query = { + "query": { + "term": { "_id": key } + } + } + try: + response = es.search(index=index, body=query) + except Exception as e: + logging.error(f"Error searching ES: {e}") + return None + + hits = response.get('hits', {}).get('hits', []) + if hits: + return hits[0]['_source'] # 返回找到的第一个文档的内容 + return None + + +def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: + # 在Elasticsearch的指定索引中搜索 + result = search_es(key, index) + if result: + # 更新 buf 字典 + buf[key] = result + return result + else: + return None + + + @app.route('/') def index(): @@ -60,6 +135,9 @@ def update_customers(index): email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0] email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ') + # 将更新后的数据保存到 ES 数据库 + store_to_es(key, buf, 'customer') + return redirect(url_for('view_customer', index=index)) def set_customer_data(data): diff --git a/caller.py b/caller.py index 12f762d..3e7a402 100644 --- a/caller.py +++ b/caller.py @@ -1,110 +1,9 @@ import app -from app import key, buf -import json -from elasticsearch import Elasticsearch -from elasticsearch.exceptions import ApiError, TransportError -from dataclasses import dataclass, asdict, field -from typing import List, Dict, Any, Optional -import uuid -import logging +from app import key, buf, load_indices_from_es from datetime import datetime, timedelta import threading from datetime import datetime, timezone - -es = Elasticsearch( - ["https://es.jellydropsllc.com"], - verify_certs=True, - ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 -) - - -def store_to_es(key, data: Dict[str, Any], index: str) -> bool: - aiemployee_id = data[key].get('aiemployee') - if not aiemployee_id: - logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") - return False - try: - # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID - response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) - - logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") - return True - except Exception as e: - logging.info(f".............保存到ES出错!...............{e}..") - #print(e) - return False - - -def search_es(key: str, index: str) -> Optional[Dict]: - # 使用term查询来根据文档的_id搜索 - query = { - "query": { - "term": { "_id": key } - } - } - try: - response = es.search(index=index, body=query) - except Exception as e: - logging.error(f"Error searching ES: {e}") - return None - - hits = response.get('hits', {}).get('hits', []) - if hits: - return hits[0]['_source'] # 返回找到的第一个文档的内容 - return None - - -def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: - # 在Elasticsearch的指定索引中搜索 - result = search_es(key, index) - if result: - # 更新 buf 字典 - buf[key] = result - return result - else: - return None - - - -# 示例客户数据,包含多个 email_addresses -# customer_data = [ -# { -# "jellydropsllc.com": { -# "email_addresses": [ -# { -# "owner_name": "John Doe", -# "email_address": "john@jellydropsllc.com", -# "category": "lead", -# "promotion_history": ["2024-09-15T09:45:30.123456"] -# }, -# { -# "owner_name": "Jane Doe", -# "email_address": "jane@jellydropsllc.com", -# "category": "contact", -# "promotion_history": ["2024-09-16T10:30:15.654321"] -# } -# ], -# "customer_industry": "legal industry" -# } -# }, -# { -# "singhahluwalia.com": { -# "email_addresses": [ -# { -# "owner_name": "Alice Smith", -# "email_address": "alice@singhahluwalia.com", -# "category": "cold lead", -# "promotion_history": [ -# "2024-10-04T15:31:03.385288" -# ] -# } -# ], -# "customer_industry": "legal industry" -# } -# } -# ] - # 传递数据并启动 Flask 应用 # store_to_es(key, buf, 'customer')