gcx/scripts/namecheap_manager.py

393 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Namecheap DNS Record Manager (Safe Read-Merge-Write Pattern)
CRITICAL: Namecheap's setHosts API OVERWRITES all records.
This script always reads existing records first, merges changes, then writes back.
Usage:
python namecheap_manager.py list --domain example.com
python namecheap_manager.py add --domain example.com --name api --type A --value 1.2.3.4 [--ttl 1800]
python namecheap_manager.py update --domain example.com --name api --type A --value 5.6.7.8
python namecheap_manager.py delete --domain example.com --name api --type A
python namecheap_manager.py set-ns --domain example.com --nameservers ns1.cf.com ns2.cf.com
python namecheap_manager.py export --domain example.com --output records.json
Environment Variables:
NAMECHEAP_API_USER - Namecheap username
NAMECHEAP_API_KEY - Namecheap API key
NAMECHEAP_CLIENT_IP - Whitelisted IPv4 address
"""
import argparse
import json
import os
import sys
import xml.etree.ElementTree as ET
from datetime import datetime
try:
import urllib.request
import urllib.parse
except ImportError:
pass
# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────
API_URL_PROD = "https://api.namecheap.com/xml.response"
API_URL_SANDBOX = "https://api.sandbox.namecheap.com/xml.response"
NS = {"nc": "http://api.namecheap.com/xml.response"}
NS_HTTPS = {"nc": "https://api.namecheap.com/xml.response"}
def get_config(sandbox: bool = False):
api_user = os.environ.get("NAMECHEAP_API_USER")
api_key = os.environ.get("NAMECHEAP_API_KEY")
client_ip = os.environ.get("NAMECHEAP_CLIENT_IP")
if not all([api_user, api_key, client_ip]):
print("ERROR: Set NAMECHEAP_API_USER, NAMECHEAP_API_KEY, and NAMECHEAP_CLIENT_IP env vars.")
sys.exit(1)
return {
"api_user": api_user,
"api_key": api_key,
"client_ip": client_ip,
"base_url": API_URL_SANDBOX if sandbox else API_URL_PROD,
}
def split_domain(domain: str) -> tuple:
"""Split domain into SLD and TLD. e.g., 'gogenex.com' -> ('gogenex', 'com')"""
parts = domain.rsplit(".", 1)
if len(parts) != 2:
print(f"ERROR: Cannot split domain '{domain}'. Expected format: 'example.com'")
sys.exit(1)
return parts[0], parts[1]
# ──────────────────────────────────────────────
# API Calls
# ──────────────────────────────────────────────
def api_call(config: dict, command: str, extra_params: dict = None) -> ET.Element:
"""Make a Namecheap API call and return the parsed XML root."""
params = {
"ApiUser": config["api_user"],
"ApiKey": config["api_key"],
"UserName": config["api_user"],
"ClientIp": config["client_ip"],
"Command": command,
}
if extra_params:
params.update(extra_params)
url = f"{config['base_url']}?{urllib.parse.urlencode(params)}"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=30) as response:
xml_data = response.read().decode("utf-8")
except Exception as e:
print(f"ERROR: API call failed: {e}")
sys.exit(1)
root = ET.fromstring(xml_data)
# Check for errors (handle both http and https namespace)
status = root.attrib.get("Status", "")
if status == "ERROR":
# Try to find error message
for ns_dict in [NS, NS_HTTPS]:
for ns_prefix, ns_uri in ns_dict.items():
errors = root.findall(f".//{{{ns_uri}}}Error")
for err in errors:
print(f"ERROR [{err.attrib.get('Number', '?')}]: {err.text}")
sys.exit(1)
return root
def get_hosts(config: dict, domain: str) -> list:
"""Fetch all DNS host records for a domain."""
sld, tld = split_domain(domain)
root = api_call(config, "namecheap.domains.dns.getHosts", {"SLD": sld, "TLD": tld})
records = []
# Search in both namespace variants, both cases (Host and host)
for ns_uri in [NS["nc"], NS_HTTPS["nc"]]:
for tag in ["Host", "host"]:
hosts = root.findall(f".//{{{ns_uri}}}{tag}")
for host in hosts:
records.append({
"HostId": host.attrib.get("HostId", ""),
"Name": host.attrib.get("Name", ""),
"Type": host.attrib.get("Type", ""),
"Address": host.attrib.get("Address", ""),
"MXPref": host.attrib.get("MXPref", "10"),
"TTL": host.attrib.get("TTL", "1800"),
})
if records:
break
if records:
break
# Fallback: try without namespace, both cases
if not records:
for tag in ["Host", "host"]:
for host in root.iter(tag):
records.append({
"HostId": host.attrib.get("HostId", ""),
"Name": host.attrib.get("Name", ""),
"Type": host.attrib.get("Type", ""),
"Address": host.attrib.get("Address", ""),
"MXPref": host.attrib.get("MXPref", "10"),
"TTL": host.attrib.get("TTL", "1800"),
})
if records:
break
return records
def set_hosts(config: dict, domain: str, records: list) -> bool:
"""Set ALL DNS host records for a domain. This OVERWRITES everything."""
sld, tld = split_domain(domain)
params = {"SLD": sld, "TLD": tld}
for i, rec in enumerate(records, 1):
params[f"HostName{i}"] = rec["Name"]
params[f"RecordType{i}"] = rec["Type"]
params[f"Address{i}"] = rec["Address"]
params[f"TTL{i}"] = rec.get("TTL", "1800")
if rec["Type"] in ("MX", "MXE"):
params[f"MXPref{i}"] = rec.get("MXPref", "10")
root = api_call(config, "namecheap.domains.dns.setHosts", params)
return True
def set_nameservers(config: dict, domain: str, nameservers: list):
"""Set custom nameservers for a domain."""
sld, tld = split_domain(domain)
params = {
"SLD": sld,
"TLD": tld,
"Nameservers": ",".join(nameservers),
}
api_call(config, "namecheap.domains.dns.setCustom", params)
return True
# ──────────────────────────────────────────────
# Safe Operations (Read-Merge-Write)
# ──────────────────────────────────────────────
def backup_records(records: list, domain: str):
"""Save a backup of current records before modification."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"backup_{domain}_{timestamp}.json"
with open(backup_file, "w", encoding="utf-8") as f:
json.dump(records, f, indent=2)
print(f" 💾 Backup saved: {backup_file}")
return backup_file
def safe_add(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = "1800"):
"""Safely add a record by reading existing, appending, and writing back."""
print(f"\n📡 Reading current records for {domain}...")
current = get_hosts(config, domain)
print(f" Found {len(current)} existing records")
# Check for duplicate
for r in current:
if r["Name"] == name and r["Type"] == record_type and r["Address"] == value:
print(f" ⚠️ Record already exists: {name} {record_type} {value}")
return
# Backup before modification
backup_records(current, domain)
# Append new record
new_record = {
"Name": name,
"Type": record_type,
"Address": value,
"TTL": ttl,
"MXPref": "10",
}
updated = current + [new_record]
fqdn = f"{name}.{domain}" if name != "@" else domain
print(f" Adding: {fqdn} -> {record_type} {value}")
print(f" 📤 Writing {len(updated)} records back...")
set_hosts(config, domain, updated)
print(f" ✅ Done! Record added successfully.")
def safe_update(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = None):
"""Safely update a record by reading, modifying, and writing back."""
print(f"\n📡 Reading current records for {domain}...")
current = get_hosts(config, domain)
backup_records(current, domain)
found = False
for r in current:
if r["Name"] == name and r["Type"] == record_type:
old_value = r["Address"]
r["Address"] = value
if ttl:
r["TTL"] = ttl
found = True
fqdn = f"{name}.{domain}" if name != "@" else domain
print(f" ✏️ Updating: {fqdn} {record_type} {old_value} -> {value}")
break
if not found:
print(f" ❌ Record not found: {name} {record_type}")
return
print(f" 📤 Writing {len(current)} records back...")
set_hosts(config, domain, current)
print(f" ✅ Done! Record updated successfully.")
def safe_delete(config: dict, domain: str, name: str, record_type: str):
"""Safely delete a record by reading, removing, and writing back."""
print(f"\n📡 Reading current records for {domain}...")
current = get_hosts(config, domain)
backup_records(current, domain)
original_count = len(current)
updated = [r for r in current if not (r["Name"] == name and r["Type"] == record_type)]
if len(updated) == original_count:
print(f" ❌ Record not found: {name} {record_type}")
return
fqdn = f"{name}.{domain}" if name != "@" else domain
print(f" 🗑️ Deleting: {fqdn} {record_type}")
print(f" 📤 Writing {len(updated)} records back (was {original_count})...")
set_hosts(config, domain, updated)
print(f" ✅ Done! Record deleted successfully.")
# ──────────────────────────────────────────────
# Display & Export
# ──────────────────────────────────────────────
def display_records(records: list, domain: str):
"""Pretty-print DNS records."""
print(f"\n📋 DNS Records for {domain} (Total: {len(records)})\n")
print(f"{'HostId':<10} {'Name':<30} {'Type':<8} {'Address':<40} {'TTL'}")
print("-" * 100)
for r in records:
fqdn = f"{r['Name']}.{domain}" if r["Name"] != "@" else domain
print(f"{r['HostId']:<10} {fqdn:<30} {r['Type']:<8} {r['Address']:<40} {r['TTL']}")
def export_records(records: list, domain: str, output_file: str):
"""Export records to JSON plan format."""
plan = []
for r in records:
plan.append({
"rr": r["Name"],
"type": r["Type"],
"value": r["Address"],
"ttl": int(r.get("TTL", 1800)),
"note": "",
})
with open(output_file, "w", encoding="utf-8") as f:
json.dump(plan, f, indent=2, ensure_ascii=False)
print(f"✅ Exported {len(plan)} records to {output_file}")
# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Namecheap DNS Record Manager (Safe Read-Merge-Write)")
parser.add_argument("--sandbox", action="store_true", help="Use sandbox API endpoint")
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
# List
list_p = subparsers.add_parser("list", help="List all DNS records")
list_p.add_argument("--domain", required=True)
# Add
add_p = subparsers.add_parser("add", help="Add a DNS record (safe merge)")
add_p.add_argument("--domain", required=True)
add_p.add_argument("--name", required=True, help="Hostname (e.g., api, www, @)")
add_p.add_argument("--type", required=True, help="Record type (A, AAAA, CNAME, MX, TXT, etc.)")
add_p.add_argument("--value", required=True, help="Record value")
add_p.add_argument("--ttl", default="1800", help="TTL in seconds (default: 1800)")
# Update
update_p = subparsers.add_parser("update", help="Update a DNS record")
update_p.add_argument("--domain", required=True)
update_p.add_argument("--name", required=True)
update_p.add_argument("--type", required=True)
update_p.add_argument("--value", required=True)
update_p.add_argument("--ttl", help="New TTL (optional)")
# Delete
delete_p = subparsers.add_parser("delete", help="Delete a DNS record")
delete_p.add_argument("--domain", required=True)
delete_p.add_argument("--name", required=True)
delete_p.add_argument("--type", required=True)
# Set nameservers
ns_p = subparsers.add_parser("set-ns", help="Set custom nameservers")
ns_p.add_argument("--domain", required=True)
ns_p.add_argument("--nameservers", nargs="+", required=True, help="Nameserver domains")
# Export
export_p = subparsers.add_parser("export", help="Export records to JSON")
export_p.add_argument("--domain", required=True)
export_p.add_argument("--output", required=True, help="Output JSON file path")
args = parser.parse_args()
if not args.action:
parser.print_help()
sys.exit(1)
config = get_config(sandbox=args.sandbox)
if args.action == "list":
records = get_hosts(config, args.domain)
display_records(records, args.domain)
elif args.action == "add":
safe_add(config, args.domain, args.name, args.type, args.value, args.ttl)
elif args.action == "update":
safe_update(config, args.domain, args.name, args.type, args.value, args.ttl)
elif args.action == "delete":
safe_delete(config, args.domain, args.name, args.type)
elif args.action == "set-ns":
print(f"\n🔀 Setting nameservers for {args.domain}: {', '.join(args.nameservers)}")
set_nameservers(config, args.domain, args.nameservers)
print(f"✅ Nameservers updated. Note: URL/Email forwarding and DDNS will stop working.")
elif args.action == "export":
records = get_hosts(config, args.domain)
export_records(records, args.domain, args.output)
if __name__ == "__main__":
main()