cm/app.py

329 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for
import uuid
import logging
import json
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
import json
# Logging: INFO and above, timestamped, written both to app.log and to the console.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)

app = Flask(__name__)

# --- Module-level state shared by the route handlers ---
# Customer records (a list of dicts), replaced through set_customer_data().
customer_data = []
# Documents fetched from ES, keyed by document id (filled by load_indices_from_es).
buf = {}
# Id of the buf entry that update_customers() persists through store_to_es().
key = "ZbIF9jKqzDyChwiB"

# HTTPS Elasticsearch client; the server certificate is verified against ISRG Root X1.
es = Elasticsearch(
    hosts=["https://es.jellydropsllc.com"],
    verify_certs=True,
    ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem',
)
def store_to_es(key, data: Dict[str, Any], index: str) -> bool:
    """Index ``data[key]`` into ``index``, using its ``aiemployee`` value as the document id.

    :param key: key of the record inside ``data`` to persist
    :param data: mapping that holds the record (e.g. the module-level ``buf``)
    :param index: target Elasticsearch index name
    :return: True if the document was indexed. False if ``data`` has no record
             under ``key``, the record has no truthy ``aiemployee`` value, or the
             ES call raised. Every failure is logged and never propagated.
    """
    # Look the record up defensively. ``data[key]`` used to sit outside the
    # try block, so saving with an empty ``buf`` raised KeyError (HTTP 500).
    record = data.get(key)
    if record is None:
        logging.warning(f"Cannot store to ES: no record under key {key!r}.")
        return False

    aiemployee_id = record.get('aiemployee')
    if not aiemployee_id:
        logging.warning(f".....................保存时 Data must contain an 'aiemployee' key.")
        return False

    try:
        # Indexing with an explicit id creates or replaces that document.
        # refresh=True makes the change visible to the next search immediately.
        response = es.index(index=index, id=aiemployee_id, body=record, refresh=True)
    except Exception as e:
        # Boundary with an external service: log at ERROR (was INFO) and report failure.
        logging.error(f".............保存到ES出错...............{e}..")
        return False

    logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}")
    return True
def search_es(key: str, index: str) -> Optional[Dict]:
    """Find one document by its ``_id`` with a ``term`` query.

    :param key: document ``_id`` to match
    :param index: name of the Elasticsearch index to search
    :return: ``_source`` of the first hit, or ``None`` when nothing matched or
             the search raised (the error is logged)
    """
    id_filter = {"term": {"_id": key}}
    try:
        result = es.search(index=index, body={"query": id_filter})
    except Exception as exc:
        logging.error(f"Error searching ES: {exc}")
        return None

    matches = result.get('hits', {}).get('hits', [])
    return matches[0]['_source'] if matches else None
def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
    """Fetch document ``key`` from ``index`` and cache it in ``buf[key]``.

    :param key: document ``_id`` to look up
    :param index: Elasticsearch index to search
    :param buf: dict that receives the document under ``key`` (mutated in place)
    :return: the document source, or ``None`` when the lookup returned nothing
             (or an empty document), in which case ``buf`` is left untouched
    """
    document = search_es(key, index)
    if not document:
        return None
    buf[key] = document
    return document
def extract_index_structure(es_client, index_name):
    """
    Sample one document from an Elasticsearch index and describe its fields.

    Field paths are dotted (``parent.child``). Fields of dicts nested inside a
    list are recorded under the list's own path. Each value is a type label,
    e.g. ``"str"``, ``"dict"``, ``"list[int]"``, ``"list[empty]"`` or
    ``"list[mixed]"``.

    :param es_client: Elasticsearch client instance (only ``.search`` is used)
    :param index_name: name of the index to sample
    :return: dict mapping field path -> type label. ``{}`` when the index has no
             documents or any error occurs (the error is logged).
    """
    try:
        response = es_client.search(index=index_name, size=1)
        if 'hits' in response and 'hits' in response['hits'] and len(response['hits']['hits']) > 0:
            source = response['hits']['hits'][0].get('_source', {})
            return _parse_document(source)
        return {}
    except Exception as e:
        # Use logging (was print) so this matches the rest of the module's error reporting.
        logging.error(f"Error extracting structure for index {index_name}: {e}")
        return {}


def _parse_document(doc, parent_key="", existing_fields=None):
    """Recursively record the dotted field paths of *doc* and their type labels."""
    if existing_fields is None:
        existing_fields = {}
    if isinstance(doc, dict):
        for name, value in doc.items():
            full_key = f"{parent_key}.{name}" if parent_key else name
            if isinstance(value, dict):
                existing_fields[full_key] = "dict"
                _parse_document(value, full_key, existing_fields)
            elif isinstance(value, list):
                _record_list(value, full_key, existing_fields)
            else:
                existing_fields[full_key] = type(value).__name__
    elif isinstance(doc, list):
        _record_list(doc, parent_key, existing_fields)
    else:
        existing_fields[parent_key] = type(doc).__name__
    return existing_fields


