#!/usr/bin/env python3 """ Idempotently create Pangolin public HTTP resources + Traefik targets via the Integration API. Docs: https://docs.pangolin.net/manage/integration-api Walkthrough: https://docs.pangolin.net/manage/common-api-routes Environment (or --env-file) can set: NOBLE_PANGOLIN_API_BASE, NOBLE_PANGOLIN_ORG_ID, NOBLE_PANGOLIN_API_TOKEN, NOBLE_PANGOLIN_SITE_ID, NOBLE_PANGOLIN_TRAEFIK_IP, NOBLE_PANGOLIN_TRAEFIK_PORT (default 443) CLI overrides env. FQDNs: --fqdns a.example.com,b.example.com (required). """ from __future__ import annotations import argparse import json import os import sys import urllib.error import urllib.request from typing import Any def load_dotenv(path: str) -> dict[str, str]: out: dict[str, str] = {} with open(path, encoding="utf-8") as f: for raw in f: line = raw.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, v = line.split("=", 1) k = k.strip() v = v.strip().strip('"').strip("'") if k: out[k] = v return out def api_request( method: str, url: str, token: str, body: dict[str, Any] | None = None, ) -> dict[str, Any]: data = None if body is None else json.dumps(body).encode("utf-8") req = urllib.request.Request( url, data=data, method=method, headers={ "Authorization": f"Bearer {token}", "Accept": "application/json", **({"Content-Type": "application/json"} if body is not None else {}), }, ) try: with urllib.request.urlopen(req, timeout=120) as resp: payload = resp.read().decode("utf-8") except urllib.error.HTTPError as e: detail = e.read().decode("utf-8", errors="replace") raise SystemExit(f"HTTP {e.code} {method} {url}\n{detail}") from e if not payload: return {} return json.loads(payload) def unwrap(resp: dict[str, Any]) -> Any: if resp.get("error") or resp.get("success") is False: raise SystemExit(f"Pangolin API error: {resp!r}") return resp.get("data") def normalize_fqdn(s: str) -> str: return s.lower().strip().rstrip(".") def resolve_domain( fqdn: str, domains: list[dict[str, Any]] ) -> tuple[str, str | None]: """Return (domainId, subdomain) for an HTTP resource. subdomain is None for apex / cname-only domains.""" fqdn = normalize_fqdn(fqdn) best: tuple[str, str | None, int] | None = None for d in domains: bd = (d.get("baseDomain") or "").strip() if not bd: continue bd_n = normalize_fqdn(bd) if fqdn == bd_n: cand = (str(d["domainId"]), None, len(bd_n)) elif fqdn.endswith("." + bd_n): sub = fqdn[: -(len(bd_n) + 1)] if not sub: continue cand = (str(d["domainId"]), sub, len(bd_n)) else: continue if best is None or cand[2] > best[2]: best = cand if best is None: avail = ", ".join(sorted({normalize_fqdn(d.get("baseDomain") or "") for d in domains if d.get("baseDomain")})) raise SystemExit( f"No linked Pangolin domain matches FQDN {fqdn!r}. " f"Link the parent domain in Pangolin first. Known baseDomain values: {avail or '(none)'}" ) return best[0], best[1] def list_all_resources(api_base: str, org_id: str, token: str) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] page = 1 page_size = 100 while True: url = f"{api_base.rstrip('/')}/org/{org_id}/resources?page={page}&pageSize={page_size}" data = unwrap(api_request("GET", url, token)) if isinstance(data, list): out.extend(data) break if not isinstance(data, dict): break batch = data.get("resources") or data.get("items") or [] if not isinstance(batch, list): break out.extend(batch) pag = data.get("pagination") or {} total = pag.get("total") if isinstance(total, int) and len(out) >= total: break if len(batch) < page_size: break page += 1 if page > 500: raise SystemExit("Pagination safety stop (>500 pages)") return out def resource_public_fqdn(res: dict[str, Any]) -> str | None: fd = res.get("fullDomain") if isinstance(fd, str) and fd.strip(): return normalize_fqdn(fd) sub = res.get("subdomain") dom = res.get("domain") or {} bd = dom.get("baseDomain") if isinstance(dom, dict) else None if isinstance(bd, str) and bd.strip(): bd_n = normalize_fqdn(bd) if sub is None or sub == "": return bd_n if isinstance(sub, str): return normalize_fqdn(f"{sub}.{bd_n}") return None def find_resource_for_fqdn(resources: list[dict[str, Any]], fqdn: str) -> dict[str, Any] | None: target = normalize_fqdn(fqdn) for r in resources: if not r.get("http"): continue got = resource_public_fqdn(r) if got == target: return r return None def list_targets(api_base: str, resource_id: int, token: str) -> list[dict[str, Any]]: url = f"{api_base.rstrip('/')}/resource/{resource_id}/targets" data = unwrap(api_request("GET", url, token)) if isinstance(data, list): return data if isinstance(data, dict): return data.get("targets") or data.get("items") or [] return [] def target_matches(t: dict[str, Any], site_id: int, ip: str, port: int) -> bool: return ( int(t.get("siteId") or 0) == site_id and str(t.get("ip") or "") == ip and int(t.get("port") or 0) == port ) def main() -> None: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--env-file", help="Parse KEY=value lines (optional overrides for env)") ap.add_argument("--api-base", default=os.environ.get("NOBLE_PANGOLIN_API_BASE", "")) ap.add_argument("--org-id", default=os.environ.get("NOBLE_PANGOLIN_ORG_ID", "")) ap.add_argument("--token", default=os.environ.get("NOBLE_PANGOLIN_API_TOKEN", "")) ap.add_argument("--site-id", default=os.environ.get("NOBLE_PANGOLIN_SITE_ID", "")) ap.add_argument("--traefik-ip", default=os.environ.get("NOBLE_PANGOLIN_TRAEFIK_IP", "")) ap.add_argument( "--traefik-port", type=int, default=int(os.environ.get("NOBLE_PANGOLIN_TRAEFIK_PORT", "443") or 443), ) ap.add_argument( "--fqdns", required=True, help="Comma-separated public FQDNs (must match linked Pangolin domains / SANs)", ) ap.add_argument( "--dry-run", action="store_true", help="Print actions only; do not call mutating endpoints", ) args = ap.parse_args() if args.env_file: for k, v in load_dotenv(args.env_file).items(): if k.startswith("NOBLE_PANGOLIN_"): os.environ.setdefault(k, v) args.api_base = args.api_base or os.environ.get("NOBLE_PANGOLIN_API_BASE", "") args.org_id = args.org_id or os.environ.get("NOBLE_PANGOLIN_ORG_ID", "") args.token = args.token or os.environ.get("NOBLE_PANGOLIN_API_TOKEN", "") args.site_id = args.site_id or os.environ.get("NOBLE_PANGOLIN_SITE_ID", "") args.traefik_ip = args.traefik_ip or os.environ.get("NOBLE_PANGOLIN_TRAEFIK_IP", "") missing = [ n for n, v in [ ("NOBLE_PANGOLIN_API_BASE", args.api_base), ("NOBLE_PANGOLIN_ORG_ID", args.org_id), ("NOBLE_PANGOLIN_API_TOKEN", args.token), ("NOBLE_PANGOLIN_SITE_ID", args.site_id), ("NOBLE_PANGOLIN_TRAEFIK_IP", args.traefik_ip), ] if not str(v).strip() ] if missing: raise SystemExit(f"Missing required settings: {', '.join(missing)}") fqdns = [normalize_fqdn(x) for x in args.fqdns.split(",") if x.strip()] if not fqdns: raise SystemExit("No FQDNs in --fqdns") api_base = str(args.api_base).rstrip("/") org_id = str(args.org_id).strip() token = str(args.token).strip() site_id = int(str(args.site_id).strip()) traefik_ip = str(args.traefik_ip).strip() traefik_port = int(args.traefik_port) dom_url = f"{api_base}/org/{org_id}/domains" domains_raw = unwrap(api_request("GET", dom_url, token)) domains: list[dict[str, Any]] = [] if isinstance(domains_raw, list): domains = domains_raw elif isinstance(domains_raw, dict): domains = domains_raw.get("domains") or [] if not isinstance(domains, list): raise SystemExit(f"Unexpected domains response: {domains_raw!r}") resources = list_all_resources(api_base, org_id, token) for fqdn in fqdns: domain_id, subdomain = resolve_domain(fqdn, domains) res = find_resource_for_fqdn(resources, fqdn) if res is None: body = { "name": f"noble {fqdn}", "http": True, "protocol": "tcp", "domainId": domain_id, **({"subdomain": subdomain} if subdomain is not None else {"subdomain": None}), } print(f"[create] resource {fqdn!r} domainId={domain_id} subdomain={subdomain!r}") if args.dry_run: continue created = unwrap( api_request( "PUT", f"{api_base}/org/{org_id}/resource", token, body, ) ) rid = int(str(created.get("resourceId", "")).strip() or 0) if not rid: raise SystemExit(f"Create resource response missing resourceId: {created!r}") print(f" -> resourceId={rid}") resources.append( { "resourceId": rid, "http": True, "fullDomain": fqdn, "subdomain": subdomain, } ) res = find_resource_for_fqdn(resources, fqdn) else: rid = int(str(res.get("resourceId", "")).strip() or 0) if not rid: raise SystemExit(f"Resource missing resourceId: {res!r}") print(f"[exists] resource {fqdn!r} resourceId={rid}") if res is None: raise SystemExit("internal: resource missing after create") rid = int(str(res.get("resourceId", "")).strip() or 0) if not rid: raise SystemExit(f"Resource missing resourceId: {res!r}") targets = list_targets(api_base, rid, token) if any(target_matches(t, site_id, traefik_ip, traefik_port) for t in targets): print(f" -> target OK site={site_id} {traefik_ip}:{traefik_port}") continue tbody = { "siteId": site_id, "ip": traefik_ip, "port": traefik_port, "method": "http", } print(f"[target] PUT /resource/{rid}/target {tbody}") if args.dry_run: continue api_request( "PUT", f"{api_base}/resource/{rid}/target", token, tbody, ) print(" -> target created") print("Done.") if __name__ == "__main__": try: main() except KeyboardInterrupt: sys.exit(130)