From 8a4e59c556aade0bb6aab58352b413dad6eec460 Mon Sep 17 00:00:00 2001 From: hailin Date: Mon, 21 Jul 2025 18:58:35 +0800 Subject: [PATCH] . --- config/default.json | 6 ++ email_ui.py | 90 ++++++++++++++++++++++++++++ requirements.txt | 2 + send_email_module.py | 137 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 235 insertions(+) create mode 100644 config/default.json create mode 100644 email_ui.py create mode 100644 requirements.txt create mode 100644 send_email_module.py diff --git a/config/default.json b/config/default.json new file mode 100644 index 0000000..b58354d --- /dev/null +++ b/config/default.json @@ -0,0 +1,6 @@ +{ + "outgoing server": "smtp.example.com", + "email user": "no-reply@example.com", + "email password": "YOUR_APP_PASSWORD" +} + diff --git a/email_ui.py b/email_ui.py new file mode 100644 index 0000000..d38c261 --- /dev/null +++ b/email_ui.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +""" +email_ui.py — 一键群发工具 (独立可跑版) +-------------------------------------------------- +运行: python email_ui.py +""" +import os, logging, random +from datetime import datetime +import gradio as gr +from send_email_module import send_email # 同目录导入 + +EMAIL_CONFIG_KEY = "default" # 对应 config/default.json +LOG_FILE = "email_send_log.txt" +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + + +# -------------------- 内部函数 -------------------- +def _load_template(file_obj): + if file_obj is None: + return "" + with open(file_obj.name, "r", encoding="utf-8") as f: + return f.read() + + +def preview_email(to_addrs, subject, html_file, language, + want_receipt, want_read_receipt): + tpl = _load_template(html_file) + if not tpl: + return gr.HTML.update("

❌ 请先上传 HTML 模板