def _record_list(items, path, fields):
    """Label list *items* under *path* by element type, recursing into dict elements."""
    type_names = set()
    for item in items:
        type_names.add(type(item).__name__)
        if isinstance(item, dict):
            _parse_document(item, path, fields)
    # The label is written after recursing so the insertion order of the
    # resulting dict is unchanged from the original implementation.
    if not type_names:
        fields[path] = "list[empty]"
    elif len(type_names) == 1:
        fields[path] = f"list[{type_names.pop()}]"
    else:
        fields[path] = "list[mixed]"
@app.route('/')
def index():
    """Landing page: redirect to the first customer (index 0)."""
    landing_url = url_for('view_customer', index=0)
    return redirect(landing_url)
def count_companies():
    """Return the total number of company domains across ``customer_data``.

    Each key of a customer dict is a domain, and every domain counts as one company.
    """
    return sum(len(domains) for domains in customer_data)
def calculate_statistics():
    """Aggregate email-level lead statistics across all of ``customer_data``.

    Each customer is a dict mapping domain -> info. An info dict may hold an
    ``'email addresses'`` list of email dicts; other entries are skipped.

    Categories are compared case-insensitively. A missing or null category
    counts as "other". A cold lead is Chinese when ``language == 'cn'`` and
    English otherwise (including when the language is absent).

    :return: tuple ``(total_emails, cold_lead_count, no_promotion_count,
             disqualified_lead_count, english_cold_lead_count,
             chinese_cold_lead_count, other_lead_count)``
    """
    total_emails = 0
    cold_lead_count = 0
    no_promotion_count = 0
    disqualified_lead_count = 0
    english_cold_lead_count = 0
    chinese_cold_lead_count = 0
    other_lead_count = 0

    for customer in customer_data:
        for info in customer.values():
            if not ('email addresses' in info and isinstance(info['email addresses'], list)):
                continue
            for email in info['email addresses']:
                total_emails += 1
                # `or ''` + str() tolerate an explicit null / non-string category,
                # which previously crashed on `.lower()`.
                category = str(email.get('category') or '').lower()
                if category == "cold lead":
                    cold_lead_count += 1
                    if email.get('language', 'en') == 'cn':
                        chinese_cold_lead_count += 1
                    else:
                        english_cold_lead_count += 1
                elif category == "disqualified lead":
                    disqualified_lead_count += 1
                else:
                    other_lead_count += 1
                # A missing, None or empty promotion history counts as "no promotion".
                if not email.get('promotion history', []):
                    no_promotion_count += 1

    return (total_emails, cold_lead_count, no_promotion_count, disqualified_lead_count,
            english_cold_lead_count, chinese_cold_lead_count, other_lead_count)
@app.route('/customer/<int:index>')
def view_customer(index):
    """Render ``edit_customers.html`` for ``customer_data[index]``.

    Tags each email dict of the customer with its list position under
    ``'idx'``, mutating the stored data. Also passes global statistics
    (``calculate_statistics`` / ``count_companies``) to the template.

    :param index: position of the customer in ``customer_data``
    :return: the rendered page, or a ``(message, 400)`` tuple when no data is
             loaded or ``index`` is out of range
    """
    if not customer_data:
        return "No customer data provided.", 400
    # Defensive: Werkzeug's default int converter does not match negative numbers.
    if index < 0 or index >= len(customer_data):
        return "Index out of range.", 400

    customer = customer_data[index]
    for domain, info in customer.items():
        # Was print(): it dumped every customer's details (including email
        # addresses) to stdout on each request. Now emitted only at DEBUG level.
        logging.debug("..........domain=%s", domain)
        logging.debug("..........info=%s", info)
        if 'email addresses' in info and isinstance(info['email addresses'], list):
            for email_idx, email in enumerate(info['email addresses']):
                # Position within this domain's list; presumably the template
                # uses it to build the `_{j}` form field names update_customers reads.
                email['idx'] = email_idx

    is_first = index == 0
    is_last = index == len(customer_data) - 1

    (total_emails, cold_lead_count, no_promotion_count, disqualified_lead_count,
     english_cold_lead_count, chinese_cold_lead_count, other_lead_count) = calculate_statistics()
    company_count = count_companies()

    return render_template('edit_customers.html',
                           customer=customer,
                           index=index,
                           is_first=is_first,
                           is_last=is_last,
                           total_emails=total_emails,
                           cold_lead_count=cold_lead_count,
                           no_promotion_count=no_promotion_count,
                           disqualified_lead_count=disqualified_lead_count,
                           english_cold_lead_count=english_cold_lead_count,
                           chinese_cold_lead_count=chinese_cold_lead_count,
                           other_lead_count=other_lead_count,
                           company_count=company_count)
