# -*- coding: utf-8 -*- """ email_ui.py — 一键群发工具 (强化校验版) -------------------------------------- 运行: python email_ui.py """ import os, logging, re, random from datetime import datetime import gradio as gr from send_email_module import send_email EMAIL_CONFIG_KEY = "default" # 对应 config/default.json LOG_FILE = "email_send_log.txt" logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") # 简易邮箱正则 # -------------------- 工具 -------------------- def _load_template(file_obj): if file_obj is None: return "" with open(file_obj.name, "r", encoding="utf-8") as f: return f.read() def _error(msg): """生成统一的红字提示 + 让 state 为空 -> Send 仍锁定""" return gr.update(value=f"

❌ {msg}

"), None # -------------------- 预览 -------------------- def preview_email(to_addrs, subject, html_file, language, want_receipt, want_read_receipt): # 0. 模板存在 tpl = _load_template(html_file) if not tpl: return _error("请先上传 HTML 模板") # 1. 收件人非空且格式合法 raw_addrs = [a.strip() for a in to_addrs.split(",") if a.strip()] if not raw_addrs: return _error("收件人邮箱不能为空") if invalid := [a for a in raw_addrs if not EMAIL_RE.match(a)]: return _error(f"邮箱格式非法: {', '.join(invalid)}") # 2. 主题非空 if not subject.strip(): return _error("主题不能为空") # 3. 模板至少包含 {{recipient_name}} if "{{recipient_name}}" not in tpl: return _error("模板缺少 {{recipient_name}} 占位符") # ---- 全部通过,生成预览 ---- preview = (tpl.replace("{{recipient_name}}", "there") .replace("{{recipient_email}}", "example@example.com") .replace("{{encoded_recipient_email}}", "ENCODED_EXAMPLE") .replace("{{timestamp}}", datetime.now().isoformat())) state = { "addresses": raw_addrs, "subject": subject.strip(), "template": tpl, "language": language, "want_receipt": want_receipt, "want_read_receipt": want_read_receipt, } return gr.update(value=preview), state # -------------------- 发送 -------------------- def send_emails(state): if not state: return "⚠️ 请先 Preview 成功后再发送。" out_lines = [] for addr in state["addresses"]: res = send_email( EMAIL_CONFIG_KEY, addr, state["subject"], state["template"], "", request_receipt=state["want_receipt"], request_read_receipt=state["want_read_receipt"], language=state["language"], ) ts = datetime.now().isoformat(timespec="seconds") line = f"{ts}\t{addr}\t{res['status']}\t{res['message']}" out_lines.append(line) with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(line + "\n") return "
".join(out_lines).replace("\n", "
") # -------------------- Gradio UI -------------------- with gr.Blocks(css=".gr-button {min-width:8rem}") as demo: gr.Markdown("## ✉️ 简易 EDM 群发工具") addrs = gr.Textbox(label="收件人(逗号分隔)", placeholder="foo@bar.com, alice@x.com") subj = gr.Textbox(label="主题", placeholder="Subject here") tpl = gr.File(label="上传 HTML 模板") lang = gr.Radio(["english", "chinese"], value="chinese", label="语言") with gr.Row(): rcpt = gr.Checkbox(label="投递回执") read_r = gr.Checkbox(label="已读回执") preview_btn = gr.Button("Preview ⬇️") send_btn = gr.Button("Send (确认发送) ✈️", interactive=False) out_html = gr.HTML() state_box = gr.State() preview_btn.click( preview_email, [addrs, subj, tpl, lang, rcpt, read_r], [out_html, state_box] ).then( lambda s: gr.update(interactive=s is not None), inputs=state_box, outputs=send_btn ) send_btn.click(send_emails, state_box, out_html) if __name__ == "__main__": demo.queue().launch(server_port=7880, show_api=False)