"), None + + preview = (tpl.replace("{{recipient_name}}", "there") + .replace("{{recipient_email}}", "example@example.com") + .replace("{{encoded_recipient_email}}", "ENCODED_EXAMPLE") + .replace("{{timestamp}}", datetime.now().isoformat())) + state = { + "addresses": [a.strip() for a in to_addrs.split(",") if a.strip()], + "subject": subject, + "template": tpl, + "language": language, + "want_receipt": want_receipt, + "want_read_receipt": want_read_receipt, + } + return gr.HTML.update(preview), state + + +def send_emails(state): + if not state: + return "⚠️ 请先 Preview。" + out_lines = [] + for addr in state["addresses"]: + res = send_email( + EMAIL_CONFIG_KEY, addr, state["subject"], state["template"], "", + request_receipt=state["want_receipt"], + request_read_receipt=state["want_read_receipt"], + language=state["language"], + ) + ts = datetime.now().isoformat(timespec="seconds") + line = f"{ts}\t{addr}\t{res['status']}\t{res['message']}" + out_lines.append(line) + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(line + "\n") + return "\n".join(out_lines) + + +# -------------------- Gradio UI -------------------- +with gr.Blocks(css=".gr-button {min-width:6rem}") as demo: + gr.Markdown("## ✉️ 简易 EDM 群发工具") + + addrs = gr.Textbox(label="收件人(逗号分隔)", placeholder="foo@bar.com, alice@x.com") + subj = gr.Textbox(label="主题", placeholder="Subject here") + tpl = gr.File(label="上传 HTML 模板") + lang = gr.Radio(["english", "chinese"], value="english", label="语言") + with gr.Row(): + rcpt = gr.Checkbox(label="投递回执") + read_r = gr.Checkbox(label="已读回执") + + preview_btn = gr.Button("Preview ⬇️") + send_btn = gr.Button("Send ✈️", interactive=False) + out_html = gr.HTML() + state_box = gr.State() + + preview_btn.click(preview_email, [addrs, subj, tpl, lang, rcpt, read_r], + [out_html, state_box]) \ + .then(lambda s: gr.Button.update(interactive=s is not None), + inputs=state_box, outputs=send_btn) + send_btn.click(send_emails, state_box, out_html) + +if __name__ == "__main__": + demo.queue().launch() + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..808366a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +gradio>=4.0,<5 + diff --git a/send_email_module.py b/send_email_module.py new file mode 100644 index 0000000..a9e73b3 --- /dev/null +++ b/send_email_module.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +""" +send_email_module.py +-------------------------------------------------- +补全了你原来的 send_email 及依赖: +• load_email_config() – 读 config/.json +• detect_smtp_port() – 自动尝试 465 / 587 +• xor_encode() – 简单异或编码 +""" +import ssl, smtplib, json, logging, os, random +from datetime import datetime +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.utils import formataddr + +CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config") +logging.basicConfig(level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s") + +# ---------- 工具 ---------- +def load_email_config(key: str) -> dict | None: + path = os.path.join(CONFIG_DIR, f"{key}.json") + if not os.path.exists(path): + logging.error(f"配置文件不存在: {path}") + return None + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def detect_smtp_port(host: str): + """ + 先尝试 465 (SSL),失败再试 587 (STARTTLS)。 + 返回 smtplib.SMTP or smtplib.SMTP_SSL 实例,失败返回 None + """ + try: + context = ssl.create_default_context() + server = smtplib.SMTP_SSL(host, 465, context=context, timeout=20) + server.noop() + logging.info("使用 465/SSL 连接 SMTP") + return server + except Exception: + pass # 再试 587 + + try: + server = smtplib.SMTP(host, 587, timeout=20) + server.starttls(context=ssl.create_default_context()) + server.noop() + logging.info("使用 587/STARTTLS 连接 SMTP") + return server + except Exception as e: + logging.error(f"SMTP 端口探测失败: {e}") + return None + + +def xor_encode(src: str, key: int) -> str: + """把邮箱做异或后返回 16 进制字符串""" + return "".join(f"{ord(c) ^ key:02x}" for c in src) + + +# ---------- send_email 主函数 ---------- +def send_email(key, to_address, subject, body_template, recipient_name, + request_receipt=False, request_read_receipt=False, + language=None, timestamp=None): + if language is None: + language = "english" + if timestamp is None: + timestamp = datetime.now().isoformat() + + result = {"status": "", "message": ""} + config = load_email_config(key) + if not config: + result.update(status="fail", message="未能加载配置") + return result + + server = None + try: + # 构造 MIME 邮件 + msg = MIMEMultipart() + if language == "chinese": + msg['From'] = formataddr(("深圳果冻人工智能", config['email user'].strip())) + else: + msg['From'] = formataddr(("Jelly Drops LLC Sales Team", config['email user'].strip())) + msg['To'] = to_address + msg['Subject'] = subject + + if not recipient_name: + recipient_name = "there" + + encoded_email = xor_encode(to_address, 0xAE) + body = ( + body_template + .replace("{{recipient_name}}", recipient_name) + .replace("{{recipient_email}}", to_address) + .replace("{{encoded_recipient_email}}", encoded_email) + .replace("{{timestamp}}", timestamp) + ) + msg.attach(MIMEText(body, 'html')) + + if request_receipt: + msg.add_header('Return-Receipt-To', config['email user'].strip()) + if request_read_receipt: + msg.add_header('Disposition-Notification-To', config['email user'].strip()) + + # --- 发送尝试 --- + MAX_RETRIES, RETRY_DELAY = 6, 600 # 最多 1 小时 + for attempt in range(MAX_RETRIES): + server = detect_smtp_port(config['outgoing server'].strip()) + if server is None: + result.update(status="fail", message="无法连接到任何 SMTP 端口") + return result + + try: + server.login(config['email user'].strip(), config['email password'].strip()) + resp = server.sendmail(config['email user'].strip(), to_address, msg.as_string()) + if resp == {}: + result.update(status="success", message=f"邮件发送成功: {to_address}") + return result + else: + result.update(status="fail", message=f"邮件发送未知结果: {resp}") + return result + except (smtplib.SMTPException, ssl.SSLError) as e: + if server: + server.quit() + if attempt < MAX_RETRIES - 1: + wait = random.uniform(240, RETRY_DELAY) + logging.warning(f"SMTP 临时错误,第 {attempt+1}/{MAX_RETRIES} 次重试,等待 {wait:.0f}s: {e}") + import time; time.sleep(wait) + continue + result.update(status="fail", message=f"SMTP 错误: {e}") + return result + except Exception as e: + result.update(status="fail", message=f"发送异常: {e}") + return result + finally: + if server: + server.quit() +