def _describe_value(value):
    """Return the type label recorded for a single field value.

    A non-empty list whose first element is not a dict is labelled
    ``list[<element type>]``; everything else (scalars, dicts, empty lists,
    lists of dicts) is labelled with the Python type name of the value itself.
    """
    if isinstance(value, list) and value and not isinstance(value[0], dict):
        return f"list[{type(value[0]).__name__}]"
    return type(value).__name__


def _collect_fields(doc, parent_key, fields):
    """Record every field of the dict ``doc`` into ``fields`` by dotted path.

    The first type seen for a path wins, so merging several documents (or
    several elements of a list of objects) never flips an existing entry.
    """
    for key, value in doc.items():
        full_key = f"{parent_key}.{key}" if parent_key else key
        fields.setdefault(full_key, _describe_value(value))
        if isinstance(value, dict):
            _collect_fields(value, full_key, fields)
        elif isinstance(value, list):
            # Walk every object in the list, not just the first one, so that
            # fields present only in later elements are still reported.
            for item in value:
                if isinstance(item, dict):
                    _collect_fields(item, full_key, fields)


def extract_index_structure(es_client, index_name, sample_size=1):
    """Sample documents from an index and describe their field structure.

    The function searches ``index_name`` for ``sample_size`` documents and
    flattens each ``_source`` into a mapping of dotted field paths to type
    labels. Labels are Python type names of the decoded JSON values
    (``str``, ``int``, ``dict``, ``list``, ``list[str]``, ...), not
    Elasticsearch mapping types. Fields that do not occur in the sampled
    documents are not reported.

    :param es_client: client object exposing ``search(index=..., size=...)``
    :param index_name: name of the index to inspect
    :param sample_size: number of documents to sample and merge (default 1)
    :return: dict mapping dotted field paths to type labels; empty when the
        index has no hits or when the lookup fails
    """
    field_structure = {}
    try:
        response = es_client.search(index=index_name, size=sample_size)
        if 'hits' not in response or 'hits' not in response['hits']:
            return field_structure

        for hit in response['hits']['hits']:
            source = hit.get('_source', {})
            # A missing or disabled _source yields nothing to describe.
            if isinstance(source, dict):
                _collect_fields(source, "", field_structure)
    except Exception:
        # Best-effort helper: report the failure with its traceback through
        # logging and fall back to an empty structure instead of raising.
        logging.getLogger(__name__).exception(
            "Error extracting structure for index %s", index_name
        )
        return {}
    return field_structure