@app.route('/update/<int:index>', methods=['POST'])
def update_customers(index):
    """Apply the submitted edit form to ``customer_data[index]``, then persist ``buf`` to ES.

    Form fields read (``j`` is the email's position in the domain's list):
      ``{domain}_industry``, ``{domain}_owner_name_{j}``,
      ``{domain}_email_address_{j}``, ``{domain}_category_{j}`` and
      ``{domain}_promotion_history_{j}`` (entries separated by ``", "``).
    A field absent from the form leaves the stored value unchanged. An
    out-of-range ``index`` applies no edits, but ``buf`` is still stored.

    NOTE(review): the edits mutate ``customer_data``, but what is written to ES
    is ``buf[key]``. The two only coincide if ``customer_data`` shares objects
    with ``buf``; confirm against the code that calls ``set_customer_data``.
    """
    # (form field suffix, email dict key) pairs for the plain string fields.
    email_fields = (
        ('owner_name', "owner's name"),
        ('email_address', 'email address'),
        ('category', 'category'),
    )
    form = request.form  # .get() returns the first value submitted for a field

    if 0 <= index < len(customer_data):
        for domain, info in customer_data[index].items():
            if 'customer industry' in info:
                info['customer industry'] = form.get(f'{domain}_industry', info['customer industry'])
            if not ('email addresses' in info and isinstance(info['email addresses'], list)):
                continue
            for j, email in enumerate(info['email addresses']):
                # Only overwrite keys the form actually submitted. The old
                # `get(field, [email[k]])` evaluated `email[k]` eagerly and raised
                # KeyError for emails lacking that key.
                for suffix, email_key in email_fields:
                    submitted = form.get(f'{domain}_{suffix}_{j}')
                    if submitted is not None:
                        email[email_key] = submitted
                # Promotion history is only edited when the email already has one.
                if 'promotion history' in email:
                    raw = form.get(f'{domain}_promotion_history_{j}')
                    if raw is not None:
                        # Drop blank entries so an empty field yields [] rather than
                        # [''] (which counts as "has promotion history").
                        email['promotion history'] = [item for item in raw.split(', ') if item.strip()]

    store_to_es(key, buf, 'customer')
    return redirect(url_for('view_customer', index=index))
def set_customer_data(data):
    """Replace the module-level ``customer_data`` with ``data``.

    :param data: list whose every element is a dict (an empty list is accepted)
    :raises ValueError: if ``data`` is not a list or holds a non-dict element
    """
    global customer_data
    has_bad_entry = not isinstance(data, list) or any(
        not isinstance(entry, dict) for entry in data
    )
    if has_bad_entry:
        raise ValueError("Data must be a list of dictionaries.")
    customer_data = data
@app.route('/customer/top')
def view_top_customer():
    """Redirect to the first record (index 0)."""
    first_index = 0
    return redirect(url_for('view_customer', index=first_index))
@app.route('/customer/bottom')
def view_bottom_customer():
    """Redirect to the last record, or answer 400 when no data is loaded."""
    if not customer_data:
        return "No customer data available.", 400
    final_position = len(customer_data) - 1
    return redirect(url_for('view_customer', index=final_position))
@app.route('/search')
def search_by_email():
    """Redirect to the first customer that owns the email given in ``?email=``.

    Addresses are compared after stripping surrounding whitespace and
    case-folding, so ``" John@X.com "`` finds ``john@x.com``.

    :return: a redirect to ``view_customer`` on a match;
             ``("Email address is required.", 400)`` when the parameter is
             missing or blank; ``("Email address not found.", 404)`` otherwise
    """
    needle = (request.args.get('email') or '').strip().casefold()
    if not needle:
        return "Email address is required.", 400

    for position, customer in enumerate(customer_data):
        for info in customer.values():
            if not ('email addresses' in info and isinstance(info['email addresses'], list)):
                continue
            for email_record in info['email addresses']:
                stored = email_record.get('email address')
                # Skip missing/non-string addresses instead of failing on them.
                if isinstance(stored, str) and stored.strip().casefold() == needle:
                    return redirect(url_for('view_customer', index=position))

    return "Email address not found.", 404
# # 提取字段结构
# field_structure = extract_index_structure(es, 'customer')
# # 打印字段结构
# print(json.dumps(field_structure, indent=2, ensure_ascii=False))