gcx/scripts/namecheap_batch.py

402 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Namecheap DNS Batch Provisioner
Reads a JSON plan file and replaces all DNS records in one atomic operation.
CRITICAL: Namecheap setHosts OVERWRITES all records. This script:
1. Backs up current records before any changes
2. Validates the plan for common errors
3. Performs a dry-run by default
4. Requires explicit --apply flag to make changes
Usage:
python namecheap_batch.py --domain gogenex.com --plan plan.json # dry-run (default)
python namecheap_batch.py --domain gogenex.com --plan plan.json --apply # apply changes
python namecheap_batch.py --domain gogenex.com --plan plan.json --merge # merge with existing
python namecheap_batch.py --domain gogenex.com --diff plan.json # diff plan vs current
python namecheap_batch.py --domain gogenex.com --export current.json # export current state
Plan JSON format: Same as aliyun-dns skill (see references/plan-schema.md)
Environment Variables:
NAMECHEAP_API_USER - Namecheap username
NAMECHEAP_API_KEY - Namecheap API key
NAMECHEAP_CLIENT_IP - Whitelisted IPv4 address
"""
import argparse
import ipaddress
import json
import os
import sys
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime
# ──────────────────────────────────────────────
# Configuration & Helpers
# ──────────────────────────────────────────────
API_URL_PROD = "https://api.namecheap.com/xml.response"
API_URL_SANDBOX = "https://api.sandbox.namecheap.com/xml.response"
PRIVATE_RANGES = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("100.64.0.0/10"),
]
# Namecheap supported record types via API
SUPPORTED_TYPES = {"A", "AAAA", "CNAME", "MX", "MXE", "TXT", "URL", "URL301", "FRAME", "NS"}
def is_private_ip(ip_str: str) -> bool:
try:
ip = ipaddress.ip_address(ip_str)
return any(ip in net for net in PRIVATE_RANGES)
except ValueError:
return False
def get_config(sandbox: bool = False):
api_user = os.environ.get("NAMECHEAP_API_USER")
api_key = os.environ.get("NAMECHEAP_API_KEY")
client_ip = os.environ.get("NAMECHEAP_CLIENT_IP")
if not all([api_user, api_key, client_ip]):
print("ERROR: Set NAMECHEAP_API_USER, NAMECHEAP_API_KEY, and NAMECHEAP_CLIENT_IP env vars.")
sys.exit(1)
return {
"api_user": api_user,
"api_key": api_key,
"client_ip": client_ip,
"base_url": API_URL_SANDBOX if sandbox else API_URL_PROD,
}
def split_domain(domain: str) -> tuple:
parts = domain.rsplit(".", 1)
if len(parts) != 2:
print(f"ERROR: Cannot split domain '{domain}'.")
sys.exit(1)
return parts[0], parts[1]
# ──────────────────────────────────────────────
# API Calls
# ──────────────────────────────────────────────
def api_call(config: dict, command: str, extra_params: dict = None) -> ET.Element:
params = {
"ApiUser": config["api_user"],
"ApiKey": config["api_key"],
"UserName": config["api_user"],
"ClientIp": config["client_ip"],
"Command": command,
}
if extra_params:
params.update(extra_params)
url = f"{config['base_url']}?{urllib.parse.urlencode(params)}"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=30) as response:
xml_data = response.read().decode("utf-8")
except Exception as e:
print(f"ERROR: API call failed: {e}")
sys.exit(1)
root = ET.fromstring(xml_data)
status = root.attrib.get("Status", "")
if status == "ERROR":
for elem in root.iter():
if "Error" in elem.tag:
print(f"ERROR [{elem.attrib.get('Number', '?')}]: {elem.text}")
sys.exit(1)
return root
def get_hosts(config: dict, domain: str) -> list:
sld, tld = split_domain(domain)
root = api_call(config, "namecheap.domains.dns.getHosts", {"SLD": sld, "TLD": tld})
records = []
for elem in root.iter():
if "Host" in elem.tag and elem.attrib.get("Name"):
records.append({
"Name": elem.attrib.get("Name", ""),
"Type": elem.attrib.get("Type", ""),
"Address": elem.attrib.get("Address", ""),
"MXPref": elem.attrib.get("MXPref", "10"),
"TTL": elem.attrib.get("TTL", "1800"),
})
return records
def set_hosts(config: dict, domain: str, records: list) -> bool:
sld, tld = split_domain(domain)
params = {"SLD": sld, "TLD": tld}
for i, rec in enumerate(records, 1):
params[f"HostName{i}"] = rec["Name"]
params[f"RecordType{i}"] = rec["Type"]
params[f"Address{i}"] = rec["Address"]
params[f"TTL{i}"] = rec.get("TTL", "1800")
if rec["Type"] in ("MX", "MXE"):
params[f"MXPref{i}"] = rec.get("MXPref", "10")
api_call(config, "namecheap.domains.dns.setHosts", params)
return True
# ──────────────────────────────────────────────
# Plan Operations
# ──────────────────────────────────────────────
def load_plan(plan_file: str) -> list:
"""Load and normalize a plan JSON to Namecheap host format."""
with open(plan_file, "r", encoding="utf-8") as f:
raw_plan = json.load(f)
records = []
for entry in raw_plan:
rr = entry.get("rr", entry.get("Name", ""))
record_type = entry.get("type", entry.get("Type", ""))
value = entry.get("value", entry.get("Address", ""))
ttl = str(entry.get("ttl", entry.get("TTL", 1800)))
records.append({
"Name": rr,
"Type": record_type,
"Address": value,
"TTL": ttl,
"MXPref": str(entry.get("MXPref", "10")),
"_note": entry.get("note", ""),
"_scope": entry.get("scope", "public"),
})
return records
def validate_plan(records: list, domain: str) -> list:
"""Validate plan entries and return warnings."""
warnings = []
seen = set()
for i, rec in enumerate(records):
name = rec.get("Name", "")
rtype = rec.get("Type", "")
value = rec.get("Address", "")
scope = rec.get("_scope", "public")
# Required fields
if not name or not rtype or not value:
warnings.append(f"❌ Entry {i}: missing Name, Type, or Address")
continue
# Placeholder check
if "<" in value or ">" in value:
warnings.append(f"❌ Entry {i} ({name}): value contains placeholder: {value}")
# Unsupported type
if rtype.upper() == "SRV":
warnings.append(f"❌ Entry {i} ({name}): SRV records not supported by Namecheap API")
if rtype not in SUPPORTED_TYPES:
warnings.append(f"⚠️ Entry {i} ({name}): record type '{rtype}' may not be supported")
# Duplicate
key = (name, rtype)
if key in seen and rtype not in ("MX", "TXT", "NS"):
warnings.append(f"⚠️ Entry {i}: duplicate {name} {rtype}")
seen.add(key)
# Scope vs IP check
if scope == "private" and rtype == "A" and not is_private_ip(value):
warnings.append(f"🔒 Entry {i} ({name}): marked private but has public IP {value}")
return warnings
def backup_records(records: list, domain: str) -> str:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"backup_{domain}_{timestamp}.json"
with open(backup_file, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2, ensure_ascii=False)
return backup_file
def apply_plan(config: dict, domain: str, plan_records: list, merge: bool = False, dry_run: bool = True):
"""Apply a DNS plan to Namecheap."""
mode = "DRY RUN" if dry_run else "APPLY"
strategy = "MERGE" if merge else "REPLACE"
print(f"\n{'🔍' if dry_run else '🚀'} {mode} ({strategy}) for {domain}")
print(f" Records in plan: {len(plan_records)}\n")
# Validate
warnings = validate_plan(plan_records, domain)
if warnings:
print("⚠️ Validation warnings:")
for w in warnings:
print(f" {w}")
print()
if any(w.startswith("") for w in warnings):
print("❌ Fix errors before applying.")
return
# Fetch current records
print("📡 Fetching current records...")
current = get_hosts(config, domain)
print(f" Found {len(current)} existing records\n")
# Build final record set
if merge:
# Merge: keep existing records not in plan, add/update from plan
plan_keys = {(r["Name"], r["Type"]) for r in plan_records}
# Keep records not covered by plan
final = [r for r in current if (r["Name"], r["Type"]) not in plan_keys]
# Add all plan records
final.extend([{
"Name": r["Name"],
"Type": r["Type"],
"Address": r["Address"],
"TTL": r.get("TTL", "1800"),
"MXPref": r.get("MXPref", "10"),
} for r in plan_records])
print(f" 📋 Merge result: {len(final)} total records")
print(f" Kept from existing: {len(final) - len(plan_records)}")
print(f" From plan: {len(plan_records)}")
else:
# Replace: plan becomes the entire record set
final = [{
"Name": r["Name"],
"Type": r["Type"],
"Address": r["Address"],
"TTL": r.get("TTL", "1800"),
"MXPref": r.get("MXPref", "10"),
} for r in plan_records]
print(f" ⚠️ REPLACE mode: {len(current)} existing records will be replaced with {len(final)} plan records")
# Show changes
print(f"\n Final record set:")
for r in final:
fqdn = f"{r['Name']}.{domain}" if r["Name"] != "@" else domain
print(f" {fqdn:<35} {r['Type']:<8} {r['Address']}")
if dry_run:
print(f"\n💡 Run with --apply to execute these changes.")
return
# Backup before write
backup_file = backup_records(current, domain)
print(f"\n 💾 Backup saved: {backup_file}")
# Write
print(f" 📤 Writing {len(final)} records...")
set_hosts(config, domain, final)
print(f" ✅ Done! All records updated successfully.")
print(f" 💡 To rollback, use: python namecheap_batch.py --domain {domain} --plan {backup_file} --apply")
def diff_plan(config: dict, domain: str, plan_file: str):
"""Show differences between plan and current state."""
plan_records = load_plan(plan_file)
current = get_hosts(config, domain)
print(f"\n🔍 Diff: {plan_file} vs live records for {domain}\n")
current_map = {}
for r in current:
key = (r["Name"], r["Type"])
current_map[key] = r
plan_map = {}
for r in plan_records:
key = (r["Name"], r["Type"])
plan_map[key] = r
# Records in plan
for key, rec in sorted(plan_map.items()):
fqdn = f"{key[0]}.{domain}" if key[0] != "@" else domain
if key in current_map:
curr = current_map[key]
if curr["Address"] != rec["Address"]:
print(f" ✏️ CHANGE {fqdn:<35} {key[1]:<6} {curr['Address']} -> {rec['Address']}")
elif curr.get("TTL", "1800") != rec.get("TTL", "1800"):
print(f" ⏱️ TTL {fqdn:<35} {key[1]:<6} TTL {curr['TTL']} -> {rec.get('TTL', '1800')}")
else:
print(f" ✅ OK {fqdn:<35} {key[1]:<6} {rec['Address']}")
else:
print(f" NEW {fqdn:<35} {key[1]:<6} {rec['Address']}")
# Records only in current (would be deleted in REPLACE mode)
for key, rec in sorted(current_map.items()):
if key not in plan_map:
fqdn = f"{key[0]}.{domain}" if key[0] != "@" else domain
print(f" 🗑️ REMOVE {fqdn:<35} {key[1]:<6} {rec['Address']} (not in plan — will be deleted in REPLACE mode)")
def export_current(config: dict, domain: str, output_file: str):
"""Export current records to plan JSON format."""
records = get_hosts(config, domain)
plan = []
for r in records:
plan.append({
"rr": r["Name"],
"type": r["Type"],
"value": r["Address"],
"ttl": int(r.get("TTL", 1800)),
"scope": "private" if (r["Type"] == "A" and is_private_ip(r["Address"])) else "public",
"note": "",
})
with open(output_file, "w", encoding="utf-8") as f:
json.dump(plan, f, indent=2, ensure_ascii=False)
print(f"✅ Exported {len(plan)} records to {output_file}")
# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Namecheap DNS Batch Provisioner (SAFE: dry-run by default)",
epilog="⚠️ REMEMBER: Namecheap setHosts OVERWRITES all records. Use --merge to keep existing records."
)
parser.add_argument("--domain", required=True, help="Domain name (e.g., gogenex.com)")
parser.add_argument("--sandbox", action="store_true", help="Use sandbox API")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--plan", help="JSON plan file to apply")
group.add_argument("--diff", help="Diff plan vs current state")
group.add_argument("--export", help="Export current records to JSON")
parser.add_argument("--apply", action="store_true", help="Actually apply changes (default is dry-run)")
parser.add_argument("--merge", action="store_true",
help="Merge plan with existing records instead of replacing")
args = parser.parse_args()
config = get_config(sandbox=args.sandbox)
if args.plan:
plan_records = load_plan(args.plan)
apply_plan(config, args.domain, plan_records, merge=args.merge, dry_run=not args.apply)
elif args.diff:
diff_plan(config, args.domain, args.diff)
elif args.export:
export_current(config, args.domain, args.export)
if __name__ == "__main__":
main()