diff --git a/__pycache__/app.cpython-39.pyc b/__pycache__/app.cpython-39.pyc new file mode 100644 index 0000000..094639c Binary files /dev/null and b/__pycache__/app.cpython-39.pyc differ diff --git a/app.py b/app.py new file mode 100644 index 0000000..65bca58 --- /dev/null +++ b/app.py @@ -0,0 +1,66 @@ +from flask import Flask, render_template, request, redirect, url_for + +app = Flask(__name__) + +# 全局变量,存储传递进来的嵌套字典数据 +customer_data = [] + +@app.route('/') +def index(): + # 默认展示第一个客户 + return redirect(url_for('view_customer', index=0)) + +@app.route('/customer/') +def view_customer(index): + # 判断是否有数据 + if not customer_data: + return "No customer data provided.", 400 + + # 确保index在合法范围内 + if index < 0 or index >= len(customer_data): + return "Index out of range.", 400 + + customer = customer_data[index] + + # 为每个 email_addresses 列表中的项添加唯一的索引 + for domain, info in customer.items(): + if 'email_addresses' in info and isinstance(info['email_addresses'], list): + for email_idx, email in enumerate(info['email_addresses']): + email['idx'] = email_idx # 给每个email项添加唯一的索引 + + is_first = index == 0 + is_last = index == len(customer_data) - 1 + + return render_template('edit_customers.html', + customer=customer, + index=index, + is_first=is_first, + is_last=is_last) + +@app.route('/update/', methods=['POST']) +def update_customers(index): + updated_data = request.form.to_dict(flat=False) + + # 更新嵌套字典数据 + if 0 <= index < len(customer_data): + customer = customer_data[index] + for domain, info in customer.items(): + if 'customer_industry' in info: + info['customer_industry'] = updated_data.get(f'{domain}_industry', [info['customer_industry']])[0] + + if 'email_addresses' in info and isinstance(info['email_addresses'], list): + for j, email in enumerate(info['email_addresses']): + email['owner_name'] = updated_data.get(f'{domain}_owner_name_{j}', [email['owner_name']])[0] + email['email_address'] = updated_data.get(f'{domain}_email_address_{j}', [email['email_address']])[0] + email['category'] = updated_data.get(f'{domain}_category_{j}', [email['category']])[0] + email['promotion_history'] = updated_data.get(f'{domain}_promotion_history_{j}', [', '.join(email['promotion_history'])])[0].split(', ') + + return redirect(url_for('view_customer', index=index)) + +def set_customer_data(data): + global customer_data + # 确保数据为嵌套字典格式 + if isinstance(data, list) and all(isinstance(customer, dict) for customer in data): + customer_data = data + else: + raise ValueError("Data must be a list of dictionaries.") diff --git a/caller.py b/caller.py new file mode 100644 index 0000000..b6a2177 --- /dev/null +++ b/caller.py @@ -0,0 +1,118 @@ +import app +import json +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import ApiError, TransportError +from dataclasses import dataclass, asdict, field +from typing import List, Dict, Any, Optional +import uuid +import logging +from datetime import datetime, timedelta +import threading +from datetime import datetime, timezone + + +es = Elasticsearch( + ["https://es.jellydropsllc.com"], + verify_certs=True, + ca_certs='/etc/ssl/certs/ISRG_Root_X1.pem' # 使用 ISRG Root X1 验证 +) + + +def store_to_es(key, data: Dict[str, Any], index: str) -> bool: + aiemployee_id = data[key].get('aiemployee') + if not aiemployee_id: + logging.info(f".....................保存时 Data must contain an 'aiemployee' key.") + return False + try: + # index方法将数据存储到指定的索引中,使用aiemployee作为文档ID + response = es.index(index=index, id=aiemployee_id, body=data[key], refresh=True) + + logging.info(f"..........................The document was successfully stored/updated in ES with ID:{response}") + return True + except Exception as e: + logging.info(f".............保存到ES出错!...............{e}..") + #print(e) + return False + + +def search_es(key: str, index: str) -> Optional[Dict]: + # 使用term查询来根据文档的_id搜索 + query = { + "query": { + "term": { "_id": key } + } + } + try: + response = es.search(index=index, body=query) + except Exception as e: + logging.error(f"Error searching ES: {e}") + return None + + hits = response.get('hits', {}).get('hits', []) + if hits: + return hits[0]['_source'] # 返回找到的第一个文档的内容 + return None + + +def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any: + # 在Elasticsearch的指定索引中搜索 + result = search_es(key, index) + if result: + # 更新 buf 字典 + buf[key] = result + return result + else: + return None + + + +# 示例客户数据,包含多个 email_addresses +# customer_data = [ +# { +# "jellydropsllc.com": { +# "email_addresses": [ +# { +# "owner_name": "John Doe", +# "email_address": "john@jellydropsllc.com", +# "category": "lead", +# "promotion_history": ["2024-09-15T09:45:30.123456"] +# }, +# { +# "owner_name": "Jane Doe", +# "email_address": "jane@jellydropsllc.com", +# "category": "contact", +# "promotion_history": ["2024-09-16T10:30:15.654321"] +# } +# ], +# "customer_industry": "legal industry" +# } +# }, +# { +# "singhahluwalia.com": { +# "email_addresses": [ +# { +# "owner_name": "Alice Smith", +# "email_address": "alice@singhahluwalia.com", +# "category": "cold lead", +# "promotion_history": [ +# "2024-10-04T15:31:03.385288" +# ] +# } +# ], +# "customer_industry": "legal industry" +# } +# } +# ] + +# 传递数据并启动 Flask 应用 +# store_to_es(key, buf, 'customer') + +buf = {} +key = "ZbIF9jKqzDyChwiB" +ret = load_indices_from_es(key, 'customer', buf) +if not ret: + exit("客服AI员工不能拥有数据库。") + +customer_data=buf[key]['customer'] +app.set_customer_data(customer_data) +app.app.run() diff --git a/templates/edit_customers.html b/templates/edit_customers.html new file mode 100644 index 0000000..0a1cd19 --- /dev/null +++ b/templates/edit_customers.html @@ -0,0 +1,46 @@ + + + + + + Edit Customer Data + + +

Edit Customer Data

+ +
+ {% for domain, info in customer.items() %} +

Domain: {{ domain }}

+

Customer Industry:

+ +

Email Addresses

+
+ {% if info.get('email_addresses') %} + {% for email in info['email_addresses'] %} +
+

Owner Name:

+

Email Address:

+

Category:

+

Promotion History:

+
+ {% endfor %} + {% else %} +

No email addresses available.

+ {% endif %} +
+ {% endfor %} + +
+ +
+ +
+ + +
+ +