from flask import Flask, render_template, request, redirect, url_for import uuid import logging import json from elasticsearch import Elasticsearch from elasticsearch.exceptions import ApiError, TransportError from dataclasses import dataclass, asdict, field from typing import List, Dict, Any, Optional # 配置日志级别、格式和输出位置 logging.basicConfig( level=logging.INFO, # 设置日志级别为 INFO format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler("app.log"), # 将日志记录到文件 logging.StreamHandler() # 同时输出到控制台 ] ) app = Flask(__name__) # 全局变量,存储传递进来的嵌套字典数据 customer_data = [] buf = {} key = "ZbIF9jKqzDyChwiB" es = Elasticsearch( ["https://es.jellydropsllc.com"], verify_certs=True, ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 ) def store_to_es(key, data: Dict[str, Any], index: str) -> bool: aiemployee_id = data[key].get('aiemployee') if not aiemployee_id: logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") return False try: # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") return True except Exception as e: logging.info(f".............保存到ES出错!...............{e}..") #print(e) return False def search_es(key: str, index: str) -> Optional[Dict]: # 使用term查询来根据文档的_id搜索 query = { "query": { "term": { "_id": key } } } try: response = es.search(index=index, body=query) except Exception as e: logging.error(f"Error searching ES: {e}") return None hits = response.get('hits', {}).get('hits', []) if hits: return hits[0]['_source'] # 返回找到的第一个文档的内容 return None def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: # 在Elasticsearch的指定索引中搜索 result = search_es(key, index) if result: # 更新 buf 字典 buf[key] = result return result else: return None def calculate_statistics(): total_emails = 0 cold_lead_count = 0 no_promotion_count = 0 # 遍历所有客户数据以计算统计信息 for customer in customer_data: for domain, info in customer.items(): if 'email addresses' in info and isinstance(info['email addresses'], list): for email in info['email addresses']: total_emails += 1 # 判断 "cold lead" 类型 if email.get('category', '').lower() == "cold lead": cold_lead_count += 1 # 判断是否有推广历史 promotion_history = email.get('promotion history', []) if not promotion_history: # 如果推广历史为空或不存在 no_promotion_count += 1 return total_emails, cold_lead_count, no_promotion_count @app.route('/customer/') def view_customer(index): # 判断是否有数据 if not customer_data: return "No customer data provided.", 400 # 确保 index 在合法范围内 if index < 0 or index >= len(customer_data): return "Index out of range.", 400 customer = customer_data[index] # 为每个 email addresses 列表中的项添加唯一的索引 for domain, info in customer.items(): print(f"..........domain={domain}") print(f"..........info={info}") if 'email addresses' in info and isinstance(info['email addresses'], list): for email_idx, email in enumerate(info['email addresses']): email['idx'] = email_idx # 给每个 email 项添加唯一的索引 is_first = index == 0 is_last = index == len(customer_data) - 1 # 计算统计数据 total_emails, cold_lead_count, no_promotion_count = calculate_statistics() return render_template('edit_customers.html', customer=customer, index=index, is_first=is_first, is_last=is_last, total_emails=total_emails, cold_lead_count=cold_lead_count, no_promotion_count=no_promotion_count) # @app.route('/customer/') # def view_customer(index): # # 判断是否有数据 # if not customer_data: # return "No customer data provided.", 400 # # 确保 index 在合法范围内 # if index < 0 or index >= len(customer_data): # return "Index out of range.", 400 # customer = customer_data[index] # # 为每个 email addresses 列表中的项添加唯一的索引 # for domain, info in customer.items(): # print(f"..........domain={domain}") # print(f"..........info={info}") # if 'email addresses' in info and isinstance(info['email addresses'], list): # for email_idx, email in enumerate(info['email addresses']): # email['idx'] = email_idx # 给每个 email 项添加唯一的索引 # is_first = index == 0 # is_last = index == len(customer_data) - 1 # return render_template('edit_customers.html', # customer=customer, # index=index, # is_first=is_first, # is_last=is_last) @app.route('/update/', methods=['POST']) def update_customers(index): updated_data = request.form.to_dict(flat=False) # 更新嵌套字典数据 if 0 <= index < len(customer_data): customer = customer_data[index] for domain, info in customer.items(): if 'customer industry' in info: info['customer industry'] = updated_data.get(f'{domain}_industry', [info['customer industry']])[0] if 'email addresses' in info and isinstance(info['email addresses'], list): for j, email in enumerate(info['email addresses']): email["owner's name"] = updated_data.get(f'{domain}_owner_name_{j}', [email["owner's name"]])[0] email['email address'] = updated_data.get(f'{domain}_email_address_{j}', [email['email address']])[0] email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0] #email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ') if 'promotion history' in email: email['promotion history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion history'])])[0].split(', ') # 如果 'promotion history' 不存在,什么都不做,直接跳过 # 将更新后的数据保存到 ES 数据库 store_to_es(key, buf, 'customer') return redirect(url_for('view_customer', index=index)) def set_customer_data(data): global customer_data # 确保数据为嵌套字典格式 if isinstance(data, list) and all(isinstance(customer, dict) for customer in data): customer_data = data else: raise ValueError("Data must be a list of dictionaries.") @app.route('/customer/top') def view_top_customer(): # 跳转到数据库的第一条记录 return redirect(url_for('view_customer', index=0)) @app.route('/customer/bottom') def view_bottom_customer(): # 跳转到数据库的最后一条记录 if not customer_data: return "No customer data available.", 400 last_index = len(customer_data) - 1 # 计算最后一条记录的索引 return redirect(url_for('view_customer', index=last_index)) @app.route('/search') def search_by_email(): email = request.args.get('email') if not email: return "Email address is required.", 400 # 遍历 customer_data 查找匹配的邮件地址 for index, customer in enumerate(customer_data): for domain, info in customer.items(): if 'email addresses' in info and isinstance(info['email addresses'], list): for email_record in info['email addresses']: if email_record.get('email address') == email: # 找到匹配的记录,跳转到对应的 index return redirect(url_for('view_customer', index=index)) # 如果没有找到匹配的邮件地址 return "Email address not found.", 404