cm/app.py

214 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for
import uuid
import logging
import json
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
# 配置日志级别、格式和输出位置
logging.basicConfig(
level=logging.INFO, # 设置日志级别为 INFO
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("app.log"), # 将日志记录到文件
logging.StreamHandler() # 同时输出到控制台
]
)
app = Flask(__name__)
# 全局变量,存储传递进来的嵌套字典数据
customer_data = []
buf = {}
key = "ZbIF9jKqzDyChwiB"
es = Elasticsearch(
["https://es.jellydropsllc.com"],
verify_certs=True,
ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证
)
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
aiemployee_id = data[key].get('aiemployee')
if not aiemployee_id:
logging.info(f".....................保存时 Data must contain an 'aiemployee' key.")
return False
try:
# index方法将数据存储到指定的索引中使用aiemployee作为文档ID
response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True)
logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
return True
except Exception as e:
logging.info(f".............保存到ES出错...............{e}..")
#print(e)
return False
def search_es(key: str, index: str) -> Optional[Dict]:
# 使用term查询来根据文档的_id搜索
query = {
"query": {
"term": { "_id": key }
}
}
try:
response = es.search(index=index, body=query)
except Exception as e:
logging.error(f"Error searching ES: {e}")
return None
hits = response.get('hits', {}).get('hits', [])
if hits:
return hits[0]['_source'] # 返回找到的第一个文档的内容
return None
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
# 在Elasticsearch的指定索引中搜索
result = search_es(key, index)
if result:
# 更新 buf 字典
buf[key] = result
return result
else:
return None
@app.route('/')
def index():
# 默认展示第一个客户
return redirect(url_for('view_customer', index=0))
@app.route('/customer/<int:index>')
def view_customer(index):
# 判断是否有数据
if not customer_data:
return "No customer data provided.", 400
# 确保 index 在合法范围内
if index < 0 or index >= len(customer_data):
return "Index out of range.", 400
customer = customer_data[index]
# 为每个 email addresses 列表中的项添加唯一的索引
for domain, info in customer.items():
print(f"..........domain={domain}")
print(f"..........info={info}")
if 'email addresses' in info and isinstance(info['email addresses'], list):
for email_idx, email in enumerate(info['email addresses']):
email['idx'] = email_idx # 给每个 email 项添加唯一的索引
is_first = index == 0
is_last = index == len(customer_data) - 1
return render_template('edit_customers.html',
customer=customer,
index=index,
is_first=is_first,
is_last=is_last)
# @app.route('/customer/<int:index>')
# def view_customer(index):
# # 判断是否有数据
# if not customer_data:
# return "No customer data provided.", 400
# # 确保 index 在合法范围内
# if index < 0 or index >= len(customer_data):
# return "Index out of range.", 400
# customer = customer_data[index]
# # 为每个 email addresses 列表中的项添加唯一的索引
# for domain, info in customer.items():
# print(f"..........domain={domain}")
# print(f"..........info={info}")
# if 'email addresses' in info and isinstance(info['email addresses'], list):
# for email_idx, email in enumerate(info['email addresses']):
# email['idx'] = email_idx # 给每个 email 项添加唯一的索引
# is_first = index == 0
# is_last = index == len(customer_data) - 1
# return render_template('edit_customers.html',
# customer=customer,
# index=index,
# is_first=is_first,
# is_last=is_last)
@app.route('/update/<int:index>', methods=['POST'])
def update_customers(index):
updated_data = request.form.to_dict(flat=False)
# 更新嵌套字典数据
if 0 <= index < len(customer_data):
customer = customer_data[index]
for domain, info in customer.items():
if 'customer industry' in info:
info['customer industry'] = updated_data.get(f'{domain}_industry', [info['customer industry']])[0]
if 'email addresses' in info and isinstance(info['email addresses'], list):
for j, email in enumerate(info['email addresses']):
email["owner's name"] = updated_data.get(f'{domain}_owner_name_{j}', [email["owner's name"]])[0]
email['email address'] = updated_data.get(f'{domain}_email_address_{j}', [email['email address']])[0]
email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0]
email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ')
# 将更新后的数据保存到 ES 数据库
store_to_es(key, buf, 'customer')
return redirect(url_for('view_customer', index=index))
def set_customer_data(data):
global customer_data
# 确保数据为嵌套字典格式
if isinstance(data, list) and all(isinstance(customer, dict) for customer in data):
customer_data = data
else:
raise ValueError("Data must be a list of dictionaries.")
@app.route('/customer/top')
def view_top_customer():
# 跳转到数据库的第一条记录
return redirect(url_for('view_customer', index=0))
@app.route('/customer/bottom')
def view_bottom_customer():
# 跳转到数据库的最后一条记录
if not customer_data:
return "No customer data available.", 400
last_index = len(customer_data) - 1 # 计算最后一条记录的索引
return redirect(url_for('view_customer', index=last_index))
@app.route('/search')
def search_by_email():
email = request.args.get('email')
if not email:
return "Email address is required.", 400
# 遍历 customer_data 查找匹配的邮件地址
for index, customer in enumerate(customer_data):
for domain, info in customer.items():
if 'email addresses' in info and isinstance(info['email addresses'], list):
for email_record in info['email addresses']:
if email_record.get('email address') == email:
# 找到匹配的记录,跳转到对应的 index
return redirect(url_for('view_customer', index=index))
# 如果没有找到匹配的邮件地址
return "Email address not found.", 404