#!/usr/bin/env python3
"""
Namecheap DNS Record Manager (Safe Read-Merge-Write Pattern)

CRITICAL: Namecheap's setHosts API OVERWRITES all records.
This script always reads existing records first, merges changes, then
writes back.

Usage:
    python namecheap_manager.py list --domain example.com
    python namecheap_manager.py add --domain example.com --name api --type A --value 1.2.3.4 [--ttl 1800]
    python namecheap_manager.py update --domain example.com --name api --type A --value 5.6.7.8
    python namecheap_manager.py delete --domain example.com --name api --type A
    python namecheap_manager.py set-ns --domain example.com --nameservers ns1.cf.com ns2.cf.com
    python namecheap_manager.py export --domain example.com --output records.json

Environment Variables:
    NAMECHEAP_API_USER  - Namecheap username
    NAMECHEAP_API_KEY   - Namecheap API key
    NAMECHEAP_CLIENT_IP - Whitelisted IPv4 address
"""

import argparse
import http.client
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime

# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────

API_URL_PROD = "https://api.namecheap.com/xml.response"
API_URL_SANDBOX = "https://api.sandbox.namecheap.com/xml.response"

# XML namespace variants that responses may use. Response parsing below
# matches on local tag names, so both variants (and un-namespaced documents)
# are handled; the constants are kept for code that imports them.
NS = {"nc": "http://api.namecheap.com/xml.response"}
NS_HTTPS = {"nc": "https://api.namecheap.com/xml.response"}


