This commit is contained in:
hailin 2024-10-28 22:18:43 +08:00
parent b40708d0ed
commit ea4dee565a
2 changed files with 71 additions and 2 deletions

71
app.py
View File

@ -2,11 +2,11 @@ from flask import Flask, render_template, request, redirect, url_for
import uuid
import logging
import json
from elasticsearch import Elasticsearch
from elasticsearch import Elasticsearch, helpers
from elasticsearch.exceptions import ApiError, TransportError
from dataclasses import dataclass, asdict, field
from typing import List, Dict, Any, Optional
import json
# Configure the logging level, format, and output destination
logging.basicConfig(
@ -81,6 +81,62 @@ def load_indices_from_es(key: str, index: str, buf: Dict[str, Any]) -> Any:
return None
def _collect_field_types(doc, parent_key, fields):
    """Record a type label for every key of *doc* in *fields*, recursing into nested dicts.

    Keys are flattened with dots (``parent.child``). A non-empty list is
    described by its first element only: a list of dicts keeps the label
    ``list`` and contributes the first dict's keys under the list's own key,
    any other non-empty list becomes ``list[<element type>]``.

    :param doc: dictionary to walk
    :param parent_key: dotted prefix for the keys of *doc* ("" at the top level)
    :param fields: mapping that is filled in place and returned
    :return: *fields*
    """
    for key, value in doc.items():
        full_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            # setdefault keeps a label that an identically named flattened key
            # may already have received.
            fields.setdefault(full_key, type(value).__name__)
            _collect_field_types(value, full_key, fields)
        elif isinstance(value, list) and value:
            first = value[0]
            if isinstance(first, dict):
                fields.setdefault(full_key, type(value).__name__)
                _collect_field_types(first, full_key, fields)
            else:
                fields[full_key] = f"list[{type(first).__name__}]"
        else:
            # Scalars, None and empty lists are labelled with their own type name.
            fields[full_key] = type(value).__name__
    return fields


def extract_index_structure(es_client, index_name):
    """
    Sample one document from an Elasticsearch index and describe its fields.

    Runs ``es_client.search(index=index_name, size=1)`` and walks the
    ``_source`` of the first hit, including nested objects, producing a flat
    mapping from dotted field path to the Python type name of the sampled value.

    :param es_client: Elasticsearch client instance (anything exposing ``search``)
    :param index_name: name of the index to inspect
    :return: dict mapping dotted field paths to type labels; empty when the
             index has no hits, the response has no ``hits`` section, or the
             lookup fails
    """
    try:
        response = es_client.search(index=index_name, size=1)
        if 'hits' not in response or 'hits' not in response['hits']:
            return {}
        hits = response['hits']['hits']
        if not hits:
            return {}
        source = hits[0].get('_source', {})
        if not isinstance(source, dict):
            return {}
        return _collect_field_types(source, "", {})
    except Exception:
        # Best-effort helper: any failure yields an empty structure instead of
        # propagating. The failure goes through logging (with traceback)
        # rather than print.
        logging.exception("Error extracting structure for index %s", index_name)
        return {}
@app.route('/')
def index():
# Show the first customer by default
@ -88,6 +144,9 @@ def index():
def calculate_statistics():
total_emails = 0
cold_lead_count = 0
@ -215,3 +274,11 @@ def search_by_email():
# No matching email address was found
return "Email address not found.", 404
# Extract the field structure of the 'customer' index using the `es` client.
# NOTE(review): this appears to run at module level, i.e. on import — confirm that is intended.
field_structure = extract_index_structure(es, 'customer')
# Print the field structure as indented JSON, leaving non-ASCII characters unescaped.
print(json.dumps(field_structure, indent=2, ensure_ascii=False))

View File

@ -14,4 +14,6 @@ if not ret:
# Take the 'customer database' entry stored under buf[key] and pass it to the app module.
customer_data=buf[key]['customer database']
app.set_customer_data(customer_data)
# Run app.app on all interfaces, port 5001 (presumably the Flask application from app.py — confirm).
app.app.run(host="0.0.0.0", port=5001)