298 lines
11 KiB
Python
298 lines
11 KiB
Python
from flask import Flask, render_template, request, redirect, url_for
|
||
import uuid
|
||
import logging
|
||
import json
|
||
from elasticsearch import Elasticsearch, helpers
|
||
from elasticsearch.exceptions import ApiError, TransportError
|
||
from dataclasses import dataclass, asdict, field
|
||
from typing import List, Dict, Any, Optional
|
||
import json
|
||
|
||
# Configure root logging once at import time: INFO level, timestamped records,
# written both to app.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit UTF-8: several log messages in this module contain Chinese
        # text; with a non-UTF-8 locale encoding the FileHandler would fail to
        # write those records (UnicodeEncodeError reported via handleError).
        logging.FileHandler("app.log", encoding="utf-8"),
        # Also echo every record to the console (stderr by default).
        logging.StreamHandler(),
    ],
)
|
||
|
||
|
||
|
||
app = Flask(__name__)

# Module-level state shared by the request handlers.
# customer_data: list of customer dicts ({domain: info}) displayed and edited
# by the views; replaced wholesale by set_customer_data().
customer_data = []
# buf: document cache keyed by document id; update_customers() persists
# buf[key] through store_to_es().
buf = {}
# key: id of the document in `buf` that update_customers() saves.
key = "ZbIF9jKqzDyChwiB"

# Elasticsearch client used by store_to_es() / search_es(); server
# certificates are verified against the ISRG Root X1 CA file.
es = Elasticsearch(
    ["https://es.jellydropsllc.com"],
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
    verify_certs=True,
)
|
||
|
||
|
||
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Index ``data[key]`` into Elasticsearch, using its 'aiemployee' value as the document id.

    :param key: key of the record inside ``data`` to persist.
    :param data: mapping holding the record under ``key`` (e.g. the module-level ``buf``).
    :param index: name of the target Elasticsearch index.
    :return: True if the document was stored/updated, False when the record is
             missing, has no 'aiemployee' id, or the index call fails (logged).
    """
    record = data.get(key)
    # `data` may not contain `key` yet (e.g. the cache was never loaded); treat
    # that like a record without an id instead of raising KeyError.
    aiemployee_id = record.get('aiemployee') if isinstance(record, dict) else None
    if not aiemployee_id:
        logging.info(f".....................保存时 Data must contain an 'aiemployee' key.")
        return False

    try:
        # Using 'aiemployee' as the document id makes repeated saves overwrite
        # the same document; refresh=True makes the write visible to searches
        # immediately.
        response = es.index(index=index, id=aiemployee_id, body=record, refresh=True)
    except Exception as e:
        # Best-effort save: report the failure with a traceback, don't crash the caller.
        logging.exception(f".............保存到ES出错!...............{e}..")
        return False

    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True
|
||
|
||
|
||
def search_es(key: str, index: str) -> Optional[Dict]:
    """Fetch the ``_source`` of the document in ``index`` whose ``_id`` equals ``key``.

    Issues a ``term`` query on ``_id`` through the module-level client.

    :return: the first hit's source, or None when there is no hit or the
             search call raises (the error is logged).
    """
    id_query = {"query": {"term": {"_id": key}}}

    try:
        result = es.search(index=index, body=id_query)
    except Exception as err:
        logging.error(f"Error searching ES: {err}")
        return None

    matches = result.get('hits', {}).get('hits', [])
    return matches[0]['_source'] if matches else None
|
||
|
||
|
||
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Look up document ``key`` in ``index`` and cache it into ``buf``.

    :param buf: cache dict; on a (truthy) hit, ``buf[key]`` is set to the document source.
    :return: the document source, or None when search_es() returned nothing truthy.
    """
    document = search_es(key, index)
    if not document:
        return None

    buf[key] = document
    return document
|
||
|
||
|
||
def _summarize_list_types(type_names):
    """Collapse the set of element type names seen in a list into one label."""
    if not type_names:
        return "list[empty]"
    if len(type_names) == 1:
        return f"list[{next(iter(type_names))}]"
    return "list[mixed]"


def _collect_list_fields(items, list_key, fields):
    """Record the element-type label of ``items`` under ``list_key``, recursing into dict elements."""
    type_names = set()
    for item in items:
        type_names.add(type(item).__name__)
        if isinstance(item, dict):
            # Dict elements are flattened under the list's own path.
            _parse_document(item, list_key, fields)
    fields[list_key] = _summarize_list_types(type_names)