def _fail(message: str) -> None:
    """Print *message* to stderr and terminate the process with status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def get_config(sandbox: bool = False):
    """Build the API configuration from environment variables.

    Args:
        sandbox: Use the sandbox endpoint instead of production.

    Returns:
        A dict with the keys ``api_user``, ``api_key``, ``client_ip`` and
        ``base_url``.

    Exits with status 1 when any of the three environment variables is
    missing or empty.
    """
    api_user = os.environ.get("NAMECHEAP_API_USER")
    api_key = os.environ.get("NAMECHEAP_API_KEY")
    client_ip = os.environ.get("NAMECHEAP_CLIENT_IP")

    if not all([api_user, api_key, client_ip]):
        _fail("ERROR: Set NAMECHEAP_API_USER, NAMECHEAP_API_KEY, and NAMECHEAP_CLIENT_IP env vars.")

    return {
        "api_user": api_user,
        "api_key": api_key,
        "client_ip": client_ip,
        "base_url": API_URL_SANDBOX if sandbox else API_URL_PROD,
    }


def split_domain(domain: str) -> tuple:
    """Split a registered domain into its SLD and TLD.

    The split happens at the FIRST dot so that multi-label TLDs stay intact:
    'gogenex.com' -> ('gogenex', 'com') and
    'example.co.uk' -> ('example', 'co.uk').

    Surrounding whitespace and a trailing root dot are ignored. Exits with
    status 1 when either part would be empty.
    """
    cleaned = domain.strip().rstrip(".")
    sld, _, tld = cleaned.partition(".")
    if not sld or not tld:
        _fail(f"ERROR: Cannot split domain '{domain}'. Expected format: 'example.com'")
    return sld, tld


# ──────────────────────────────────────────────
# API Calls
# ──────────────────────────────────────────────

def _local_name(tag) -> str:
    """Return an XML tag name with any ``{namespace}`` prefix removed."""
    if not isinstance(tag, str):
        # Comment / processing-instruction nodes carry a non-string tag.
        return ""
    return tag.rsplit("}", 1)[-1]


def _find_by_local_name(root: ET.Element, name: str) -> list:
    """Return every element under *root* whose local tag equals *name*, ignoring case."""
    wanted = name.lower()
    return [el for el in root.iter() if _local_name(el.tag).lower() == wanted]


def api_call(config: dict, command: str, extra_params: dict = None,
             use_post: bool = False) -> ET.Element:
    """Make a Namecheap API call and return the parsed XML root.

    Args:
        config: Configuration dict as returned by ``get_config``.
        command: API command name, e.g. ``namecheap.domains.dns.getHosts``.
        extra_params: Command-specific parameters merged into the request.
        use_post: Send the parameters as a form-encoded POST body instead of
            a GET query string. Intended for large payloads such as setHosts.

    Exits with status 1 on transport failures, malformed XML, or when the
    response root carries ``Status="ERROR"``.
    """
    params = {
        "ApiUser": config["api_user"],
        "ApiKey": config["api_key"],
        "UserName": config["api_user"],
        "ClientIp": config["client_ip"],
        "Command": command,
    }
    if extra_params:
        params.update(extra_params)

    encoded = urllib.parse.urlencode(params)
    if use_post:
        # Supplying ``data`` makes urllib issue a POST request.
        req = urllib.request.Request(config["base_url"], data=encoded.encode("utf-8"))
    else:
        req = urllib.request.Request(f"{config['base_url']}?{encoded}")

    try:
        with urllib.request.urlopen(req, timeout=30) as response:
            xml_data = response.read().decode("utf-8")
    except (OSError, ValueError, http.client.HTTPException) as e:
        # OSError covers URLError/HTTPError/timeouts; ValueError covers
        # malformed URLs and undecodable response bodies.
        _fail(f"ERROR: API call failed: {e}")

    try:
        root = ET.fromstring(xml_data)
    except ET.ParseError as e:
        _fail(f"ERROR: API returned malformed XML: {e}")

    if root.attrib.get("Status", "") == "ERROR":
        errors = [el for el in root.iter() if _local_name(el.tag) == "Error"]
        for err in errors:
            print(f"ERROR [{err.attrib.get('Number', '?')}]: {err.text}", file=sys.stderr)
        if not errors:
            # Never exit silently: the caller needs to know something failed.
            print("ERROR: API reported an error without details.", file=sys.stderr)
        sys.exit(1)

    return root


def _host_to_record(host: ET.Element) -> dict:
    """Convert one host XML element into the record dict used by this script."""
    attrs = host.attrib
    return {
        "HostId": attrs.get("HostId", ""),
        "Name": attrs.get("Name", ""),
        "Type": attrs.get("Type", ""),
        "Address": attrs.get("Address", ""),
        "MXPref": attrs.get("MXPref", "10"),
        "TTL": attrs.get("TTL", "1800"),
    }


def get_hosts(config: dict, domain: str) -> list:
    """Fetch all DNS host records for a domain.

    Returns:
        A list of dicts with the keys ``HostId``, ``Name``, ``Type``,
        ``Address``, ``MXPref`` and ``TTL`` (all string values).
    """
    sld, tld = split_domain(domain)
    root = api_call(config, "namecheap.domains.dns.getHosts", {"SLD": sld, "TLD": tld})
    # Matches <Host> and <host>, with either namespace variant or none.
    return [_host_to_record(host) for host in _find_by_local_name(root, "host")]


def set_hosts(config: dict, domain: str, records: list) -> bool:
    """Set ALL DNS host records for a domain. This OVERWRITES everything.

    Args:
        config: Configuration dict as returned by ``get_config``.
        domain: Registered domain, e.g. ``example.com``.
        records: Complete list of record dicts to store. Each needs ``Name``,
            ``Type`` and ``Address``; ``TTL`` and ``MXPref`` are optional.

    Returns:
        True when the API call completed. Exits with status 1 when the
        response reports ``IsSuccess`` other than ``true``.
    """
    sld, tld = split_domain(domain)
    params = {"SLD": sld, "TLD": tld}

    for i, rec in enumerate(records, 1):
        params[f"HostName{i}"] = rec["Name"]
        params[f"RecordType{i}"] = rec["Type"]
        params[f"Address{i}"] = rec["Address"]
        params[f"TTL{i}"] = rec.get("TTL", "1800")
        if rec["Type"] in ("MX", "MXE"):
            params[f"MXPref{i}"] = rec.get("MXPref", "10")

    # NOTE(review): the setHosts reference lists an EmailType parameter
    # (MXE, MX, FWD, OX) and MX records are reportedly ignored without it.
    # It is only sent when such records are present - confirm against the
    # domain's mail configuration.
    record_types = {rec["Type"] for rec in records}
    if "MX" in record_types:
        params["EmailType"] = "MX"
    elif "MXE" in record_types:
        params["EmailType"] = "MXE"

    # POST keeps a long record list (and the API key) out of the URL.
    root = api_call(config, "namecheap.domains.dns.setHosts", params, use_post=True)

    for result in _find_by_local_name(root, "DomainDNSSetHostsResult"):
        if result.attrib.get("IsSuccess", "true").lower() != "true":
            _fail(f"ERROR: setHosts did not succeed for {domain}")
    return True


def set_nameservers(config: dict, domain: str, nameservers: list):
    """Set custom nameservers for a domain.

    Args:
        config: Configuration dict as returned by ``get_config``.
        domain: Registered domain, e.g. ``example.com``.
        nameservers: Nameserver host names; sent as one comma-separated value.

    Returns:
        True when the API call completed (API errors terminate the process).
    """
    sld, tld = split_domain(domain)
    params = {
        "SLD": sld,
        "TLD": tld,
        "Nameservers": ",".join(nameservers),
    }
    api_call(config, "namecheap.domains.dns.setCustom", params)
    return True


# ──────────────────────────────────────────────
# Safe Operations (Read-Merge-Write)
# ──────────────────────────────────────────────

def _matches(record: dict, name: str, record_type: str) -> bool:
    """Return True when *record* has the given host name and type, ignoring case."""
    return (
        record["Name"].lower() == name.lower()
        and record["Type"].upper() == record_type.upper()
    )


def backup_records(records: list, domain: str):
    """Save a backup of current records before modification.

    Writes ``backup_<domain>_<timestamp>.json`` into the current working
    directory and returns that file name.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"backup_{domain}_{timestamp}.json"
    with open(backup_file, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)
    print(f" 💾 Backup saved: {backup_file}")
    return backup_file


