diff --git a/docs/gogenex-domain-planning.md b/docs/gogenex-domain-planning.md new file mode 100644 index 0000000..b723b09 --- /dev/null +++ b/docs/gogenex-domain-planning.md @@ -0,0 +1,350 @@ +# GoGenex 双域名架构规划方案 + +> 适用场景:企业门户 + 区块链全生态 + MPC 签名服务 + 对象存储 + 微服务架构 +> 主域名:`gogenex.cn`(阿里云)、`gogenex.com`(Namecheap) +> 版本:v1.1 | 2026-03-01 + +--- + +## 〇、双域名策略 + +| 域名 | 注册商 | DNS 服务 | 用途定位 | +| --- | --- | --- | --- | +| `gogenex.cn` | 阿里云 | 阿里云 DNS | **主力域名** — API 网关、后台、内部服务、区块链、运维 | +| `gogenex.com` | Namecheap | Namecheap DNS | **国际 & 品牌** — 官网、文档、开发者平台、状态页 | + +> **原则:** `.cn` 承载所有业务和技术服务;`.com` 面向国际用户和品牌展示。 +> 两个域名可以互相 CNAME,例如 `api.gogenex.com` CNAME 到 `api.gogenex.cn`。 + +--- + +## 一、总体架构分层 + +``` +gogenex.cn / gogenex.com +├── 🌐 门户与官网层 +├── 🔌 API 网关层(对外 / 对内) +├── 🔐 安全与认证层(MPC / Auth / KMS) +├── ⛓️ 区块链服务层 +├── 📦 存储与 CDN 层 +├── 📊 运维监控层 +├── 🧪 环境隔离层(staging / dev) +└── 📚 文档与开放平台 +``` + +--- + +## 二、子域名详细规划 + +### 2.1 🌐 门户与官网 + +| 子域名 | 用途 | 访问范围 | DNS | +| --- | --- | --- | --- | +| `gogenex.com` / `www.gogenex.com` | 企业官网、品牌落地页 | 公开 | Namecheap | +| `gogenex.cn` / `www.gogenex.cn` | 中文官网(可跳转 .com) | 公开 | 阿里云 | +| `portal.gogenex.cn` | 用户业务门户(Web App) | 公开 / 登录 | 阿里云 | +| `admin.gogenex.cn` | 内部运营管理后台 | 内网 / VPN | 阿里云 | +| `console.gogenex.cn` | 开发者 / 商户控制台 | 登录 | 阿里云 | + +--- + +### 2.2 🔌 API 网关层 + +#### 对外公共 API(统一网关入口) + +| 子域名 | 用途 | 说明 | DNS | +| --- | --- | --- | --- | +| `api.gogenex.cn` | 对外统一 API 网关 | 所有对外服务统一入口 | 阿里云 | +| `api.gogenex.com` | 国际 API 入口 | CNAME → `api.gogenex.cn` | Namecheap | +| ↳ `/v1/wallet/*` | 钱包服务 | 路径路由 | | +| ↳ `/v1/tx/*` | 交易服务 | 路径路由 | | +| ↳ `/v1/chain/*` | 链上查询服务 | 路径路由 | | +| ↳ `/v1/mpc/*` | MPC 签名对外接口 | 路径路由 | | +| ↳ `/v1/nft/*` | NFT 服务 | 路径路由 | | +| ↳ `/v1/defi/*` | DeFi 聚合服务 | 路径路由 | | + +#### 对内微服务 API + +| 子域名 | 用途 | 访问范围 | DNS | +| --- | --- | --- | --- | +| `api-internal.gogenex.cn` | 内部微服务 API 网关 | 内网 / VPN | 阿里云 | +| ↳ `/svc/user/*` | 用户中心 | 内部 | | +| ↳ `/svc/risk/*` | 风控引擎 | 内部 | | +| ↳ `/svc/notify/*` | 通知推送 | 内部 | | +| ↳ `/svc/settlement/*` | 清结算 | 内部 | | + +--- + +### 2.3 🔐 安全与认证层(MPC 重点) + +| 子域名 | 用途 | 访问范围 | 备注 | DNS | +| --- | --- | --- | --- | --- | +| `auth.gogenex.cn` | 统一认证 / SSO / OAuth2 | 公开 | JWT 签发、2FA | 阿里云 | +| `mpc.gogenex.cn` | MPC 签名服务总入口 | **严格受限** | 独立部署,网络隔离 | 阿里云 | +| `mpc-relay.gogenex.cn` | MPC 中继 / 协调节点 | 内网 | 多方协调通信 | 阿里云 | +| `mpc-callback.gogenex.cn` | MPC 签名异步回调 | 白名单 | 签名结果回传 | 阿里云 | +| `kms.gogenex.cn` | 密钥管理服务(KMS) | 内网 | HSM / 密钥分片管理 | 阿里云 | +| `vault.gogenex.cn` | 机密存储(HashiCorp Vault) | 内网 | 配置密钥、证书管理 | 阿里云 | + +> ⚠️ **MPC 安全建议**:`mpc.gogenex.cn` 建议不解析到公网,仅通过 VPN / 内网访问。对外暴露的 MPC 能力通过 `api.gogenex.cn/v1/mpc/*` 走网关代理,经过认证、限流、风控后转发。 + +--- + +### 2.4 ⛓️ 区块链服务层 + +| 子域名 | 用途 | 访问范围 | 备注 | DNS | +| --- | --- | --- | --- | --- | +| `rpc.gogenex.cn` | 区块链 RPC 统一代理 | 公开 / 限流 | 多链 RPC 聚合入口 | 阿里云 | +| ↳ `/eth`, `/bsc`, `/tron`... | 按链路径路由 | | | | +| `ws.gogenex.cn` | WebSocket 长连接服务 | 公开 / 登录 | 交易推送、链上事件 | 阿里云 | +| `explorer.gogenex.cn` | 区块链浏览器 | 公开 | 交易 / 区块查询 | 阿里云 | +| `indexer.gogenex.cn` | 链上数据索引服务 | 内网 | The Graph 类服务 | 阿里云 | +| `bridge.gogenex.cn` | 跨链桥服务 | 公开 / 登录 | 跨链资产转移 | 阿里云 | +| `oracle.gogenex.cn` | 预言机服务 | 内网 / 受限 | 喂价、链下数据 | 阿里云 | +| `node.gogenex.cn` | 全节点管理入口 | 内网 | 节点运维、健康检查 | 阿里云 | +| `faucet.gogenex.cn` | 测试网水龙头 | 公开 | 测试代币发放 | 阿里云 | + +--- + +### 2.5 📦 存储与 CDN 层 + +| 子域名 | 用途 | 访问范围 | DNS | +| --- | --- | --- | --- | +| `oss.gogenex.cn` | MinIO 对象存储 API | 内部 / 服务间 | 阿里云 | +| `static.gogenex.cn` | 静态资源 CDN(前端资源、图片) | 公开 | 阿里云 | +| `ipfs-gw.gogenex.cn` | IPFS 网关(NFT 元数据等) | 公开 | 阿里云 | + +> MinIO 推荐路径风格:`oss.gogenex.cn/bucket/object` +> 如需虚拟主机风格可配置 `*.oss.gogenex.cn` 泛解析。 + +--- + +### 2.6 📊 运维监控层 + +| 子域名 | 用途 | 访问范围 | DNS | +| --- | --- | --- | --- | +| `monitor.gogenex.cn` | Grafana 监控面板 | 内网 / VPN | 阿里云 | +| `log.gogenex.cn` | 日志平台(ELK / Loki) | 内网 / VPN | 阿里云 | +| `trace.gogenex.cn` | 链路追踪(Jaeger / Tempo) | 内网 / VPN | 阿里云 | +| `alert.gogenex.cn` | 告警平台(Alertmanager) | 内网 / VPN | 阿里云 | +| `status.gogenex.com` | 服务状态页(对外) | 公开 | Namecheap | +| `ci.gogenex.cn` | CI/CD 平台(Jenkins / GitLab) | 内网 / VPN | 阿里云 | + +--- + +### 2.7 🧪 环境隔离 + +采用 **`{env}-{service}.gogenex.cn`** 风格统一命名: + +| 环境 | 示例 | 说明 | +| --- | --- | --- | +| **生产** | `api.gogenex.cn` | 无前缀 = 生产 | +| **预发布** | `staging-api.gogenex.cn` | staging 前缀 | +| **测试** | `test-api.gogenex.cn` | test 前缀 | +| **开发** | `dev-api.gogenex.cn` | dev 前缀 | +| | `dev-mpc.gogenex.cn` | | +| | `dev-portal.gogenex.cn` | | +| | `staging-rpc.gogenex.cn` | | + +--- + +### 2.8 📚 文档与开放平台 + +| 子域名 | 用途 | 访问范围 | DNS | +| --- | --- | --- | --- | +| `docs.gogenex.com` | API 文档 / 开发者文档 | 公开 | Namecheap | +| `developer.gogenex.com` | 开发者开放平台 | 公开 | Namecheap | +| `sandbox.gogenex.com` | API 沙盒测试环境 | 公开 / 登录 | Namecheap | + +--- + +## 三、DNS 记录规划 + +### 3.1 阿里云 DNS — `gogenex.cn` + +```dns +# ----- 官网与门户 ----- +gogenex.cn. A +www.gogenex.cn. CNAME gogenex.cn. +portal.gogenex.cn. A +console.gogenex.cn. A +admin.gogenex.cn. A <内网 IP> + +# ----- API 网关 ----- +api.gogenex.cn. A + +# ----- 认证 ----- +auth.gogenex.cn. A + +# ----- 区块链服务 ----- +rpc.gogenex.cn. A +ws.gogenex.cn. A +explorer.gogenex.cn. A +bridge.gogenex.cn. A +faucet.gogenex.cn. A + +# ----- 存储与 CDN ----- +static.gogenex.cn. CNAME +ipfs-gw.gogenex.cn. A + +# ----- 内部服务(私有 DNS / 内网解析) ----- +api-internal.gogenex.cn. A <内网 IP> +mpc.gogenex.cn. A <内网隔离 IP> +mpc-relay.gogenex.cn. A <内网 IP> +mpc-callback.gogenex.cn. A <内网 IP> +kms.gogenex.cn. A <内网 IP> +vault.gogenex.cn. A <内网 IP> +oss.gogenex.cn. A <内网 IP> +indexer.gogenex.cn. A <内网 IP> +oracle.gogenex.cn. A <内网 IP> +node.gogenex.cn. A <内网 IP> +monitor.gogenex.cn. A <内网 IP> +log.gogenex.cn. A <内网 IP> +trace.gogenex.cn. A <内网 IP> +alert.gogenex.cn. A <内网 IP> +ci.gogenex.cn. A <内网 IP> +``` + +### 3.2 Namecheap DNS — `gogenex.com` + +```dns +# ----- 品牌官网 ----- +@ A +www CNAME gogenex.com. + +# ----- 国际 API 入口(CNAME 到 .cn 主域名) ----- +api CNAME api.gogenex.cn. + +# ----- 文档与开放平台 ----- +docs CNAME <文档托管> +developer A +sandbox A + +# ----- 状态页 ----- +status CNAME <状态页服务> +``` + +--- + +## 四、SSL 证书策略 + +| 证书 | 覆盖范围 | 说明 | +| --- | --- | --- | +| `*.gogenex.cn` + `gogenex.cn` | 所有 .cn 子域名 + 裸域 | 阿里云免费证书或 Let's Encrypt | +| `*.gogenex.com` + `gogenex.com` | 所有 .com 子域名 + 裸域 | Let's Encrypt | +| `*.oss.gogenex.cn` | MinIO 虚拟主机风格(可选) | 仅在需要时申请 | + +**推荐方案:** + +- 使用 **Let's Encrypt** 做自动续期,DNS-01 验证方式(支持泛域名) +- 阿里云 DNS 验证插件:`certbot-dns-aliyun` +- K8s 场景配合 `cert-manager` 管理 +- 内网服务可使用自签 CA + mTLS 双向认证 + +--- + +## 五、安全分区架构 + +``` +┌──────────────────────────────────────────────────────┐ +│ 公网 (Internet) │ +│ │ +│ .com: www / api(CNAME) / docs / developer / status │ +│ .cn: portal / api / auth / rpc / ws / explorer │ +│ static / faucet / bridge / console │ +└─────────────────────┬────────────────────────────────┘ + │ WAF / API Gateway / Rate Limit +┌─────────────────────┴────────────────────────────────┐ +│ DMZ / 网关层 │ +│ Nginx / Kong / APISIX / Cloudflare │ +└─────────────────────┬────────────────────────────────┘ + │ VPN / mTLS / 内网 +┌─────────────────────┴────────────────────────────────┐ +│ 内网服务区 │ +│ │ +│ .cn: api-internal / admin / oss / indexer / oracle │ +│ monitor / log / trace / ci / node │ +└─────────────────────┬────────────────────────────────┘ + │ 网络隔离 / 硬件安全模块 +┌─────────────────────┴────────────────────────────────┐ +│ 🔒 安全隔离区 (MPC Zone) │ +│ │ +│ .cn: mpc / mpc-relay / mpc-callback / kms / vault │ +│ 独立 VPC / 独立网段 / HSM 硬件 │ +└──────────────────────────────────────────────────────┘ +``` + +### 安全分区说明 + +| 分区 | 网络策略 | 典型服务 | +| --- | --- | --- | +| **公网区** | 经 WAF + CDN 保护,限流 | 官网(.com)、API 网关(.cn)、区块链浏览器 | +| **DMZ 网关层** | 反向代理,仅转发合法请求 | Nginx / Kong / APISIX | +| **内网服务区** | 仅 VPN / 内网可达 | 微服务、监控、存储 (均 .cn) | +| **MPC 安全隔离区** | 独立 VPC、独立网段、最小化出入口 | MPC 签名、KMS、Vault (均 .cn) | + +--- + +## 六、子域名总览清单 + +### gogenex.cn(阿里云 DNS)— 28 条 + +| # | 子域名 | 用途 | 公网/内网 | +| --- | --- | --- | --- | +| 1 | `gogenex.cn` | 中文官网 | 公网 | +| 2 | `www.gogenex.cn` | 中文官网 (CNAME) | 公网 | +| 3 | `portal.gogenex.cn` | 业务门户 | 公网 | +| 4 | `admin.gogenex.cn` | 管理后台 | 内网 | +| 5 | `console.gogenex.cn` | 开发者/商户控制台 | 公网 | +| 6 | `api.gogenex.cn` | 对外 API 网关 | 公网 | +| 7 | `api-internal.gogenex.cn` | 对内 API 网关 | 内网 | +| 8 | `auth.gogenex.cn` | 统一认证 / SSO | 公网 | +| 9 | `mpc.gogenex.cn` | MPC 签名服务 | 内网(隔离) | +| 10 | `mpc-relay.gogenex.cn` | MPC 中继节点 | 内网(隔离) | +| 11 | `mpc-callback.gogenex.cn` | MPC 异步回调 | 内网(白名单) | +| 12 | `kms.gogenex.cn` | 密钥管理 | 内网(隔离) | +| 13 | `vault.gogenex.cn` | 机密存储 | 内网(隔离) | +| 14 | `rpc.gogenex.cn` | 区块链 RPC 代理 | 公网 | +| 15 | `ws.gogenex.cn` | WebSocket 服务 | 公网 | +| 16 | `explorer.gogenex.cn` | 区块链浏览器 | 公网 | +| 17 | `indexer.gogenex.cn` | 链上数据索引 | 内网 | +| 18 | `bridge.gogenex.cn` | 跨链桥 | 公网 | +| 19 | `oracle.gogenex.cn` | 预言机 | 内网 | +| 20 | `node.gogenex.cn` | 全节点管理 | 内网 | +| 21 | `faucet.gogenex.cn` | 测试网水龙头 | 公网 | +| 22 | `oss.gogenex.cn` | MinIO 对象存储 | 内网 | +| 23 | `static.gogenex.cn` | 静态资源 CDN | 公网 | +| 24 | `ipfs-gw.gogenex.cn` | IPFS 网关 | 公网 | +| 25 | `monitor.gogenex.cn` | Grafana 监控 | 内网 | +| 26 | `log.gogenex.cn` | 日志平台 | 内网 | +| 27 | `trace.gogenex.cn` | 链路追踪 | 内网 | +| 28 | `alert.gogenex.cn` | 告警平台 | 内网 | +| 29 | `ci.gogenex.cn` | CI/CD 平台 | 内网 | + +### gogenex.com(Namecheap DNS)— 7 条 + +| # | 子域名 | 用途 | 公网/内网 | +| --- | --- | --- | --- | +| 1 | `gogenex.com` | 品牌官网 | 公网 | +| 2 | `www.gogenex.com` | 品牌官网 (CNAME) | 公网 | +| 3 | `api.gogenex.com` | 国际 API (CNAME → api.gogenex.cn) | 公网 | +| 4 | `docs.gogenex.com` | API 文档 | 公网 | +| 5 | `developer.gogenex.com` | 开发者平台 | 公网 | +| 6 | `sandbox.gogenex.com` | API 沙盒 | 公网 | +| 7 | `status.gogenex.com` | 服务状态页 | 公网 | + +--- + +## 七、关键设计原则 + +1. **双域名分工**:`.cn` 承载技术栈和业务服务,`.com` 面向国际品牌和文档,互不干扰。 +2. **统一网关入口**:对外仅暴露 `api.gogenex.cn`,`.com` 的 API 通过 CNAME 跳转,避免子域名爆炸。 +3. **MPC 严格隔离**:MPC / KMS / Vault 部署在独立安全区,不直接暴露公网,所有对外 MPC 能力经网关代理。 +4. **环境一致性**:采用 `{env}-{service}` 统一命名规范,生产无前缀。 +5. **证书简化**:`.cn` 一张泛域名证书 + `.com` 一张泛域名证书,各自覆盖所有子域名。 +6. **内外分离**:内部服务使用私有 DNS 解析,不解析到公网 IP。 +7. **可扩展性**:新增业务模块只需在网关层添加路径规则,无需频繁新增子域名。 + +--- + +> 📌 本方案为初始架构规划,可根据实际业务发展和团队规模进行裁剪或扩展。 diff --git a/docs/migration-guide.md b/docs/migration-guide.md new file mode 100644 index 0000000..169372d --- /dev/null +++ b/docs/migration-guide.md @@ -0,0 +1,96 @@ +# DNS Migration Guide: Namecheap to External Providers + +When managing 30+ records with frequent changes, Namecheap's overwrite-only API becomes a liability. This guide covers migrating DNS resolution to a better API provider while keeping domain registration at Namecheap. + +## Why Migrate DNS Away from Namecheap? + +| Issue | Impact | +|---|---| +| `setHosts` overwrites all records | Risk of accidental deletion | +| No single-record CRUD | Every change requires full read-write cycle | +| No SRV support via API | Manual work for some services | +| XML responses | More complex parsing than JSON | +| IPv4-only whitelist | Limits deployment environments | + +## Recommended Alternatives + +### Option 1: Cloudflare DNS (Recommended for most cases) + +- **Free plan** includes DNS management +- RESTful JSON API with per-record CRUD +- Official SDKs in Python, Go, Node.js, etc. +- Terraform provider available +- Built-in DDoS protection and CDN +- API Token authentication (scoped permissions) + +### Option 2: Alibaba Cloud DNS (Recommended for China-focused services) + +- Better performance for China mainland users +- Full per-record CRUD API +- Multi-language SDK support +- RAM sub-user permission control +- Use the `aliyun-dns` skill for management + +## Migration Steps + +### Step 1: Export Current Records + +```bash +python namecheap_manager.py export --domain gogenex.com --output current_records.json +``` + +### Step 2: Set Up New DNS Provider + +#### For Cloudflare: +1. Create a Cloudflare account and add your domain +2. Cloudflare will scan and import existing records +3. Verify all records were imported correctly against `current_records.json` +4. Note the assigned Cloudflare nameservers (e.g., `adam.ns.cloudflare.com`) + +#### For Alibaba Cloud DNS: +1. Add domain in Alidns console +2. Import records using: `python alidns_batch.py --domain gogenex.com --plan current_records.json` +3. Note the assigned Alidns nameservers (e.g., `dns1.hichina.com`) + +### Step 3: Switch Nameservers at Namecheap + +```bash +# For Cloudflare +python namecheap_manager.py set-ns --domain gogenex.com \ + --nameservers adam.ns.cloudflare.com betty.ns.cloudflare.com + +# For Alibaba Cloud +python namecheap_manager.py set-ns --domain gogenex.com \ + --nameservers dns1.hichina.com dns2.hichina.com +``` + +### Step 4: Verify Propagation + +DNS propagation typically takes 24-48 hours. Monitor with: + +```bash +# Check which nameservers are responding +dig NS gogenex.com + +# Verify specific records +dig api.gogenex.com +dig mpc.gogenex.com +``` + +### Step 5: Post-Migration Cleanup + +- Update any DDNS scripts to use the new provider's API +- Update Let's Encrypt certbot plugins (e.g., switch from `certbot-dns-namecheap` to `certbot-dns-cloudflare`) +- Update CI/CD pipelines that manage DNS +- Keep Namecheap account active for domain renewal + +## Rollback Plan + +If issues arise, switch back to Namecheap nameservers: + +```bash +python namecheap_manager.py set-ns --domain gogenex.com \ + --nameservers dns1.registrar-servers.com dns2.registrar-servers.com +``` + +Note: Namecheap default nameservers may vary. Check your Namecheap dashboard for the correct values. diff --git a/docs/plan-schema.md b/docs/plan-schema.md new file mode 100644 index 0000000..86b79a6 --- /dev/null +++ b/docs/plan-schema.md @@ -0,0 +1,80 @@ +# Batch Plan JSON Schema + +This document defines the JSON format used by `alidns_batch.py` for batch DNS provisioning. + +## Schema + +```json +[ + { + "rr": "(required) string - Subdomain prefix. '@' for root domain.", + "type": "(required) string - DNS record type: A, AAAA, CNAME, MX, TXT, NS, SRV, CAA", + "value": "(required) string - Record value (IP address, domain, text, etc.)", + "ttl": "(optional) integer - TTL in seconds. Default: 600", + "scope": "(optional) string - 'public' or 'private'. Used for security validation.", + "note": "(optional) string - Human-readable description of the record's purpose." + } +] +``` + +## Example: GoGenex.com Full Plan + +Based on the domain planning document, here is a complete plan example: + +```json +[ + {"rr": "@", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Root domain - corporate website"}, + {"rr": "www", "type": "CNAME", "value": "gogenex.com", "ttl": 600, "scope": "public", "note": "WWW redirect to root"}, + {"rr": "portal", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "User business portal"}, + {"rr": "console", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Developer/merchant console"}, + {"rr": "admin", "type": "A", "value": "10.0.1.10", "ttl": 600, "scope": "private", "note": "Internal admin dashboard"}, + + {"rr": "api", "type": "A", "value": "", "ttl": 300, "scope": "public", "note": "Public API gateway"}, + {"rr": "api-internal", "type": "A", "value": "10.0.2.10", "ttl": 600, "scope": "private", "note": "Internal microservice API gateway"}, + + {"rr": "auth", "type": "A", "value": "", "ttl": 300, "scope": "public", "note": "SSO / OAuth2 authentication"}, + {"rr": "mpc", "type": "A", "value": "10.0.10.10", "ttl": 600, "scope": "private", "note": "MPC signing service (isolated)"}, + {"rr": "mpc-relay", "type": "A", "value": "10.0.10.11", "ttl": 600, "scope": "private", "note": "MPC relay/coordinator node"}, + {"rr": "mpc-callback", "type": "A", "value": "10.0.10.12", "ttl": 600, "scope": "private", "note": "MPC async callback"}, + {"rr": "kms", "type": "A", "value": "10.0.10.20", "ttl": 600, "scope": "private", "note": "Key management service"}, + {"rr": "vault", "type": "A", "value": "10.0.10.21", "ttl": 600, "scope": "private", "note": "HashiCorp Vault secret storage"}, + + {"rr": "rpc", "type": "A", "value": "", "ttl": 300, "scope": "public", "note": "Blockchain RPC proxy"}, + {"rr": "ws", "type": "A", "value": "", "ttl": 300, "scope": "public", "note": "WebSocket service"}, + {"rr": "explorer", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Blockchain explorer"}, + {"rr": "indexer", "type": "A", "value": "10.0.3.10", "ttl": 600, "scope": "private", "note": "On-chain data indexer"}, + {"rr": "bridge", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Cross-chain bridge"}, + {"rr": "oracle", "type": "A", "value": "10.0.3.20", "ttl": 600, "scope": "private", "note": "Oracle service"}, + {"rr": "node", "type": "A", "value": "10.0.3.30", "ttl": 600, "scope": "private", "note": "Full node management"}, + {"rr": "faucet", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Testnet faucet"}, + + {"rr": "oss", "type": "A", "value": "10.0.4.10", "ttl": 600, "scope": "private", "note": "MinIO object storage"}, + {"rr": "static", "type": "CNAME", "value": "", "ttl": 300, "scope": "public", "note": "Static assets CDN"}, + {"rr": "ipfs-gw", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "IPFS gateway"}, + + {"rr": "monitor", "type": "A", "value": "10.0.5.10", "ttl": 600, "scope": "private", "note": "Grafana monitoring"}, + {"rr": "log", "type": "A", "value": "10.0.5.11", "ttl": 600, "scope": "private", "note": "Log platform (ELK/Loki)"}, + {"rr": "trace", "type": "A", "value": "10.0.5.12", "ttl": 600, "scope": "private", "note": "Tracing (Jaeger/Tempo)"}, + {"rr": "alert", "type": "A", "value": "10.0.5.13", "ttl": 600, "scope": "private", "note": "Alertmanager"}, + {"rr": "status", "type": "CNAME", "value": "", "ttl": 600, "scope": "public", "note": "Public status page"}, + {"rr": "ci", "type": "A", "value": "10.0.5.20", "ttl": 600, "scope": "private", "note": "CI/CD platform"}, + + {"rr": "docs", "type": "CNAME", "value": "", "ttl": 600, "scope": "public", "note": "API documentation"}, + {"rr": "developer", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "Developer open platform"}, + {"rr": "sandbox", "type": "A", "value": "", "ttl": 600, "scope": "public", "note": "API sandbox environment"} +] +``` + +## Placeholder Convention + +Values like `` are placeholders. Replace them with actual IP addresses or domains before running the batch script. The script will reject entries with `<` or `>` in the value field. + +## Generating a Plan from the Planning Document + +To convert a markdown planning doc into a JSON plan: + +1. Extract all subdomain entries from the tables +2. Determine record type (A for IPs, CNAME for domain aliases) +3. Set scope based on the "访问范围" column (公网 -> public, 内网 -> private) +4. Fill in actual IP addresses from your infrastructure +5. Save as `.json` and validate with `--dry-run` before applying diff --git a/scripts/alidns_batch.py b/scripts/alidns_batch.py new file mode 100644 index 0000000..71e272f --- /dev/null +++ b/scripts/alidns_batch.py @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +""" +Alibaba Cloud DNS Batch Provisioner +Reads a JSON plan file and creates/verifies all DNS records in batch. + +Usage: + python alidns_batch.py --domain gogenex.com --plan plan.json [--dry-run] [--force] + python alidns_batch.py --domain gogenex.com --export current_records.json + python alidns_batch.py --domain gogenex.com --diff plan.json + +Plan JSON format (see references/plan-schema.md for full spec): +[ + {"rr": "api", "type": "A", "value": "1.2.3.4", "ttl": 600, "scope": "public", "note": "API gateway"}, + {"rr": "www", "type": "CNAME", "value": "gogenex.com", "ttl": 600, "scope": "public", "note": "WWW redirect"}, + {"rr": "mpc", "type": "A", "value": "10.0.1.100", "ttl": 600, "scope": "private", "note": "MPC signing service"}, + {"rr": "static", "type": "CNAME", "value": "cdn.example.com", "ttl": 300, "scope": "public", "note": "CDN for static assets"} +] + +Environment Variables: + ALIBABA_CLOUD_ACCESS_KEY_ID - RAM sub-user Access Key ID + ALIBABA_CLOUD_ACCESS_KEY_SECRET - RAM sub-user Access Key Secret +""" + +import argparse +import ipaddress +import json +import os +import sys +import time + +try: + from alibabacloud_alidns20150109.client import Client as AlidnsClient + from alibabacloud_alidns20150109 import models as alidns_models + from alibabacloud_tea_openapi import models as open_api_models +except ImportError: + print("ERROR: Required packages not installed. Run:") + print(" pip install alibabacloud_alidns20150109 alibabacloud_tea_openapi --break-system-packages") + sys.exit(1) + + +# Private IP ranges (RFC 1918 + RFC 6598) +PRIVATE_RANGES = [ + ipaddress.ip_network("10.0.0.0/8"), + ipaddress.ip_network("172.16.0.0/12"), + ipaddress.ip_network("192.168.0.0/16"), + ipaddress.ip_network("100.64.0.0/10"), +] + + +def is_private_ip(ip_str: str) -> bool: + """Check if an IP address is in a private range.""" + try: + ip = ipaddress.ip_address(ip_str) + return any(ip in net for net in PRIVATE_RANGES) + except ValueError: + return False + + +def create_client(region_id: str = "cn-hangzhou") -> AlidnsClient: + access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") + access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") + + if not access_key_id or not access_key_secret: + print("ERROR: Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET env vars.") + sys.exit(1) + + config = open_api_models.Config( + access_key_id=access_key_id, + access_key_secret=access_key_secret, + ) + config.endpoint = f"alidns.{region_id}.aliyuncs.com" + return AlidnsClient(config) + + +def get_existing_records(client: AlidnsClient, domain: str) -> list: + """Fetch all existing DNS records for a domain.""" + all_records = [] + page = 1 + page_size = 100 + + while True: + request = alidns_models.DescribeDomainRecordsRequest( + domain_name=domain, + page_number=page, + page_size=page_size, + ) + response = client.describe_domain_records(request) + records = response.body.domain_records.record + all_records.extend(records) + + if len(all_records) >= response.body.total_count: + break + page += 1 + + return all_records + + +def find_matching_record(existing: list, rr: str, record_type: str) -> object: + """Find an existing record matching the given RR and type.""" + for r in existing: + if r.rr == rr and r.type == record_type: + return r + return None + + +def validate_plan(plan: list, domain: str) -> list: + """Validate the plan and return warnings.""" + warnings = [] + seen = set() + + for i, entry in enumerate(plan): + # Required fields + for field in ["rr", "type", "value"]: + if field not in entry: + warnings.append(f"❌ Entry {i}: missing required field '{field}'") + + # Duplicate check + key = (entry.get("rr", ""), entry.get("type", "")) + if key in seen: + warnings.append(f"⚠️ Entry {i}: duplicate record {key[0]}.{domain} {key[1]}") + seen.add(key) + + # Security: private scope pointing to public IP + scope = entry.get("scope", "public") + value = entry.get("value", "") + record_type = entry.get("type", "") + + if scope == "private" and record_type == "A" and not is_private_ip(value): + warnings.append( + f"🔒 SECURITY: Entry {i} ({entry.get('rr')}.{domain}) is marked 'private' " + f"but points to public IP {value}. Internal services should use private IPs." + ) + + if scope == "public" and record_type == "A" and is_private_ip(value): + warnings.append( + f"⚠️ Entry {i} ({entry.get('rr')}.{domain}) is marked 'public' " + f"but points to private IP {value}. This won't be reachable from the internet." + ) + + return warnings + + +def apply_plan(client: AlidnsClient, domain: str, plan: list, dry_run: bool = False, force: bool = False): + """Apply the DNS plan to Alibaba Cloud DNS.""" + print(f"\n{'🔍 DRY RUN' if dry_run else '🚀 APPLYING'} plan for {domain}") + print(f" Records in plan: {len(plan)}\n") + + # Validate + warnings = validate_plan(plan, domain) + if warnings: + print("⚠️ Validation warnings:") + for w in warnings: + print(f" {w}") + print() + + has_errors = any(w.startswith("❌") for w in warnings) + has_security = any(w.startswith("🔒") for w in warnings) + + if has_errors: + print("❌ Plan has errors. Fix them before applying.") + return + + if has_security and not force: + print("🔒 Security warnings detected. Use --force to override.") + return + + # Fetch existing records + print("📡 Fetching existing records...") + existing = get_existing_records(client, domain) + print(f" Found {len(existing)} existing records\n") + + stats = {"added": 0, "updated": 0, "skipped": 0, "errors": 0} + + for entry in plan: + rr = entry["rr"] + record_type = entry["type"] + value = entry["value"] + ttl = entry.get("ttl", 600) + note = entry.get("note", "") + fqdn = f"{rr}.{domain}" if rr != "@" else domain + + match = find_matching_record(existing, rr, record_type) + + if match: + if match.value == value and match.ttl == ttl: + print(f" ⏭️ SKIP {fqdn:<35} {record_type:<6} {value} (already correct)") + stats["skipped"] += 1 + else: + print(f" ✏️ UPDATE {fqdn:<35} {record_type:<6} {match.value} -> {value} {f'[{note}]' if note else ''}") + if not dry_run: + try: + request = alidns_models.UpdateDomainRecordRequest( + record_id=match.record_id, + rr=rr, + type=record_type, + value=value, + ttl=ttl, + ) + client.update_domain_record(request) + stats["updated"] += 1 + time.sleep(0.1) # Rate limit throttle + except Exception as e: + print(f" ❌ Error: {e}") + stats["errors"] += 1 + else: + stats["updated"] += 1 + else: + print(f" ➕ ADD {fqdn:<35} {record_type:<6} {value} {f'[{note}]' if note else ''}") + if not dry_run: + try: + request = alidns_models.AddDomainRecordRequest( + domain_name=domain, + rr=rr, + type=record_type, + value=value, + ttl=ttl, + ) + client.add_domain_record(request) + stats["added"] += 1 + time.sleep(0.1) + except Exception as e: + error_msg = str(e) + if "DomainRecordDuplicate" in error_msg: + print(f" ⚠️ Already exists (different query match)") + stats["skipped"] += 1 + else: + print(f" ❌ Error: {e}") + stats["errors"] += 1 + else: + stats["added"] += 1 + + # Summary + print(f"\n{'📊 DRY RUN SUMMARY' if dry_run else '📊 EXECUTION SUMMARY'}") + print(f" Added: {stats['added']}") + print(f" Updated: {stats['updated']}") + print(f" Skipped: {stats['skipped']}") + print(f" Errors: {stats['errors']}") + + if dry_run: + print(f"\n💡 Run without --dry-run to apply these changes.") + + +def export_records(client: AlidnsClient, domain: str, output_file: str): + """Export current DNS records to a JSON plan file.""" + records = get_existing_records(client, domain) + + plan = [] + for r in records: + plan.append({ + "rr": r.rr, + "type": r.type, + "value": r.value, + "ttl": r.ttl, + "scope": "private" if (r.type == "A" and is_private_ip(r.value)) else "public", + "note": "", + "record_id": r.record_id, + "status": r.status, + }) + + with open(output_file, "w", encoding="utf-8") as f: + json.dump(plan, f, indent=2, ensure_ascii=False) + + print(f"✅ Exported {len(plan)} records to {output_file}") + + +def diff_plan(client: AlidnsClient, domain: str, plan_file: str): + """Show differences between current state and planned state.""" + with open(plan_file, "r", encoding="utf-8") as f: + plan = json.load(f) + + existing = get_existing_records(client, domain) + + print(f"\n🔍 Diff: {plan_file} vs live records for {domain}\n") + + # Check plan entries against existing + for entry in plan: + rr = entry["rr"] + record_type = entry["type"] + value = entry["value"] + ttl = entry.get("ttl", 600) + fqdn = f"{rr}.{domain}" if rr != "@" else domain + + match = find_matching_record(existing, rr, record_type) + + if not match: + print(f" ➕ NEW {fqdn:<35} {record_type:<6} {value}") + elif match.value != value: + print(f" ✏️ CHANGE {fqdn:<35} {record_type:<6} {match.value} -> {value}") + elif match.ttl != ttl: + print(f" ⏱️ TTL {fqdn:<35} {record_type:<6} TTL {match.ttl} -> {ttl}") + else: + print(f" ✅ OK {fqdn:<35} {record_type:<6} {value}") + + # Check for records not in plan + plan_keys = {(e["rr"], e["type"]) for e in plan} + for r in existing: + if (r.rr, r.type) not in plan_keys: + fqdn = f"{r.rr}.{domain}" if r.rr != "@" else domain + print(f" ⚠️ EXTRA {fqdn:<35} {r.type:<6} {r.value} (not in plan)") + + +def main(): + parser = argparse.ArgumentParser(description="Alibaba Cloud DNS Batch Provisioner") + parser.add_argument("--domain", required=True, help="Domain name (e.g., gogenex.com)") + parser.add_argument("--region", default="cn-hangzhou", help="Alidns region") + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--plan", help="JSON plan file to apply") + group.add_argument("--export", help="Export current records to JSON file") + group.add_argument("--diff", help="Show diff between plan and current state") + + parser.add_argument("--dry-run", action="store_true", help="Preview changes without applying") + parser.add_argument("--force", action="store_true", help="Override security warnings") + + args = parser.parse_args() + client = create_client(args.region) + + if args.plan: + with open(args.plan, "r", encoding="utf-8") as f: + plan = json.load(f) + apply_plan(client, args.domain, plan, dry_run=args.dry_run, force=args.force) + elif args.export: + export_records(client, args.domain, args.export) + elif args.diff: + diff_plan(client, args.domain, args.diff) + + +if __name__ == "__main__": + main() diff --git a/scripts/alidns_manager.py b/scripts/alidns_manager.py new file mode 100644 index 0000000..5483955 --- /dev/null +++ b/scripts/alidns_manager.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Alibaba Cloud DNS Record Manager +Supports: list, add, update, delete, enable, disable operations on Alidns records. + +Usage: + python alidns_manager.py list --domain example.com [--rr www] [--type A] + python alidns_manager.py add --domain example.com --rr www --type A --value 1.2.3.4 [--ttl 600] + python alidns_manager.py update --record-id 123456 --rr www --type A --value 5.6.7.8 [--ttl 600] + python alidns_manager.py delete --record-id 123456 + python alidns_manager.py enable --record-id 123456 + python alidns_manager.py disable --record-id 123456 + +Environment Variables: + ALIBABA_CLOUD_ACCESS_KEY_ID - RAM sub-user Access Key ID + ALIBABA_CLOUD_ACCESS_KEY_SECRET - RAM sub-user Access Key Secret +""" + +import argparse +import json +import os +import sys + +try: + from alibabacloud_alidns20150109.client import Client as AlidnsClient + from alibabacloud_alidns20150109 import models as alidns_models + from alibabacloud_tea_openapi import models as open_api_models +except ImportError: + print("ERROR: Required packages not installed. Run:") + print(" pip install alibabacloud_alidns20150109 alibabacloud_tea_openapi --break-system-packages") + sys.exit(1) + + +def create_client(region_id: str = "cn-hangzhou") -> AlidnsClient: + """Create an authenticated Alidns client.""" + access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID") + access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET") + + if not access_key_id or not access_key_secret: + print("ERROR: Set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET env vars.") + sys.exit(1) + + config = open_api_models.Config( + access_key_id=access_key_id, + access_key_secret=access_key_secret, + ) + config.endpoint = f"alidns.{region_id}.aliyuncs.com" + return AlidnsClient(config) + + +def list_records(client: AlidnsClient, domain: str, rr: str = None, record_type: str = None, page_size: int = 100): + """List all DNS records for a domain, with optional filtering.""" + request = alidns_models.DescribeDomainRecordsRequest( + domain_name=domain, + page_size=page_size, + ) + if rr: + request.rrkey_word = rr + if record_type: + request.type = record_type + + try: + response = client.describe_domain_records(request) + records = response.body.domain_records.record + total = response.body.total_count + + print(f"\n📋 DNS Records for {domain} (Total: {total})\n") + print(f"{'RecordId':<20} {'RR':<25} {'Type':<8} {'Value':<40} {'TTL':<6} {'Status'}") + print("-" * 120) + + for r in records: + status_icon = "✅" if r.status == "ENABLE" else "⏸️" + rr_display = f"{r.rr}.{domain}" if r.rr != "@" else domain + print(f"{r.record_id:<20} {rr_display:<25} {r.type:<8} {r.value:<40} {r.ttl:<6} {status_icon} {r.status}") + + return records + except Exception as e: + print(f"ERROR listing records: {e}") + sys.exit(1) + + +def add_record(client: AlidnsClient, domain: str, rr: str, record_type: str, value: str, ttl: int = 600): + """Add a new DNS record.""" + request = alidns_models.AddDomainRecordRequest( + domain_name=domain, + rr=rr, + type=record_type, + value=value, + ttl=ttl, + ) + + try: + response = client.add_domain_record(request) + record_id = response.body.record_id + fqdn = f"{rr}.{domain}" if rr != "@" else domain + print(f"✅ Added: {fqdn} -> {record_type} {value} (TTL: {ttl}, RecordId: {record_id})") + return record_id + except Exception as e: + error_msg = str(e) + if "DomainRecordDuplicate" in error_msg: + print(f"⚠️ Record already exists: {rr}.{domain} {record_type} {value}") + elif "DomainRecordConflict" in error_msg: + print(f"❌ Conflict: CNAME records cannot coexist with other record types for {rr}.{domain}") + else: + print(f"ERROR adding record: {e}") + return None + + +def update_record(client: AlidnsClient, record_id: str, rr: str, record_type: str, value: str, ttl: int = 600): + """Update an existing DNS record by RecordId.""" + request = alidns_models.UpdateDomainRecordRequest( + record_id=record_id, + rr=rr, + type=record_type, + value=value, + ttl=ttl, + ) + + try: + client.update_domain_record(request) + print(f"✅ Updated RecordId {record_id}: {rr} -> {record_type} {value} (TTL: {ttl})") + except Exception as e: + print(f"ERROR updating record: {e}") + + +def delete_record(client: AlidnsClient, record_id: str): + """Delete a DNS record by RecordId.""" + request = alidns_models.DeleteDomainRecordRequest(record_id=record_id) + + try: + client.delete_domain_record(request) + print(f"✅ Deleted RecordId: {record_id}") + except Exception as e: + print(f"ERROR deleting record: {e}") + + +def set_record_status(client: AlidnsClient, record_id: str, status: str): + """Enable or disable a DNS record.""" + request = alidns_models.SetDomainRecordStatusRequest( + record_id=record_id, + status=status, # "Enable" or "Disable" + ) + + try: + client.set_domain_record_status(request) + icon = "✅" if status == "Enable" else "⏸️" + print(f"{icon} RecordId {record_id} status set to: {status}") + except Exception as e: + print(f"ERROR setting status: {e}") + + +def main(): + parser = argparse.ArgumentParser(description="Alibaba Cloud DNS Record Manager") + subparsers = parser.add_subparsers(dest="action", help="Action to perform") + + # List + list_parser = subparsers.add_parser("list", help="List DNS records") + list_parser.add_argument("--domain", required=True, help="Domain name (e.g., gogenex.com)") + list_parser.add_argument("--rr", help="Filter by subdomain prefix") + list_parser.add_argument("--type", help="Filter by record type (A, CNAME, etc.)") + list_parser.add_argument("--region", default="cn-hangzhou", help="Alidns region (default: cn-hangzhou)") + + # Add + add_parser = subparsers.add_parser("add", help="Add a DNS record") + add_parser.add_argument("--domain", required=True) + add_parser.add_argument("--rr", required=True, help="Subdomain prefix (e.g., api, www, @)") + add_parser.add_argument("--type", required=True, help="Record type (A, AAAA, CNAME, MX, TXT, etc.)") + add_parser.add_argument("--value", required=True, help="Record value (IP, domain, text)") + add_parser.add_argument("--ttl", type=int, default=600, help="TTL in seconds (default: 600)") + add_parser.add_argument("--region", default="cn-hangzhou") + + # Update + update_parser = subparsers.add_parser("update", help="Update a DNS record") + update_parser.add_argument("--record-id", required=True, help="RecordId to update") + update_parser.add_argument("--rr", required=True) + update_parser.add_argument("--type", required=True) + update_parser.add_argument("--value", required=True) + update_parser.add_argument("--ttl", type=int, default=600) + update_parser.add_argument("--region", default="cn-hangzhou") + + # Delete + delete_parser = subparsers.add_parser("delete", help="Delete a DNS record") + delete_parser.add_argument("--record-id", required=True, help="RecordId to delete") + delete_parser.add_argument("--region", default="cn-hangzhou") + + # Enable / Disable + enable_parser = subparsers.add_parser("enable", help="Enable a DNS record") + enable_parser.add_argument("--record-id", required=True) + enable_parser.add_argument("--region", default="cn-hangzhou") + + disable_parser = subparsers.add_parser("disable", help="Disable a DNS record") + disable_parser.add_argument("--record-id", required=True) + disable_parser.add_argument("--region", default="cn-hangzhou") + + args = parser.parse_args() + + if not args.action: + parser.print_help() + sys.exit(1) + + client = create_client(args.region) + + if args.action == "list": + list_records(client, args.domain, rr=args.rr, record_type=getattr(args, "type", None)) + elif args.action == "add": + add_record(client, args.domain, args.rr, args.type, args.value, args.ttl) + elif args.action == "update": + update_record(client, args.record_id, args.rr, args.type, args.value, args.ttl) + elif args.action == "delete": + delete_record(client, args.record_id) + elif args.action == "enable": + set_record_status(client, args.record_id, "Enable") + elif args.action == "disable": + set_record_status(client, args.record_id, "Disable") + + +if __name__ == "__main__": + main() diff --git a/scripts/namecheap_batch.py b/scripts/namecheap_batch.py new file mode 100644 index 0000000..6c76a5b --- /dev/null +++ b/scripts/namecheap_batch.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 +""" +Namecheap DNS Batch Provisioner +Reads a JSON plan file and replaces all DNS records in one atomic operation. + +CRITICAL: Namecheap setHosts OVERWRITES all records. This script: + 1. Backs up current records before any changes + 2. Validates the plan for common errors + 3. Performs a dry-run by default + 4. Requires explicit --apply flag to make changes + +Usage: + python namecheap_batch.py --domain gogenex.com --plan plan.json # dry-run (default) + python namecheap_batch.py --domain gogenex.com --plan plan.json --apply # apply changes + python namecheap_batch.py --domain gogenex.com --plan plan.json --merge # merge with existing + python namecheap_batch.py --domain gogenex.com --diff plan.json # diff plan vs current + python namecheap_batch.py --domain gogenex.com --export current.json # export current state + +Plan JSON format: Same as aliyun-dns skill (see references/plan-schema.md) + +Environment Variables: + NAMECHEAP_API_USER - Namecheap username + NAMECHEAP_API_KEY - Namecheap API key + NAMECHEAP_CLIENT_IP - Whitelisted IPv4 address +""" + +import argparse +import ipaddress +import json +import os +import sys +import time +import urllib.parse +import urllib.request +import xml.etree.ElementTree as ET +from datetime import datetime + + +# ────────────────────────────────────────────── +# Configuration & Helpers +# ────────────────────────────────────────────── + +API_URL_PROD = "https://api.namecheap.com/xml.response" +API_URL_SANDBOX = "https://api.sandbox.namecheap.com/xml.response" + +PRIVATE_RANGES = [ + ipaddress.ip_network("10.0.0.0/8"), + ipaddress.ip_network("172.16.0.0/12"), + ipaddress.ip_network("192.168.0.0/16"), + ipaddress.ip_network("100.64.0.0/10"), +] + +# Namecheap supported record types via API +SUPPORTED_TYPES = {"A", "AAAA", "CNAME", "MX", "MXE", "TXT", "URL", "URL301", "FRAME", "NS"} + + +def is_private_ip(ip_str: str) -> bool: + try: + ip = ipaddress.ip_address(ip_str) + return any(ip in net for net in PRIVATE_RANGES) + except ValueError: + return False + + +def get_config(sandbox: bool = False): + api_user = os.environ.get("NAMECHEAP_API_USER") + api_key = os.environ.get("NAMECHEAP_API_KEY") + client_ip = os.environ.get("NAMECHEAP_CLIENT_IP") + + if not all([api_user, api_key, client_ip]): + print("ERROR: Set NAMECHEAP_API_USER, NAMECHEAP_API_KEY, and NAMECHEAP_CLIENT_IP env vars.") + sys.exit(1) + + return { + "api_user": api_user, + "api_key": api_key, + "client_ip": client_ip, + "base_url": API_URL_SANDBOX if sandbox else API_URL_PROD, + } + + +def split_domain(domain: str) -> tuple: + parts = domain.rsplit(".", 1) + if len(parts) != 2: + print(f"ERROR: Cannot split domain '{domain}'.") + sys.exit(1) + return parts[0], parts[1] + + +# ────────────────────────────────────────────── +# API Calls +# ────────────────────────────────────────────── + +def api_call(config: dict, command: str, extra_params: dict = None) -> ET.Element: + params = { + "ApiUser": config["api_user"], + "ApiKey": config["api_key"], + "UserName": config["api_user"], + "ClientIp": config["client_ip"], + "Command": command, + } + if extra_params: + params.update(extra_params) + + url = f"{config['base_url']}?{urllib.parse.urlencode(params)}" + + try: + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=30) as response: + xml_data = response.read().decode("utf-8") + except Exception as e: + print(f"ERROR: API call failed: {e}") + sys.exit(1) + + root = ET.fromstring(xml_data) + status = root.attrib.get("Status", "") + if status == "ERROR": + for elem in root.iter(): + if "Error" in elem.tag: + print(f"ERROR [{elem.attrib.get('Number', '?')}]: {elem.text}") + sys.exit(1) + + return root + + +def get_hosts(config: dict, domain: str) -> list: + sld, tld = split_domain(domain) + root = api_call(config, "namecheap.domains.dns.getHosts", {"SLD": sld, "TLD": tld}) + + records = [] + for elem in root.iter(): + if "Host" in elem.tag and elem.attrib.get("Name"): + records.append({ + "Name": elem.attrib.get("Name", ""), + "Type": elem.attrib.get("Type", ""), + "Address": elem.attrib.get("Address", ""), + "MXPref": elem.attrib.get("MXPref", "10"), + "TTL": elem.attrib.get("TTL", "1800"), + }) + + return records + + +def set_hosts(config: dict, domain: str, records: list) -> bool: + sld, tld = split_domain(domain) + params = {"SLD": sld, "TLD": tld} + + for i, rec in enumerate(records, 1): + params[f"HostName{i}"] = rec["Name"] + params[f"RecordType{i}"] = rec["Type"] + params[f"Address{i}"] = rec["Address"] + params[f"TTL{i}"] = rec.get("TTL", "1800") + if rec["Type"] in ("MX", "MXE"): + params[f"MXPref{i}"] = rec.get("MXPref", "10") + + api_call(config, "namecheap.domains.dns.setHosts", params) + return True + + +# ────────────────────────────────────────────── +# Plan Operations +# ────────────────────────────────────────────── + +def load_plan(plan_file: str) -> list: + """Load and normalize a plan JSON to Namecheap host format.""" + with open(plan_file, "r", encoding="utf-8") as f: + raw_plan = json.load(f) + + records = [] + for entry in raw_plan: + rr = entry.get("rr", entry.get("Name", "")) + record_type = entry.get("type", entry.get("Type", "")) + value = entry.get("value", entry.get("Address", "")) + ttl = str(entry.get("ttl", entry.get("TTL", 1800))) + + records.append({ + "Name": rr, + "Type": record_type, + "Address": value, + "TTL": ttl, + "MXPref": str(entry.get("MXPref", "10")), + "_note": entry.get("note", ""), + "_scope": entry.get("scope", "public"), + }) + + return records + + +def validate_plan(records: list, domain: str) -> list: + """Validate plan entries and return warnings.""" + warnings = [] + seen = set() + + for i, rec in enumerate(records): + name = rec.get("Name", "") + rtype = rec.get("Type", "") + value = rec.get("Address", "") + scope = rec.get("_scope", "public") + + # Required fields + if not name or not rtype or not value: + warnings.append(f"❌ Entry {i}: missing Name, Type, or Address") + continue + + # Placeholder check + if "<" in value or ">" in value: + warnings.append(f"❌ Entry {i} ({name}): value contains placeholder: {value}") + + # Unsupported type + if rtype.upper() == "SRV": + warnings.append(f"❌ Entry {i} ({name}): SRV records not supported by Namecheap API") + + if rtype not in SUPPORTED_TYPES: + warnings.append(f"⚠️ Entry {i} ({name}): record type '{rtype}' may not be supported") + + # Duplicate + key = (name, rtype) + if key in seen and rtype not in ("MX", "TXT", "NS"): + warnings.append(f"⚠️ Entry {i}: duplicate {name} {rtype}") + seen.add(key) + + # Scope vs IP check + if scope == "private" and rtype == "A" and not is_private_ip(value): + warnings.append(f"🔒 Entry {i} ({name}): marked private but has public IP {value}") + + return warnings + + +def backup_records(records: list, domain: str) -> str: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = f"backup_{domain}_{timestamp}.json" + with open(backup_file, "w", encoding="utf-8") as f: + json.dump(records, f, indent=2, ensure_ascii=False) + return backup_file + + +def apply_plan(config: dict, domain: str, plan_records: list, merge: bool = False, dry_run: bool = True): + """Apply a DNS plan to Namecheap.""" + mode = "DRY RUN" if dry_run else "APPLY" + strategy = "MERGE" if merge else "REPLACE" + + print(f"\n{'🔍' if dry_run else '🚀'} {mode} ({strategy}) for {domain}") + print(f" Records in plan: {len(plan_records)}\n") + + # Validate + warnings = validate_plan(plan_records, domain) + if warnings: + print("⚠️ Validation warnings:") + for w in warnings: + print(f" {w}") + print() + if any(w.startswith("❌") for w in warnings): + print("❌ Fix errors before applying.") + return + + # Fetch current records + print("📡 Fetching current records...") + current = get_hosts(config, domain) + print(f" Found {len(current)} existing records\n") + + # Build final record set + if merge: + # Merge: keep existing records not in plan, add/update from plan + plan_keys = {(r["Name"], r["Type"]) for r in plan_records} + # Keep records not covered by plan + final = [r for r in current if (r["Name"], r["Type"]) not in plan_keys] + # Add all plan records + final.extend([{ + "Name": r["Name"], + "Type": r["Type"], + "Address": r["Address"], + "TTL": r.get("TTL", "1800"), + "MXPref": r.get("MXPref", "10"), + } for r in plan_records]) + print(f" 📋 Merge result: {len(final)} total records") + print(f" Kept from existing: {len(final) - len(plan_records)}") + print(f" From plan: {len(plan_records)}") + else: + # Replace: plan becomes the entire record set + final = [{ + "Name": r["Name"], + "Type": r["Type"], + "Address": r["Address"], + "TTL": r.get("TTL", "1800"), + "MXPref": r.get("MXPref", "10"), + } for r in plan_records] + print(f" ⚠️ REPLACE mode: {len(current)} existing records will be replaced with {len(final)} plan records") + + # Show changes + print(f"\n Final record set:") + for r in final: + fqdn = f"{r['Name']}.{domain}" if r["Name"] != "@" else domain + print(f" {fqdn:<35} {r['Type']:<8} {r['Address']}") + + if dry_run: + print(f"\n💡 Run with --apply to execute these changes.") + return + + # Backup before write + backup_file = backup_records(current, domain) + print(f"\n 💾 Backup saved: {backup_file}") + + # Write + print(f" 📤 Writing {len(final)} records...") + set_hosts(config, domain, final) + print(f" ✅ Done! All records updated successfully.") + print(f" 💡 To rollback, use: python namecheap_batch.py --domain {domain} --plan {backup_file} --apply") + + +def diff_plan(config: dict, domain: str, plan_file: str): + """Show differences between plan and current state.""" + plan_records = load_plan(plan_file) + current = get_hosts(config, domain) + + print(f"\n🔍 Diff: {plan_file} vs live records for {domain}\n") + + current_map = {} + for r in current: + key = (r["Name"], r["Type"]) + current_map[key] = r + + plan_map = {} + for r in plan_records: + key = (r["Name"], r["Type"]) + plan_map[key] = r + + # Records in plan + for key, rec in sorted(plan_map.items()): + fqdn = f"{key[0]}.{domain}" if key[0] != "@" else domain + if key in current_map: + curr = current_map[key] + if curr["Address"] != rec["Address"]: + print(f" ✏️ CHANGE {fqdn:<35} {key[1]:<6} {curr['Address']} -> {rec['Address']}") + elif curr.get("TTL", "1800") != rec.get("TTL", "1800"): + print(f" ⏱️ TTL {fqdn:<35} {key[1]:<6} TTL {curr['TTL']} -> {rec.get('TTL', '1800')}") + else: + print(f" ✅ OK {fqdn:<35} {key[1]:<6} {rec['Address']}") + else: + print(f" ➕ NEW {fqdn:<35} {key[1]:<6} {rec['Address']}") + + # Records only in current (would be deleted in REPLACE mode) + for key, rec in sorted(current_map.items()): + if key not in plan_map: + fqdn = f"{key[0]}.{domain}" if key[0] != "@" else domain + print(f" 🗑️ REMOVE {fqdn:<35} {key[1]:<6} {rec['Address']} (not in plan — will be deleted in REPLACE mode)") + + +def export_current(config: dict, domain: str, output_file: str): + """Export current records to plan JSON format.""" + records = get_hosts(config, domain) + plan = [] + for r in records: + plan.append({ + "rr": r["Name"], + "type": r["Type"], + "value": r["Address"], + "ttl": int(r.get("TTL", 1800)), + "scope": "private" if (r["Type"] == "A" and is_private_ip(r["Address"])) else "public", + "note": "", + }) + + with open(output_file, "w", encoding="utf-8") as f: + json.dump(plan, f, indent=2, ensure_ascii=False) + print(f"✅ Exported {len(plan)} records to {output_file}") + + +# ────────────────────────────────────────────── +# CLI +# ────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Namecheap DNS Batch Provisioner (SAFE: dry-run by default)", + epilog="⚠️ REMEMBER: Namecheap setHosts OVERWRITES all records. Use --merge to keep existing records." + ) + parser.add_argument("--domain", required=True, help="Domain name (e.g., gogenex.com)") + parser.add_argument("--sandbox", action="store_true", help="Use sandbox API") + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--plan", help="JSON plan file to apply") + group.add_argument("--diff", help="Diff plan vs current state") + group.add_argument("--export", help="Export current records to JSON") + + parser.add_argument("--apply", action="store_true", help="Actually apply changes (default is dry-run)") + parser.add_argument("--merge", action="store_true", + help="Merge plan with existing records instead of replacing") + + args = parser.parse_args() + config = get_config(sandbox=args.sandbox) + + if args.plan: + plan_records = load_plan(args.plan) + apply_plan(config, args.domain, plan_records, merge=args.merge, dry_run=not args.apply) + elif args.diff: + diff_plan(config, args.domain, args.diff) + elif args.export: + export_current(config, args.domain, args.export) + + +if __name__ == "__main__": + main() diff --git a/scripts/namecheap_manager.py b/scripts/namecheap_manager.py new file mode 100644 index 0000000..82669f2 --- /dev/null +++ b/scripts/namecheap_manager.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +""" +Namecheap DNS Record Manager (Safe Read-Merge-Write Pattern) + +CRITICAL: Namecheap's setHosts API OVERWRITES all records. +This script always reads existing records first, merges changes, then writes back. + +Usage: + python namecheap_manager.py list --domain example.com + python namecheap_manager.py add --domain example.com --name api --type A --value 1.2.3.4 [--ttl 1800] + python namecheap_manager.py update --domain example.com --name api --type A --value 5.6.7.8 + python namecheap_manager.py delete --domain example.com --name api --type A + python namecheap_manager.py set-ns --domain example.com --nameservers ns1.cf.com ns2.cf.com + python namecheap_manager.py export --domain example.com --output records.json + +Environment Variables: + NAMECHEAP_API_USER - Namecheap username + NAMECHEAP_API_KEY - Namecheap API key + NAMECHEAP_CLIENT_IP - Whitelisted IPv4 address +""" + +import argparse +import json +import os +import sys +import xml.etree.ElementTree as ET +from datetime import datetime + +try: + import urllib.request + import urllib.parse +except ImportError: + pass + + +# ────────────────────────────────────────────── +# Configuration +# ────────────────────────────────────────────── + +API_URL_PROD = "https://api.namecheap.com/xml.response" +API_URL_SANDBOX = "https://api.sandbox.namecheap.com/xml.response" +NS = {"nc": "http://api.namecheap.com/xml.response"} +NS_HTTPS = {"nc": "https://api.namecheap.com/xml.response"} + + +def get_config(sandbox: bool = False): + api_user = os.environ.get("NAMECHEAP_API_USER") + api_key = os.environ.get("NAMECHEAP_API_KEY") + client_ip = os.environ.get("NAMECHEAP_CLIENT_IP") + + if not all([api_user, api_key, client_ip]): + print("ERROR: Set NAMECHEAP_API_USER, NAMECHEAP_API_KEY, and NAMECHEAP_CLIENT_IP env vars.") + sys.exit(1) + + return { + "api_user": api_user, + "api_key": api_key, + "client_ip": client_ip, + "base_url": API_URL_SANDBOX if sandbox else API_URL_PROD, + } + + +def split_domain(domain: str) -> tuple: + """Split domain into SLD and TLD. e.g., 'gogenex.com' -> ('gogenex', 'com')""" + parts = domain.rsplit(".", 1) + if len(parts) != 2: + print(f"ERROR: Cannot split domain '{domain}'. Expected format: 'example.com'") + sys.exit(1) + return parts[0], parts[1] + + +# ────────────────────────────────────────────── +# API Calls +# ────────────────────────────────────────────── + +def api_call(config: dict, command: str, extra_params: dict = None) -> ET.Element: + """Make a Namecheap API call and return the parsed XML root.""" + params = { + "ApiUser": config["api_user"], + "ApiKey": config["api_key"], + "UserName": config["api_user"], + "ClientIp": config["client_ip"], + "Command": command, + } + if extra_params: + params.update(extra_params) + + url = f"{config['base_url']}?{urllib.parse.urlencode(params)}" + + try: + req = urllib.request.Request(url) + with urllib.request.urlopen(req, timeout=30) as response: + xml_data = response.read().decode("utf-8") + except Exception as e: + print(f"ERROR: API call failed: {e}") + sys.exit(1) + + root = ET.fromstring(xml_data) + + # Check for errors (handle both http and https namespace) + status = root.attrib.get("Status", "") + if status == "ERROR": + # Try to find error message + for ns_dict in [NS, NS_HTTPS]: + for ns_prefix, ns_uri in ns_dict.items(): + errors = root.findall(f".//{{{ns_uri}}}Error") + for err in errors: + print(f"ERROR [{err.attrib.get('Number', '?')}]: {err.text}") + sys.exit(1) + + return root + + +def get_hosts(config: dict, domain: str) -> list: + """Fetch all DNS host records for a domain.""" + sld, tld = split_domain(domain) + root = api_call(config, "namecheap.domains.dns.getHosts", {"SLD": sld, "TLD": tld}) + + records = [] + # Search in both namespace variants, both cases (Host and host) + for ns_uri in [NS["nc"], NS_HTTPS["nc"]]: + for tag in ["Host", "host"]: + hosts = root.findall(f".//{{{ns_uri}}}{tag}") + for host in hosts: + records.append({ + "HostId": host.attrib.get("HostId", ""), + "Name": host.attrib.get("Name", ""), + "Type": host.attrib.get("Type", ""), + "Address": host.attrib.get("Address", ""), + "MXPref": host.attrib.get("MXPref", "10"), + "TTL": host.attrib.get("TTL", "1800"), + }) + if records: + break + if records: + break + + # Fallback: try without namespace, both cases + if not records: + for tag in ["Host", "host"]: + for host in root.iter(tag): + records.append({ + "HostId": host.attrib.get("HostId", ""), + "Name": host.attrib.get("Name", ""), + "Type": host.attrib.get("Type", ""), + "Address": host.attrib.get("Address", ""), + "MXPref": host.attrib.get("MXPref", "10"), + "TTL": host.attrib.get("TTL", "1800"), + }) + if records: + break + + return records + + +def set_hosts(config: dict, domain: str, records: list) -> bool: + """Set ALL DNS host records for a domain. This OVERWRITES everything.""" + sld, tld = split_domain(domain) + params = {"SLD": sld, "TLD": tld} + + for i, rec in enumerate(records, 1): + params[f"HostName{i}"] = rec["Name"] + params[f"RecordType{i}"] = rec["Type"] + params[f"Address{i}"] = rec["Address"] + params[f"TTL{i}"] = rec.get("TTL", "1800") + if rec["Type"] in ("MX", "MXE"): + params[f"MXPref{i}"] = rec.get("MXPref", "10") + + root = api_call(config, "namecheap.domains.dns.setHosts", params) + return True + + +def set_nameservers(config: dict, domain: str, nameservers: list): + """Set custom nameservers for a domain.""" + sld, tld = split_domain(domain) + params = { + "SLD": sld, + "TLD": tld, + "Nameservers": ",".join(nameservers), + } + api_call(config, "namecheap.domains.dns.setCustom", params) + return True + + +# ────────────────────────────────────────────── +# Safe Operations (Read-Merge-Write) +# ────────────────────────────────────────────── + +def backup_records(records: list, domain: str): + """Save a backup of current records before modification.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = f"backup_{domain}_{timestamp}.json" + with open(backup_file, "w", encoding="utf-8") as f: + json.dump(records, f, indent=2) + print(f" 💾 Backup saved: {backup_file}") + return backup_file + + +def safe_add(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = "1800"): + """Safely add a record by reading existing, appending, and writing back.""" + print(f"\n📡 Reading current records for {domain}...") + current = get_hosts(config, domain) + print(f" Found {len(current)} existing records") + + # Check for duplicate + for r in current: + if r["Name"] == name and r["Type"] == record_type and r["Address"] == value: + print(f" ⚠️ Record already exists: {name} {record_type} {value}") + return + + # Backup before modification + backup_records(current, domain) + + # Append new record + new_record = { + "Name": name, + "Type": record_type, + "Address": value, + "TTL": ttl, + "MXPref": "10", + } + updated = current + [new_record] + + fqdn = f"{name}.{domain}" if name != "@" else domain + print(f" ➕ Adding: {fqdn} -> {record_type} {value}") + print(f" 📤 Writing {len(updated)} records back...") + + set_hosts(config, domain, updated) + print(f" ✅ Done! Record added successfully.") + + +def safe_update(config: dict, domain: str, name: str, record_type: str, value: str, ttl: str = None): + """Safely update a record by reading, modifying, and writing back.""" + print(f"\n📡 Reading current records for {domain}...") + current = get_hosts(config, domain) + + backup_records(current, domain) + + found = False + for r in current: + if r["Name"] == name and r["Type"] == record_type: + old_value = r["Address"] + r["Address"] = value + if ttl: + r["TTL"] = ttl + found = True + fqdn = f"{name}.{domain}" if name != "@" else domain + print(f" ✏️ Updating: {fqdn} {record_type} {old_value} -> {value}") + break + + if not found: + print(f" ❌ Record not found: {name} {record_type}") + return + + print(f" 📤 Writing {len(current)} records back...") + set_hosts(config, domain, current) + print(f" ✅ Done! Record updated successfully.") + + +def safe_delete(config: dict, domain: str, name: str, record_type: str): + """Safely delete a record by reading, removing, and writing back.""" + print(f"\n📡 Reading current records for {domain}...") + current = get_hosts(config, domain) + + backup_records(current, domain) + + original_count = len(current) + updated = [r for r in current if not (r["Name"] == name and r["Type"] == record_type)] + + if len(updated) == original_count: + print(f" ❌ Record not found: {name} {record_type}") + return + + fqdn = f"{name}.{domain}" if name != "@" else domain + print(f" 🗑️ Deleting: {fqdn} {record_type}") + print(f" 📤 Writing {len(updated)} records back (was {original_count})...") + + set_hosts(config, domain, updated) + print(f" ✅ Done! Record deleted successfully.") + + +# ────────────────────────────────────────────── +# Display & Export +# ────────────────────────────────────────────── + +def display_records(records: list, domain: str): + """Pretty-print DNS records.""" + print(f"\n📋 DNS Records for {domain} (Total: {len(records)})\n") + print(f"{'HostId':<10} {'Name':<30} {'Type':<8} {'Address':<40} {'TTL'}") + print("-" * 100) + + for r in records: + fqdn = f"{r['Name']}.{domain}" if r["Name"] != "@" else domain + print(f"{r['HostId']:<10} {fqdn:<30} {r['Type']:<8} {r['Address']:<40} {r['TTL']}") + + +def export_records(records: list, domain: str, output_file: str): + """Export records to JSON plan format.""" + plan = [] + for r in records: + plan.append({ + "rr": r["Name"], + "type": r["Type"], + "value": r["Address"], + "ttl": int(r.get("TTL", 1800)), + "note": "", + }) + + with open(output_file, "w", encoding="utf-8") as f: + json.dump(plan, f, indent=2, ensure_ascii=False) + + print(f"✅ Exported {len(plan)} records to {output_file}") + + +# ────────────────────────────────────────────── +# CLI +# ────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Namecheap DNS Record Manager (Safe Read-Merge-Write)") + parser.add_argument("--sandbox", action="store_true", help="Use sandbox API endpoint") + subparsers = parser.add_subparsers(dest="action", help="Action to perform") + + # List + list_p = subparsers.add_parser("list", help="List all DNS records") + list_p.add_argument("--domain", required=True) + + # Add + add_p = subparsers.add_parser("add", help="Add a DNS record (safe merge)") + add_p.add_argument("--domain", required=True) + add_p.add_argument("--name", required=True, help="Hostname (e.g., api, www, @)") + add_p.add_argument("--type", required=True, help="Record type (A, AAAA, CNAME, MX, TXT, etc.)") + add_p.add_argument("--value", required=True, help="Record value") + add_p.add_argument("--ttl", default="1800", help="TTL in seconds (default: 1800)") + + # Update + update_p = subparsers.add_parser("update", help="Update a DNS record") + update_p.add_argument("--domain", required=True) + update_p.add_argument("--name", required=True) + update_p.add_argument("--type", required=True) + update_p.add_argument("--value", required=True) + update_p.add_argument("--ttl", help="New TTL (optional)") + + # Delete + delete_p = subparsers.add_parser("delete", help="Delete a DNS record") + delete_p.add_argument("--domain", required=True) + delete_p.add_argument("--name", required=True) + delete_p.add_argument("--type", required=True) + + # Set nameservers + ns_p = subparsers.add_parser("set-ns", help="Set custom nameservers") + ns_p.add_argument("--domain", required=True) + ns_p.add_argument("--nameservers", nargs="+", required=True, help="Nameserver domains") + + # Export + export_p = subparsers.add_parser("export", help="Export records to JSON") + export_p.add_argument("--domain", required=True) + export_p.add_argument("--output", required=True, help="Output JSON file path") + + args = parser.parse_args() + + if not args.action: + parser.print_help() + sys.exit(1) + + config = get_config(sandbox=args.sandbox) + + if args.action == "list": + records = get_hosts(config, args.domain) + display_records(records, args.domain) + + elif args.action == "add": + safe_add(config, args.domain, args.name, args.type, args.value, args.ttl) + + elif args.action == "update": + safe_update(config, args.domain, args.name, args.type, args.value, args.ttl) + + elif args.action == "delete": + safe_delete(config, args.domain, args.name, args.type) + + elif args.action == "set-ns": + print(f"\n🔀 Setting nameservers for {args.domain}: {', '.join(args.nameservers)}") + set_nameservers(config, args.domain, args.nameservers) + print(f"✅ Nameservers updated. Note: URL/Email forwarding and DDNS will stop working.") + + elif args.action == "export": + records = get_hosts(config, args.domain) + export_records(records, args.domain, args.output) + + +if __name__ == "__main__": + main()