127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
email_ui.py — 一键群发工具 (强化校验版,支持自定义默认称呼)
|
||
------------------------------------------------------------
|
||
运行: python email_ui.py
|
||
"""
|
||
import os, logging, re
|
||
from datetime import datetime
|
||
import gradio as gr
|
||
from send_email_module import send_email
|
||
|
||
EMAIL_CONFIG_KEY = "default"
|
||
LOG_FILE = "email_send_log.txt"
|
||
logging.basicConfig(level=logging.INFO,
|
||
format="%(asctime)s %(levelname)s %(message)s")
|
||
|
||
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") # 简易邮箱正则
|
||
|
||
|
||
# ---------- 工具 ----------
|
||
def _load_template(file_obj):
|
||
if file_obj is None:
|
||
return ""
|
||
with open(file_obj.name, "r", encoding="utf-8") as f:
|
||
return f.read()
|
||
|
||
|
||
def _error(msg): # 统一红字提示
|
||
return gr.update(value=f"<p style='color:red'>❌ {msg}</p>"), None
|
||
|
||
|
||
# ---------- 预览 ----------
|
||
def preview_email(to_addrs, subject, html_file, language,
|
||
want_receipt, want_read_receipt, default_name): # ✨ new
|
||
tpl = _load_template(html_file)
|
||
if not tpl:
|
||
return _error("请先上传 HTML 模板")
|
||
|
||
raw_addrs = [a.strip() for a in to_addrs.split(",") if a.strip()]
|
||
if not raw_addrs:
|
||
return _error("收件人邮箱不能为空")
|
||
if invalid := [a for a in raw_addrs if not EMAIL_RE.match(a)]:
|
||
return _error(f"邮箱格式非法: {', '.join(invalid)}")
|
||
|
||
if not subject.strip():
|
||
return _error("主题不能为空")
|
||
|
||
if "{{recipient_name}}" not in tpl:
|
||
return _error("模板缺少 {{recipient_name}} 占位符")
|
||
|
||
# 生成预览,用用户填写的 default_name(留空时仍展示 there)
|
||
preview = (tpl.replace("{{recipient_name}}", default_name or ("老板" if language == "chinese" else "there"))
|
||
.replace("{{recipient_email}}", "example@example.com")
|
||
.replace("{{encoded_recipient_email}}", "ENCODED_EXAMPLE")
|
||
.replace("{{timestamp}}", datetime.now().isoformat()))
|
||
|
||
state = {
|
||
"addresses": raw_addrs,
|
||
"subject": subject.strip(),
|
||
"template": tpl,
|
||
"language": language,
|
||
"want_receipt": want_receipt,
|
||
"want_read_receipt": want_read_receipt,
|
||
"default_name": default_name.strip(), # ✨ new
|
||
}
|
||
return gr.update(value=preview), state
|
||
|
||
|
||
# ---------- 发送 ----------
|
||
def send_emails(state):
|
||
if not state:
|
||
return "⚠️ 请先 Preview 成功后再发送。"
|
||
|
||
out_lines = []
|
||
for addr in state["addresses"]:
|
||
res = send_email(
|
||
EMAIL_CONFIG_KEY,
|
||
addr,
|
||
state["subject"],
|
||
state["template"],
|
||
state["default_name"], # ✨ new (可为空串)
|
||
request_receipt=state["want_receipt"],
|
||
request_read_receipt=state["want_read_receipt"],
|
||
language=state["language"],
|
||
)
|
||
ts = datetime.now().isoformat(timespec="seconds")
|
||
line = f"{ts}\t{addr}\t{res['status']}\t{res['message']}"
|
||
out_lines.append(line)
|
||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||
f.write(line + "\n")
|
||
return "<br>".join(out_lines).replace("\n", "<br>")
|
||
|
||
|
||
# ---------- Gradio UI ----------
|
||
with gr.Blocks(css=".gr-button {min-width:8rem}") as demo:
|
||
gr.Markdown("## ✉️ 简易 EDM 群发工具")
|
||
|
||
addrs = gr.Textbox(label="收件人(逗号分隔)", placeholder="foo@bar.com, alice@x.com")
|
||
subj = gr.Textbox(label="主题", placeholder="Subject here")
|
||
tpl = gr.File(label="上传 HTML 模板")
|
||
lang = gr.Radio(["english", "chinese"], value="chinese", label="语言")
|
||
default_name = gr.Textbox( # ✨ new
|
||
label="默认称呼(留空=老板/there)", value=""
|
||
)
|
||
|
||
with gr.Row():
|
||
rcpt = gr.Checkbox(label="投递回执")
|
||
read_r = gr.Checkbox(label="已读回执")
|
||
|
||
preview_btn = gr.Button("Preview ⬇️")
|
||
send_btn = gr.Button("Send (确认发送) ✈️", interactive=False)
|
||
out_html = gr.HTML()
|
||
state_box = gr.State()
|
||
|
||
preview_btn.click(
|
||
preview_email,
|
||
[addrs, subj, tpl, lang, rcpt, read_r, default_name], # ✨ new
|
||
[out_html, state_box]
|
||
).then(
|
||
lambda s: gr.update(interactive=s is not None),
|
||
inputs=state_box, outputs=send_btn
|
||
)
|
||
send_btn.click(send_emails, state_box, out_html)
|
||
|
||
if __name__ == "__main__":
|
||
demo.queue().launch(server_port=7880, show_api=False)
|