# -*- coding: utf-8 -*- """ send_email_module.py -------------------------------------------------- 补全了你原来的 send_email 及依赖: • load_email_config() – 读 config/.json • detect_smtp_port() – 自动尝试 465 / 587 • xor_encode() – 简单异或编码 """ import ssl, smtplib, json, logging, os, random from datetime import datetime from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import formataddr CONFIG_DIR = os.path.join(os.path.dirname(__file__), "config") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") # ---------- 工具 ---------- def load_email_config(key: str) -> dict | None: path = os.path.join(CONFIG_DIR, f"{key}.json") if not os.path.exists(path): logging.error(f"配置文件不存在: {path}") return None with open(path, "r", encoding="utf-8") as f: return json.load(f) def detect_smtp_port(host: str): """ 先尝试 465 (SSL),失败再试 587 (STARTTLS)。 返回 smtplib.SMTP or smtplib.SMTP_SSL 实例,失败返回 None """ try: context = ssl.create_default_context() server = smtplib.SMTP_SSL(host, 465, context=context, timeout=20) server.noop() logging.info("使用 465/SSL 连接 SMTP") return server except Exception: pass # 再试 587 try: server = smtplib.SMTP(host, 587, timeout=20) server.starttls(context=ssl.create_default_context()) server.noop() logging.info("使用 587/STARTTLS 连接 SMTP") return server except Exception as e: logging.error(f"SMTP 端口探测失败: {e}") return None def xor_encode(src: str, key: int) -> str: """把邮箱做异或后返回 16 进制字符串""" return "".join(f"{ord(c) ^ key:02x}" for c in src) # ---------- send_email 主函数 ---------- def send_email(key, to_address, subject, body_template, recipient_name, request_receipt=False, request_read_receipt=False, language=None, timestamp=None): if language is None: language = "english" if timestamp is None: timestamp = datetime.now().isoformat() result = {"status": "", "message": ""} config = load_email_config(key) if not config: result.update(status="fail", message="未能加载配置") return result server = None try: # 构造 MIME 邮件 msg = MIMEMultipart() if language == "chinese": msg['From'] = formataddr(("深圳果冻人工智能", config['email user'].strip())) else: msg['From'] = formataddr(("Jelly Drops LLC Sales Team", config['email user'].strip())) msg['To'] = to_address msg['Subject'] = subject if not recipient_name: recipient_name = "老板" if language == "chinese" else "there" encoded_email = xor_encode(to_address, 0xAE) body = ( body_template .replace("{{recipient_name}}", recipient_name) .replace("{{recipient_email}}", to_address) .replace("{{encoded_recipient_email}}", encoded_email) .replace("{{timestamp}}", timestamp) ) msg.attach(MIMEText(body, 'html')) if request_receipt: msg.add_header('Return-Receipt-To', config['email user'].strip()) if request_read_receipt: msg.add_header('Disposition-Notification-To', config['email user'].strip()) # --- 发送尝试 --- MAX_RETRIES, RETRY_DELAY = 6, 600 # 最多 1 小时 for attempt in range(MAX_RETRIES): server = detect_smtp_port(config['outgoing server'].strip()) if server is None: result.update(status="fail", message="无法连接到任何 SMTP 端口") return result try: server.login(config['email user'].strip(), config['email password'].strip()) resp = server.sendmail(config['email user'].strip(), to_address, msg.as_string()) if resp == {}: result.update(status="success", message=f"邮件发送成功: {to_address}") return result else: result.update(status="fail", message=f"邮件发送未知结果: {resp}") return result except (smtplib.SMTPException, ssl.SSLError) as e: if server: server.quit() if attempt < MAX_RETRIES - 1: wait = random.uniform(240, RETRY_DELAY) logging.warning(f"SMTP 临时错误,第 {attempt+1}/{MAX_RETRIES} 次重试,等待 {wait:.0f}s: {e}") import time; time.sleep(wait) continue result.update(status="fail", message=f"SMTP 错误: {e}") return result except Exception as e: result.update(status="fail", message=f"发送异常: {e}") return result finally: if server: server.quit()