def _parse_document(doc, parent_key="", existing_fields=None):
    """Recursively map dotted field paths of ``doc`` to type labels, filling ``existing_fields``."""
    if existing_fields is None:
        existing_fields = {}

    if isinstance(doc, dict):
        for name, value in doc.items():
            full_key = f"{parent_key}.{name}" if parent_key else name
            if isinstance(value, dict):
                existing_fields[full_key] = "dict"
                _parse_document(value, full_key, existing_fields)
            elif isinstance(value, list):
                _collect_list_fields(value, full_key, existing_fields)
            else:
                existing_fields[full_key] = type(value).__name__
    elif isinstance(doc, list):
        # A bare list at this level: summarize it under the parent path.
        _collect_list_fields(doc, parent_key, existing_fields)
    else:
        existing_fields[parent_key] = type(doc).__name__

    return existing_fields


def extract_index_structure(es_client, index_name):
    """Describe an index's field layout by inspecting one of its documents.

    Fetches a single document (``size=1``) from ``index_name`` and walks its
    ``_source`` recursively. Nested fields use dotted paths (``"a.b"``); dicts
    inside lists are flattened under the list's own path, while nested lists
    are only recorded by type name and not descended into.

    Type labels are the Python type name of the value (``"str"``, ``"int"``,
    ...), ``"dict"`` for objects, and ``"list[<type>]"`` / ``"list[mixed]"`` /
    ``"list[empty]"`` for arrays.

    :param es_client: Elasticsearch client instance (only ``search`` is used).
    :param index_name: name of the index to inspect.
    :return: dict mapping field path -> type label; empty when the index has no
             documents or the lookup fails (the error is logged).
    """
    try:
        response = es_client.search(index=index_name, size=1)
        if 'hits' in response and 'hits' in response['hits'] and len(response['hits']['hits']) > 0:
            source = response['hits']['hits'][0].get('_source', {})
            return _parse_document(source)
        return {}
    except Exception as e:
        # Report through the configured logging handlers rather than stdout.
        logging.exception(f"Error extracting structure for index {index_name}: {e}")
        return {}
|
||
|
||
|
||
@app.route('/')
def index():
    """Landing page: send the visitor to the first customer record."""
    first_customer_url = url_for('view_customer', index=0)
    return redirect(first_customer_url)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
def calculate_statistics(data=None):
    """Count e-mail records across all customers.

    :param data: list of customer dicts (``{domain: info}``) to scan; defaults to
                 the module-level ``customer_data`` so existing callers are unchanged.
    :return: tuple ``(total_emails, cold_lead_count, no_promotion_count)``.
             A "cold lead" is a record whose 'category' is the string
             "cold lead" (case-insensitive); "no promotion" means
             'promotion history' is missing or empty. Entries that are not
             dicts, and 'email addresses' values that are not lists, are skipped.
    """
    if data is None:
        data = customer_data

    total_emails = 0
    cold_lead_count = 0
    no_promotion_count = 0

    for customer in data:
        for info in customer.values():
            emails = info.get('email addresses') if isinstance(info, dict) else None
            if not isinstance(emails, list):
                continue
            for email in emails:
                if not isinstance(email, dict):
                    continue
                total_emails += 1
                # The category may be null/non-string in stored data; only
                # strings can be compared case-insensitively.
                category = email.get('category')
                if isinstance(category, str) and category.lower() == "cold lead":
                    cold_lead_count += 1
                if not email.get('promotion history'):
                    no_promotion_count += 1

    return total_emails, cold_lead_count, no_promotion_count
|
||
|
||
|
||
@app.route('/customer/<int:index>')
def view_customer(index):
    """Render the edit page for the customer at position ``index`` in ``customer_data``.

    Responds 400 when no data has been loaded or ``index`` is out of range.
    Before rendering, every record in each domain's 'email addresses' list is
    tagged in place with an ``'idx'`` key holding its list position, and the
    page receives global counts from calculate_statistics().
    """
    if not customer_data:
        return "No customer data provided.", 400

    if not 0 <= index < len(customer_data):
        return "Index out of range.", 400

    customer = customer_data[index]

    for domain, info in customer.items():
        # DEBUG level (not print) so full records, which include e-mail
        # addresses, are not written to stdout on every request.
        logging.debug("..........domain=%s", domain)
        logging.debug("..........info=%s", info)
        if 'email addresses' in info and isinstance(info['email addresses'], list):
            for email_idx, email in enumerate(info['email addresses']):
                # Position tag, presumably used by the template to build the
                # per-row form field names (`..._{j}`) read by update_customers.
                # NOTE(review): this mutates the stored record in place.
                email['idx'] = email_idx

    is_first = index == 0
    is_last = index == len(customer_data) - 1

    total_emails, cold_lead_count, no_promotion_count = calculate_statistics()

    return render_template('edit_customers.html',
                           customer=customer,
                           index=index,
                           is_first=is_first,
                           is_last=is_last,
                           total_emails=total_emails,
                           cold_lead_count=cold_lead_count,
                           no_promotion_count=no_promotion_count)
