sntemail/send_email_module.py

158 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
send_email_module.py
--------------------------------------------------
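Fills in the original send_email function together with its dependencies:
• load_email_config() reads config/<key>.json
• detect_smtp_port() automatically tries port 465, then port 587
• xor_encode() performs a simple XOR encoding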
"""
import ssl, smtplib, json, logging, os, random
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formataddr
# Directory that holds the per-account JSON configuration files; it sits
# next to this module.
CONFIG_DIR = os.path.join(
    os.path.dirname(__file__),
    "config",
)

# Configure the root logger at import time: INFO level, timestamped lines.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# ---------- 工具 ----------
def load_email_config(key: str) -> dict | None:
    """Load the JSON configuration stored in ``<CONFIG_DIR>/<key>.json``.

    Args:
        key: Base name (without the ``.json`` suffix) of the config file.

    Returns:
        The parsed JSON content, or ``None`` (after logging an error) when
        the file does not exist.
    """
    config_file = os.path.join(CONFIG_DIR, f"{key}.json")
    if os.path.exists(config_file):
        with open(config_file, "r", encoding="utf-8") as handle:
            return json.load(handle)
    logging.error(f"配置文件不存在: {config_file}")
    return None
def detect_smtp_port(host: str):
    """Probe *host* for a usable SMTP port.

    Implicit TLS on port 465 is tried first; if that fails, a plain
    connection on port 587 upgraded with STARTTLS is tried.

    Args:
        host: SMTP server host name.

    Returns:
        An ``smtplib.SMTP_SSL`` (port 465) or ``smtplib.SMTP`` (port 587,
        after STARTTLS) instance that answered a NOOP, or ``None`` when both
        attempts fail. The returned connection is not logged in.
    """
    server = None
    try:
        context = ssl.create_default_context()
        server = smtplib.SMTP_SSL(host, 465, context=context, timeout=20)
        server.noop()
        logging.info("使用 465/SSL 连接 SMTP")
        return server
    except Exception as e:
        # Best-effort probe: any failure only means "try 587 next". Record
        # the reason and release the half-open connection so it cannot leak.
        logging.info(f"465/SSL 连接失败,改试 587/STARTTLS: {e}")
        if server is not None:
            server.close()

    server = None
    try:
        server = smtplib.SMTP(host, 587, timeout=20)
        server.starttls(context=ssl.create_default_context())
        server.noop()
        logging.info("使用 587/STARTTLS 连接 SMTP")
        return server
    except Exception as e:
        logging.error(f"SMTP 端口探测失败: {e}")
        # Same as above: do not leave the socket open on a failed handshake.
        if server is not None:
            server.close()
        return None
def xor_encode(src: str, key: int) -> str:
    """Return *src* XOR-ed character by character with *key*, as hex text.

    Each character's code point is XOR-ed with *key* and rendered as
    lowercase hexadecimal, zero-padded to at least two digits; the pieces
    are concatenated in order.
    """
    hex_pieces = []
    for ch in src:
        hex_pieces.append(format(ord(ch) ^ key, "02x"))
    return "".join(hex_pieces)
# ---------- 读取 port 并建立连接 ---------- ✨ new
def connect_smtp(host: str, config: dict):
    """Open an SMTP connection to *host*, honouring ``config["smtp_port"]``.

    * port 465            -> implicit TLS via ``smtplib.SMTP_SSL``
    * any other port      -> plain connection upgraded with ``starttls()``
    * no port configured  -> fall back to ``detect_smtp_port()`` (465, then 587)

    A missing key, ``None``, an empty string and ``0`` all count as
    "no port configured".

    Args:
        host: SMTP server host name.
        config: Account configuration; only ``"smtp_port"`` is read here.

    Returns:
        The connection object. In the auto-detect fallback the result is
        whatever ``detect_smtp_port()`` returns, which may be ``None``.

    Raises:
        ValueError: If ``smtp_port`` is set but is not an integer.
        Exception: Connection/TLS errors from ``smtplib``/``ssl`` propagate
            when a port is explicitly configured.
    """
    fixed_port = int(config.get("smtp_port") or 0)
    if not fixed_port:
        # No port configured: use the automatic 465 -> 587 probe.
        return detect_smtp_port(host)

    # Always pass a verifying context. Without one, SMTP_SSL falls back to
    # an SSL context that checks neither the certificate nor the host name.
    context = ssl.create_default_context()
    if fixed_port == 465:
        return smtplib.SMTP_SSL(host, 465, context=context, timeout=20)

    server = smtplib.SMTP(host, fixed_port, timeout=20)
    try:
        server.starttls(context=context)
    except Exception:
        # Do not leak the already-open socket when the TLS upgrade fails.
        server.close()
        raise
    return server
# ---------- send_email 主函数 ----------
def _quit_quietly(server) -> None:
    """Shut down *server* without letting a shutdown error escape."""
    if server is None:
        return
    try:
        server.quit()
    except OSError:
        # smtplib.SMTPException derives from OSError, so this also covers
        # SMTPServerDisconnected raised when the connection is already gone.
        server.close()


def send_email(key, to_address, subject, body_template, recipient_name,
               request_receipt=False, request_read_receipt=False,
               language=None, timestamp=None):
    """Render an HTML e-mail from a template and send it over SMTP.

    Args:
        key: Configuration name passed to ``load_email_config()``.
        to_address: Recipient e-mail address.
        subject: Subject line.
        body_template: HTML text in which ``{{recipient_name}}``,
            ``{{recipient_email}}``, ``{{encoded_recipient_email}}`` and
            ``{{timestamp}}`` are substituted.
        recipient_name: Name inserted into the body; when empty it falls back
            to "老板" for Chinese mails and "there" otherwise.
        request_receipt: Add a ``Return-Receipt-To`` header with the sender
            address.
        request_read_receipt: Add a ``Disposition-Notification-To`` header
            with the sender address.
        language: ``"chinese"`` selects the Chinese sender display name; any
            other value (default ``"english"``) selects the English one.
        timestamp: Value for ``{{timestamp}}``; defaults to the current local
            time in ISO format.

    Returns:
        A dict with ``"status"`` (``"success"`` or ``"fail"``) and a
        human-readable ``"message"``. Errors raised while building or sending
        the message are reported through this dict.
    """
    import time  # function-scope import: the module does not import time at top level

    if language is None:
        language = "english"
    if timestamp is None:
        timestamp = datetime.now().isoformat()

    result = {"status": "", "message": ""}
    config = load_email_config(key)
    if not config:
        result.update(status="fail", message="未能加载配置")
        return result

    try:
        sender = config['email user'].strip()
        is_chinese = language == "chinese"

        # Build the MIME message.
        msg = MIMEMultipart()
        display_name = "深圳果冻人工智能" if is_chinese else "Jelly Drops LLC Sales Team"
        msg['From'] = formataddr((display_name, sender))
        msg['To'] = to_address
        msg['Subject'] = subject

        if not recipient_name:
            recipient_name = "老板" if is_chinese else "there"
        encoded_email = xor_encode(to_address, 0xAE)
        body = (
            body_template
            .replace("{{recipient_name}}", recipient_name)
            .replace("{{recipient_email}}", to_address)
            .replace("{{encoded_recipient_email}}", encoded_email)
            .replace("{{timestamp}}", timestamp)
        )
        msg.attach(MIMEText(body, 'html'))
        if request_receipt:
            msg.add_header('Return-Receipt-To', sender)
        if request_read_receipt:
            msg.add_header('Disposition-Notification-To', sender)

        # Send with retries: up to 6 attempts separated by 5 random waits of
        # 240-600 s each (at most about 50 minutes of waiting in total).
        MAX_RETRIES, RETRY_DELAY = 6, 600
        host = config['outgoing server'].strip()
        password = config['email password'].strip()
        payload = msg.as_string()  # loop-invariant, so serialize only once

        for attempt in range(MAX_RETRIES):
            # Connection errors are not retried; they go to the outer handler.
            server = connect_smtp(host, config)
            if server is None:
                result.update(status="fail", message="无法连接到任何 SMTP 端口")
                return result

            try:
                server.login(sender, password)
                resp = server.sendmail(sender, to_address, payload)
            except (smtplib.SMTPException, ssl.SSLError) as e:
                last_error = e
            else:
                if resp == {}:
                    result.update(status="success", message=f"邮件发送成功: {to_address}")
                else:
                    result.update(status="fail", message=f"邮件发送未知结果: {resp}")
                return result
            finally:
                # Exactly one best-effort shutdown per connection. A second
                # quit() on a closed connection raises, and an exception from
                # cleanup would replace the result being returned.
                _quit_quietly(server)

            if attempt < MAX_RETRIES - 1:
                wait = random.uniform(240, RETRY_DELAY)
                logging.warning(f"SMTP 临时错误,第 {attempt+1}/{MAX_RETRIES} 次重试,等待 {wait:.0f}s: {last_error}")
                time.sleep(wait)
                continue
            result.update(status="fail", message=f"SMTP 错误: {last_error}")
            return result
    except Exception as e:
        result.update(status="fail", message=f"发送异常: {e}")
        return result