def safe_add(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = "1800"):
    """Safely add a record by reading existing, appending, and writing back.

    Host name and record type are compared case-insensitively; the value is
    compared exactly. An identical existing record makes this a no-op.

    Returns:
        True when the record was written, False when it already existed.
    """
    record_type = record_type.upper()

    print(f"\n📡 Reading current records for {domain}...")
    current = get_hosts(config, domain)
    print(f" Found {len(current)} existing records")

    if not current:
        # setHosts replaces the whole zone, so a failed or empty read means
        # the write below stores ONLY the new record.
        print(" ⚠️ No existing records were read; the zone will contain only the new record.",
              file=sys.stderr)

    # Check for duplicate
    if any(_matches(r, name, record_type) and r["Address"] == value for r in current):
        print(f" ⚠️ Record already exists: {name} {record_type} {value}")
        return False

    # Backup before modification
    backup_records(current, domain)

    new_record = {
        "Name": name,
        "Type": record_type,
        "Address": value,
        "TTL": ttl,
        "MXPref": "10",
    }
    updated = current + [new_record]

    fqdn = f"{name}.{domain}" if name != "@" else domain
    print(f" ➕ Adding: {fqdn} -> {record_type} {value}")
    print(f" 📤 Writing {len(updated)} records back...")

    set_hosts(config, domain, updated)
    print(" ✅ Done! Record added successfully.")
    return True


def safe_update(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = None):
    """Safely update a record by reading, modifying, and writing back.

    Only the FIRST record matching name and type (case-insensitive) is
    changed. The TTL is left untouched unless *ttl* is given.

    Returns:
        True when the record was updated, False when no record matched.
    """
    print(f"\n📡 Reading current records for {domain}...")
    current = get_hosts(config, domain)

    target = next((r for r in current if _matches(r, name, record_type)), None)
    if target is None:
        print(f" ❌ Record not found: {name} {record_type}")
        return False

    # Back up the untouched state before mutating the record in place.
    backup_records(current, domain)

    old_value = target["Address"]
    target["Address"] = value
    if ttl:
        target["TTL"] = ttl

    fqdn = f"{name}.{domain}" if name != "@" else domain
    print(f" ✏️ Updating: {fqdn} {record_type} {old_value} -> {value}")
    print(f" 📤 Writing {len(current)} records back...")

    set_hosts(config, domain, current)
    print(" ✅ Done! Record updated successfully.")
    return True


def safe_delete(config: dict, domain: str, name: str, record_type: str):
    """Safely delete a record by reading, removing, and writing back.

    EVERY record matching name and type (case-insensitive) is removed.

    Returns:
        True when at least one record was deleted, False when none matched.
    """
    print(f"\n📡 Reading current records for {domain}...")
    current = get_hosts(config, domain)

    original_count = len(current)
    updated = [r for r in current if not _matches(r, name, record_type)]

    if len(updated) == original_count:
        print(f" ❌ Record not found: {name} {record_type}")
        return False

    backup_records(current, domain)

    fqdn = f"{name}.{domain}" if name != "@" else domain
    print(f" 🗑️ Deleting: {fqdn} {record_type}")
    print(f" 📤 Writing {len(updated)} records back (was {original_count})...")

    set_hosts(config, domain, updated)
    print(" ✅ Done! Record deleted successfully.")
    return True


# ──────────────────────────────────────────────
# Display & Export
# ──────────────────────────────────────────────

def display_records(records: list, domain: str):
    """Pretty-print DNS records as a fixed-width table on stdout."""
    print(f"\n📋 DNS Records for {domain} (Total: {len(records)})\n")
    print(f"{'HostId':<10} {'Name':<30} {'Type':<8} {'Address':<40} {'TTL'}")
    print("-" * 100)
    for r in records:
        # '@' denotes the zone apex, shown as the bare domain.
        fqdn = f"{r['Name']}.{domain}" if r["Name"] != "@" else domain
        print(f"{r['HostId']:<10} {fqdn:<30} {r['Type']:<8} {r['Address']:<40} {r['TTL']}")


