This commit is contained in:
hailin 2025-07-21 18:58:35 +08:00
commit 8a4e59c556
4 changed files with 235 additions and 0 deletions

6
config/default.json Normal file
View File

@ -0,0 +1,6 @@
{
"outgoing server": "smtp.example.com",
"email user": "no-reply@example.com",
"email password": "YOUR_APP_PASSWORD"
}

90
email_ui.py Normal file
View File

@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
"""
email_ui.py 一键群发工具 (独立可跑版)
--------------------------------------------------
运行: python email_ui.py
"""
import os, logging, random
from datetime import datetime
import gradio as gr
from send_email_module import send_email # 同目录导入
EMAIL_CONFIG_KEY = "default" # 对应 config/default.json
LOG_FILE = "email_send_log.txt"
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# -------------------- 内部函数 --------------------
def _load_template(file_obj):
if file_obj is None:
return ""
with open(file_obj.name, "r", encoding="utf-8") as f:
return f.read()
def preview_email(to_addrs, subject, html_file, language,
want_receipt, want_read_receipt):
tpl = _load_template(html_file)
if not tpl:
return gr.HTML.update("<p style='color:red'>❌ 请先上传 HTML 模板</p>"), None
preview = (tpl.replace("{{recipient_name}}", "there")
.replace("{{recipient_email}}", "example@example.com")
.replace("{{encoded_recipient_email}}", "ENCODED_EXAMPLE")
.replace("{{timestamp}}", datetime.now().isoformat()))
state = {
"addresses": [a.strip() for a in to_addrs.split(",") if a.strip()],
"subject": subject,
"template": tpl,
"language": language,
"want_receipt": want_receipt,
"want_read_receipt": want_read_receipt,
}
return gr.HTML.update(preview), state
def send_emails(state):
if not state:
return "⚠️ 请先 Preview。"
out_lines = []
for addr in state["addresses"]:
res = send_email(
EMAIL_CONFIG_KEY, addr, state["subject"], state["template"], "",
request_receipt=state["want_receipt"],
request_read_receipt=state["want_read_receipt"],
language=state["language"],
)
ts = datetime.now().isoformat(timespec="seconds")
line = f"{ts}\t{addr}\t{res['status']}\t{res['message']}"
out_lines.append(line)
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(line + "\n")
return "\n".join(out_lines)
# -------------------- Gradio UI --------------------
with gr.Blocks(css=".gr-button {min-width:6rem}") as demo:
gr.Markdown("## ✉️ 简易 EDM 群发工具")
addrs = gr.Textbox(label="收件人(逗号分隔)", placeholder="foo@bar.com, alice@x.com")
subj = gr.Textbox(label="主题", placeholder="Subject here")
tpl = gr.File(label="上传 HTML 模板")
lang = gr.Radio(["english", "chinese"], value="english", label="语言")
with gr.Row():
rcpt = gr.Checkbox(label="投递回执")
read_r = gr.Checkbox(label="已读回执")
preview_btn = gr.Button("Preview ⬇️")
send_btn = gr.Button("Send ✈️", interactive=False)
out_html = gr.HTML()
state_box = gr.State()
preview_btn.click(preview_email, [addrs, subj, tpl, lang, rcpt, read_r],
[out_html, state_box]) \
.then(lambda s: gr.Button.update(interactive=s is not None),
inputs=state_box, outputs=send_btn)
send_btn.click(send_emails, state_box, out_html)
if __name__ == "__main__":
demo.queue().launch()

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
gradio>=4.0,<5

137
send_email_module.py Normal file
View File

@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
"""
send_email_module.py
--------------------------------------------------
补全了你原来的 send_email 及依赖
load_email_config() config/<key>.json
detect_smtp_port() 自动尝试 465 / 587
xor_encode() 简单异或编码
"""
import ssl, smtplib, json, logging, os, random
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr
CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config")
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
# ---------- 工具 ----------
def load_email_config(key: str) -> dict | None:
path = os.path.join(CONFIG_DIR, f"{key}.json")
if not os.path.exists(path):
logging.error(f"配置文件不存在: {path}")
return None
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def detect_smtp_port(host: str):
"""
先尝试 465 (SSL)失败再试 587 (STARTTLS)
返回 smtplib.SMTP or smtplib.SMTP_SSL 实例失败返回 None
"""
try:
context = ssl.create_default_context()
server = smtplib.SMTP_SSL(host, 465, context=context, timeout=20)
server.noop()
logging.info("使用 465/SSL 连接 SMTP")
return server
except Exception:
pass # 再试 587
try:
server = smtplib.SMTP(host, 587, timeout=20)
server.starttls(context=ssl.create_default_context())
server.noop()
logging.info("使用 587/STARTTLS 连接 SMTP")
return server
except Exception as e:
logging.error(f"SMTP 端口探测失败: {e}")
return None
def xor_encode(src: str, key: int) -> str:
"""把邮箱做异或后返回 16 进制字符串"""
return "".join(f"{ord(c) ^ key:02x}" for c in src)
# ---------- send_email 主函数 ----------
def send_email(key, to_address, subject, body_template, recipient_name,
request_receipt=False, request_read_receipt=False,
language=None, timestamp=None):
if language is None:
language = "english"
if timestamp is None:
timestamp = datetime.now().isoformat()
result = {"status": "", "message": ""}
config = load_email_config(key)
if not config:
result.update(status="fail", message="未能加载配置")
return result
server = None
try:
# 构造 MIME 邮件
msg = MIMEMultipart()
if language == "chinese":
msg['From'] = formataddr(("深圳果冻人工智能", config['email user'].strip()))
else:
msg['From'] = formataddr(("Jelly Drops LLC Sales Team", config['email user'].strip()))
msg['To'] = to_address
msg['Subject'] = subject
if not recipient_name:
recipient_name = "there"
encoded_email = xor_encode(to_address, 0xAE)
body = (
body_template
.replace("{{recipient_name}}", recipient_name)
.replace("{{recipient_email}}", to_address)
.replace("{{encoded_recipient_email}}", encoded_email)
.replace("{{timestamp}}", timestamp)
)
msg.attach(MIMEText(body, 'html'))
if request_receipt:
msg.add_header('Return-Receipt-To', config['email user'].strip())
if request_read_receipt:
msg.add_header('Disposition-Notification-To', config['email user'].strip())
# --- 发送尝试 ---
MAX_RETRIES, RETRY_DELAY = 6, 600 # 最多 1 小时
for attempt in range(MAX_RETRIES):
server = detect_smtp_port(config['outgoing server'].strip())
if server is None:
result.update(status="fail", message="无法连接到任何 SMTP 端口")
return result
try:
server.login(config['email user'].strip(), config['email password'].strip())
resp = server.sendmail(config['email user'].strip(), to_address, msg.as_string())
if resp == {}:
result.update(status="success", message=f"邮件发送成功: {to_address}")
return result
else:
result.update(status="fail", message=f"邮件发送未知结果: {resp}")
return result
except (smtplib.SMTPException, ssl.SSLError) as e:
if server:
server.quit()
if attempt < MAX_RETRIES - 1:
wait = random.uniform(240, RETRY_DELAY)
logging.warning(f"SMTP 临时错误,第 {attempt+1}/{MAX_RETRIES} 次重试,等待 {wait:.0f}s: {e}")
import time; time.sleep(wait)
continue
result.update(status="fail", message=f"SMTP 错误: {e}")
return result
except Exception as e:
result.update(status="fail", message=f"发送异常: {e}")
return result
finally:
if server:
server.quit()