|
||
|
||
|
||
|
||
def _form_value(form, field_name):
    """Return the first submitted value for ``field_name`` from a flat=False form dict, or None if absent."""
    values = form.get(field_name)
    return values[0] if values else None


def _split_promotion_history(raw):
    """Split the ', '-joined promotion history text back into a list, dropping blank items."""
    # Plain `raw.split(', ')` turns an empty field into [''], which would then
    # look like a non-empty history to calculate_statistics().
    return [item for item in raw.split(', ') if item.strip()]


@app.route('/update/<int:index>', methods=['POST'])
def update_customers(index):
    """Apply the submitted edit form to customer ``index`` and persist the cached document.

    Form fields are named ``{domain}_industry`` and, for e-mail record ``j``,
    ``{domain}_owner_name_{j}``, ``{domain}_email_address_{j}``,
    ``{domain}_category_{j}`` and ``{domain}_promotion_history_{j}`` (a
    ', '-separated list). Only fields present in the form are written; absent
    fields leave the record unchanged. 'customer industry' and
    'promotion history' are only updated on records that already have them.
    For a valid ``index`` the document ``buf[key]`` is then saved to the
    'customer' index via store_to_es(). Always redirects back to the
    customer's view page.
    """
    form = request.form.to_dict(flat=False)

    # (record key, form-field infix) pairs for the plain per-email fields.
    email_fields = (
        ("owner's name", 'owner_name'),
        ('email address', 'email_address'),
        ('category', 'category'),
    )

    if 0 <= index < len(customer_data):
        customer = customer_data[index]
        for domain, info in customer.items():
            if not isinstance(info, dict):
                continue

            industry = _form_value(form, f'{domain}_industry')
            if 'customer industry' in info and industry is not None:
                info['customer industry'] = industry

            emails = info.get('email addresses')
            if not isinstance(emails, list):
                continue
            for j, email in enumerate(emails):
                if not isinstance(email, dict):
                    continue
                for record_key, infix in email_fields:
                    # Look up the submitted value first; only touch the record
                    # when the form actually sent this field (avoids KeyError on
                    # records missing the key).
                    value = _form_value(form, f'{domain}_{infix}_{j}')
                    if value is not None:
                        email[record_key] = value

                history = _form_value(form, f'{domain}_promotion_history_{j}')
                if 'promotion history' in email and history is not None:
                    email['promotion history'] = _split_promotion_history(history)

        # Persist the cached document to the ES 'customer' index.
        store_to_es(key, buf, 'customer')

    return redirect(url_for('view_customer', index=index))
|
||
|
||
def set_customer_data(data):
    """Replace the module-level ``customer_data`` with ``data``.

    :param data: list whose every element is a dict (an empty list is accepted).
    :raises ValueError: if ``data`` is not a list of dicts.
    """
    global customer_data
    is_valid = isinstance(data, list) and all(isinstance(entry, dict) for entry in data)
    if not is_valid:
        raise ValueError("Data must be a list of dictionaries.")
    customer_data = data
|
||
|
||
|
||
@app.route('/customer/top')
def view_top_customer():
    """Jump to the first customer record (position 0)."""
    first_position = 0
    target = url_for('view_customer', index=first_position)
    return redirect(target)
|
||
|
||
@app.route('/customer/bottom')
def view_bottom_customer():
    """Jump to the last loaded customer record; respond 400 when nothing is loaded."""
    if not customer_data:
        return "No customer data available.", 400
    return redirect(url_for('view_customer', index=len(customer_data) - 1))
|
||
|
||
|
||
|
||
@app.route('/search')
def search_by_email():
    """Redirect to the first customer owning the e-mail address given in ``?email=``.

    The query and stored addresses are trimmed and compared case-insensitively.
    Responds 400 when the parameter is missing or blank, 404 when no record matches.
    """
    wanted = (request.args.get('email') or '').strip().casefold()
    if not wanted:
        return "Email address is required.", 400

    for index, customer in enumerate(customer_data):
        for info in customer.values():
            records = info.get('email addresses') if isinstance(info, dict) else None
            if not isinstance(records, list):
                continue
            for record in records:
                if not isinstance(record, dict):
                    continue
                address = record.get('email address')
                if isinstance(address, str) and address.strip().casefold() == wanted:
                    # First match wins; show that customer's page.
                    return redirect(url_for('view_customer', index=index))

    return "Email address not found.", 404
|
||
|
||
|
||
|
||
# At module load, extract the field layout of the 'customer' index and dump it
# to stdout as pretty-printed JSON (non-ASCII characters kept as-is).
field_structure = extract_index_structure(es_client=es, index_name='customer')
print(json.dumps(field_structure, ensure_ascii=False, indent=2))