def export_records(records: list, domain: str, output_file: str):
    """Export records to JSON plan format.

    Each record becomes ``{"rr", "type", "value", "ttl", "note"}`` with the
    TTL converted to an integer. *domain* is accepted for interface
    compatibility and is not written to the file.
    """
    plan = [
        {
            "rr": r["Name"],
            "type": r["Type"],
            "value": r["Address"],
            "ttl": int(r.get("TTL", 1800)),
            "note": "",
        }
        for r in records
    ]
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(plan, f, indent=2, ensure_ascii=False)
    print(f"✅ Exported {len(plan)} records to {output_file}")


# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────

def _ttl_arg(value: str) -> str:
    """Validate a --ttl argument as a positive integer; return it as a string."""
    try:
        ttl = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"TTL must be an integer, got '{value}'") from None
    if ttl <= 0:
        raise argparse.ArgumentTypeError(f"TTL must be positive, got '{value}'")
    return str(ttl)


def _build_parser() -> argparse.ArgumentParser:
    """Create the argument parser with one sub-command per action."""
    parser = argparse.ArgumentParser(description="Namecheap DNS Record Manager (Safe Read-Merge-Write)")
    parser.add_argument("--sandbox", action="store_true", help="Use sandbox API endpoint")
    subparsers = parser.add_subparsers(dest="action", help="Action to perform")

    # List
    list_p = subparsers.add_parser("list", help="List all DNS records")
    list_p.add_argument("--domain", required=True)

    # Add
    add_p = subparsers.add_parser("add", help="Add a DNS record (safe merge)")
    add_p.add_argument("--domain", required=True)
    add_p.add_argument("--name", required=True, help="Hostname (e.g., api, www, @)")
    add_p.add_argument("--type", required=True, help="Record type (A, AAAA, CNAME, MX, TXT, etc.)")
    add_p.add_argument("--value", required=True, help="Record value")
    add_p.add_argument("--ttl", default="1800", type=_ttl_arg, help="TTL in seconds (default: 1800)")

    # Update
    update_p = subparsers.add_parser("update", help="Update a DNS record")
    update_p.add_argument("--domain", required=True)
    update_p.add_argument("--name", required=True)
    update_p.add_argument("--type", required=True)
    update_p.add_argument("--value", required=True)
    update_p.add_argument("--ttl", type=_ttl_arg, help="New TTL (optional)")

    # Delete
    delete_p = subparsers.add_parser("delete", help="Delete a DNS record")
    delete_p.add_argument("--domain", required=True)
    delete_p.add_argument("--name", required=True)
    delete_p.add_argument("--type", required=True)

    # Set nameservers
    ns_p = subparsers.add_parser("set-ns", help="Set custom nameservers")
    ns_p.add_argument("--domain", required=True)
    ns_p.add_argument("--nameservers", nargs="+", required=True, help="Nameserver domains")

    # Export
    export_p = subparsers.add_parser("export", help="Export records to JSON")
    export_p.add_argument("--domain", required=True)
    export_p.add_argument("--output", required=True, help="Output JSON file path")

    return parser


def main():
    """Parse the command line and run the requested action.

    Exits with status 1 when no action is given, when configuration is
    missing, or when an update/delete finds no matching record.
    """
    parser = _build_parser()
    args = parser.parse_args()

    if not args.action:
        parser.print_help()
        sys.exit(1)

    config = get_config(sandbox=args.sandbox)

    if args.action == "list":
        records = get_hosts(config, args.domain)
        display_records(records, args.domain)

    elif args.action == "add":
        # An already-existing record is treated as success (idempotent add).
        safe_add(config, args.domain, args.name, args.type, args.value, args.ttl)

    elif args.action == "update":
        if not safe_update(config, args.domain, args.name, args.type, args.value, args.ttl):
            sys.exit(1)

    elif args.action == "delete":
        if not safe_delete(config, args.domain, args.name, args.type):
            sys.exit(1)

    elif args.action == "set-ns":
        print(f"\n🔀 Setting nameservers for {args.domain}: {', '.join(args.nameservers)}")
        set_nameservers(config, args.domain, args.nameservers)
        print("✅ Nameservers updated. Note: URL/Email forwarding and DDNS will stop working.")

    elif args.action == "export":
        records = get_hosts(config, args.domain)
        export_records(records, args.domain, args.output)


if __name__ == "__main__":
    main()