#!/usr/bin/env python3
"""
Idempotently create Pangolin public HTTP resources + Traefik targets via the Integration API.

Docs: https://docs.pangolin.net/manage/integration-api
Walkthrough: https://docs.pangolin.net/manage/common-api-routes

Environment (or --env-file) can set:
  NOBLE_PANGOLIN_API_BASE, NOBLE_PANGOLIN_ORG_ID,
  NOBLE_PANGOLIN_API_TOKEN — must be **apiKeyId.apiKeySecret** (one string, dot in the middle),
    OR set **NOBLE_PANGOLIN_API_KEY_ID** and put only the **secret** in **NOBLE_PANGOLIN_API_TOKEN**,
  NOBLE_PANGOLIN_SITE_ID (numeric siteId **or** Pangolin site **niceId**, e.g. unruly-asian-badger),
  NOBLE_PANGOLIN_TRAEFIK_IP, NOBLE_PANGOLIN_TRAEFIK_PORT (default 443)

Optional TLS: NOBLE_PANGOLIN_CA_BUNDLE (path to PEM) or NOBLE_PANGOLIN_INSECURE_SKIP_TLS_VERIFY=true
(homelab self-signed Integration API only — insecure).

CLI overrides env. Precedence: explicit non-empty CLI flag, then --env-file, then the process environment.
FQDNs: --fqdns a.example.com,b.example.com (required).
"""
from __future__ import annotations

import argparse
import json
import os
import ssl
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any


def load_dotenv(path: str) -> dict[str, str]:
    """Parse ``KEY=value`` lines from *path* into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped. A leading
    ``export `` (shell-sourceable .env style) is accepted. Surrounding whitespace and
    surrounding single/double quotes are stripped from values. Raises ``OSError`` if
    the file cannot be opened.
    """
    out: dict[str, str] = {}
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if "=" not in line:
                continue
            k, v = line.split("=", 1)
            k = k.strip()
            v = v.strip().strip('"').strip("'")
            if k:
                out[k] = v
    return out


def env_truthy(raw: str | None) -> bool:
    """Return True when *raw* is one of ``1``, ``true``, ``yes``, ``on`` (case-insensitive)."""
    if raw is None:
        return False
    return raw.strip().lower() in ("1", "true", "yes", "on")


def normalize_api_token(raw: str) -> str:
    """Strip whitespace and a single accidental ``Bearer `` prefix (`.env` / copy-paste)."""
    t = str(raw).strip()
    if t.lower().startswith("bearer "):
        return t[7:].strip()
    return t


def pangolin_bearer_credential(token: str, key_id: str) -> str:
    """
    Pangolin's Integration API expects ``Authorization: Bearer {apiKeyId}.{apiKeySecret}``
    (see Pangolin ``verifyApiKey`` — the part after ``Bearer`` is split on the first ``.``).

    A token that already contains a dot is returned as-is; otherwise *key_id* (when
    non-empty) is prepended as ``key_id.token``.
    """
    t = normalize_api_token(token)
    kid = (key_id or "").strip()
    if "." in t:
        return t
    if kid:
        return f"{kid}.{t}"
    return t


def tls_ssl_context(ca_bundle: str, insecure: bool) -> ssl.SSLContext | None:
    """Return custom SSL context, or None for default certificate verification.

    A CA bundle path takes priority over *insecure*. Exits if the bundle path is not a file.
    """
    path = (ca_bundle or "").strip()
    if path:
        if not os.path.isfile(path):
            raise SystemExit(f"NOBLE_PANGOLIN_CA_BUNDLE is not a readable file: {path!r}")
        return ssl.create_default_context(cafile=path)
    if insecure:
        ctx = ssl.create_default_context()
        # check_hostname must be disabled before verify_mode can be set to CERT_NONE.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        return ctx
    return None


def parse_json_body(text: str) -> Any:
    """Parse an HTTP body: ``{}`` when blank, ``{"__raw": first 2000 chars}`` when not JSON."""
    if not text.strip():
        return {}
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"__raw": text[:2000]}


def http_exchange(
    method: str,
    url: str,
    token: str,
    body: dict[str, Any] | None = None,
    ssl_context: ssl.SSLContext | None = None,
) -> tuple[int, Any]:
    """HTTP request; returns (status_code, parsed JSON body or dict with __raw on parse error).

    HTTP error statuses are returned, not raised. Transport failures (DNS, TLS,
    connection, timeout) terminate the script with ``SystemExit``.
    """
    data_bytes = None if body is None else json.dumps(body).encode("utf-8")
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    if body is not None:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=data_bytes, method=method, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=120, context=ssl_context) as resp:
            # Decode leniently, like the error path: a stray byte must not crash the run.
            payload = resp.read().decode("utf-8", errors="replace")
            code = int(getattr(resp, "status", None) or resp.getcode() or 200)
    except urllib.error.HTTPError as e:
        return int(e.code), parse_json_body(e.read().decode("utf-8", errors="replace"))
    except urllib.error.URLError as e:
        reason = str(e.reason) if getattr(e, "reason", None) is not None else str(e)
        if "CERTIFICATE_VERIFY_FAILED" in reason or "certificate verify failed" in reason.lower():
            raise SystemExit(
                f"{e}\n"
                "TLS verification failed. For a self-signed Integration API, set in .env:\n"
                " NOBLE_PANGOLIN_INSECURE_SKIP_TLS_VERIFY=true\n"
                "or set NOBLE_PANGOLIN_CA_BUNDLE to a PEM file that trusts that API (preferred)."
            ) from e
        raise SystemExit(f"Request failed: {e}") from e
    except OSError as e:
        # Socket-level errors raised while reading the body (e.g. a read timeout)
        # are not wrapped in URLError by urllib.
        raise SystemExit(f"Request failed: {e}") from e
    return code, parse_json_body(payload)


def api_request(
    method: str,
    url: str,
    token: str,
    body: dict[str, Any] | None = None,
    ssl_context: ssl.SSLContext | None = None,
) -> Any:
    """Call :func:`http_exchange` and exit with a descriptive message on HTTP status >= 400.

    Returns the parsed response body otherwise. 401/403 responses get an extra
    hint about credential format and the Integration API base URL.
    """
    code, parsed = http_exchange(method, url, token, body, ssl_context)
    if code >= 400:
        detail = json.dumps(parsed) if isinstance(parsed, (dict, list)) else str(parsed)
        hint = ""
        if code in (401, 403):
            hint = (
                "\n\nPangolin expects **Authorization: Bearer {apiKeyId}.{apiKeySecret}** (id, dot, secret "
                "from **Organization → API keys** when the key is created). Put **`id.secret`** in "
                "**`NOBLE_PANGOLIN_API_TOKEN`**, or set **`NOBLE_PANGOLIN_API_KEY_ID`** and only the secret in "
                "**`NOBLE_PANGOLIN_API_TOKEN`**. A browser tab uses **session cookies**, not this header.\n\n"
                "Also check **NOBLE_PANGOLIN_API_BASE**: this must be the **Integration API** hostname "
                "(e.g. `https://api.example.com/v1`), NOT the main Pangolin UI host "
                "(`https://pangolin.example.com/api/v1` is the session-based external API and always "
                "returns 401 to Bearer tokens). The Integration API runs on a **separate port** (default 3003) "
                "and needs its own Traefik-exposed hostname. See `flags.enable_integration_api` in Pangolin `config.yml`."
            )
        raise SystemExit(f"HTTP {code} {method} {url}\n{detail}{hint}")
    return parsed


def unwrap(resp: Any) -> Any:
    """Return the ``data`` member of a response envelope.

    Exits when the envelope reports an error (``error`` truthy or ``success`` is
    False), when the response is not a JSON object, or when it is the ``__raw``
    placeholder produced for a non-JSON body.
    """
    if not isinstance(resp, dict):
        raise SystemExit(f"Pangolin API error: unexpected response (not a JSON object): {resp!r}")
    if "__raw" in resp:
        # A successful status with a non-JSON body usually means the base URL
        # points at something other than the Integration API.
        raise SystemExit(
            f"Pangolin API error: response was not JSON — check NOBLE_PANGOLIN_API_BASE: {resp!r}"
        )
    if resp.get("error") or resp.get("success") is False:
        raise SystemExit(f"Pangolin API error: {resp!r}")
    return resp.get("data")


def normalize_fqdn(s: str) -> str:
    """Lower-case *s*, strip whitespace and any trailing dots."""
    return s.lower().strip().rstrip(".")


def resolve_domain(
    fqdn: str, domains: list[dict[str, Any]]
) -> tuple[str, str | None]:
    """Return (domainId, subdomain) for an HTTP resource. subdomain is None for apex / cname-only domains.

    The entry with the longest matching ``baseDomain`` wins. Exits when nothing matches.
    """
    fqdn = normalize_fqdn(fqdn)
    best: tuple[str, str | None, int] | None = None
    for d in domains:
        bd = (d.get("baseDomain") or "").strip()
        if not bd:
            continue
        bd_n = normalize_fqdn(bd)
        if fqdn == bd_n:
            cand = (str(d["domainId"]), None, len(bd_n))
        elif fqdn.endswith("." + bd_n):
            sub = fqdn[: -(len(bd_n) + 1)]
            if not sub:
                continue
            cand = (str(d["domainId"]), sub, len(bd_n))
        else:
            continue
        # Strictly greater: on equal length the first listed domain is kept.
        if best is None or cand[2] > best[2]:
            best = cand
    if best is None:
        avail = ", ".join(
            sorted({normalize_fqdn(d.get("baseDomain") or "") for d in domains if d.get("baseDomain")})
        )
        raise SystemExit(
            f"No linked Pangolin domain matches FQDN {fqdn!r}. "
            f"Link the parent domain in Pangolin first. Known baseDomain values: {avail or '(none)'}"
        )
    return best[0], best[1]


def list_paginated(
    api_base: str,
    org_id: str,
    token: str,
    ssl_context: ssl.SSLContext | None,
    collection: str,
    stop_message: str,
) -> list[dict[str, Any]]:
    """Fetch every page of ``/org/{org_id}/{collection}``.

    Accepts either a bare list in ``data`` or a dict holding the items under
    *collection* (or ``items``) plus optional ``pagination.total``. Stops when the
    reported total is reached, a short page arrives, or exits with *stop_message*
    after 500 pages.
    """
    out: list[dict[str, Any]] = []
    page_size = 100
    base = f"{api_base.rstrip('/')}/org/{org_id}/{collection}"
    page = 1
    while True:
        url = f"{base}?page={page}&pageSize={page_size}"
        data = unwrap(api_request("GET", url, token, ssl_context=ssl_context))
        if isinstance(data, list):
            out.extend(data)
            break
        if not isinstance(data, dict):
            break
        batch = data.get(collection) or data.get("items") or []
        if not isinstance(batch, list):
            break
        out.extend(batch)
        pag = data.get("pagination")
        total = pag.get("total") if isinstance(pag, dict) else None
        if isinstance(total, int) and len(out) >= total:
            break
        if len(batch) < page_size:
            break
        page += 1
        if page > 500:
            raise SystemExit(stop_message)
    return out


def list_all_resources(
    api_base: str,
    org_id: str,
    token: str,
    ssl_context: ssl.SSLContext | None,
) -> list[dict[str, Any]]:
    """Return all resources of the organization, following pagination."""
    return list_paginated(
        api_base, org_id, token, ssl_context, "resources", "Pagination safety stop (>500 pages)"
    )


def list_all_sites(
    api_base: str,
    org_id: str,
    token: str,
    ssl_context: ssl.SSLContext | None,
) -> list[dict[str, Any]]:
    """Return all sites of the organization, following pagination."""
    return list_paginated(
        api_base, org_id, token, ssl_context, "sites", "Pagination safety stop (sites >500 pages)"
    )


def resource_public_fqdn(res: dict[str, Any]) -> str | None:
    """Return the normalized public FQDN of a resource dict, or None if it cannot be derived.

    Prefers ``fullDomain``; otherwise combines ``subdomain`` with ``domain.baseDomain``.
    """
    fd = res.get("fullDomain")
    if isinstance(fd, str) and fd.strip():
        return normalize_fqdn(fd)
    sub = res.get("subdomain")
    dom = res.get("domain") or {}
    bd = dom.get("baseDomain") if isinstance(dom, dict) else None
    if isinstance(bd, str) and bd.strip():
        bd_n = normalize_fqdn(bd)
        if sub is None or sub == "":
            return bd_n
        if isinstance(sub, str):
            return normalize_fqdn(f"{sub}.{bd_n}")
    return None


def find_resource_for_fqdn(resources: list[dict[str, Any]], fqdn: str) -> dict[str, Any] | None:
    """Return the first resource with a truthy ``http`` flag whose public FQDN equals *fqdn*, else None."""
    target = normalize_fqdn(fqdn)
    for r in resources:
        if not r.get("http"):
            continue
        if resource_public_fqdn(r) == target:
            return r
    return None


def resource_id_of(obj: Any) -> int:
    """Return ``obj["resourceId"]`` as an int, or 0 when absent or not an integer."""
    if not isinstance(obj, dict):
        return 0
    raw = obj.get("resourceId")
    if raw is None:
        return 0
    try:
        return int(str(raw).strip() or 0)
    except ValueError:
        return 0


def list_targets(
    api_base: str,
    resource_id: int,
    token: str,
    ssl_context: ssl.SSLContext | None,
) -> list[dict[str, Any]]:
    """Return the targets of a resource (bare list, or dict with ``targets`` / ``items``)."""
    url = f"{api_base.rstrip('/')}/resource/{resource_id}/targets"
    data = unwrap(api_request("GET", url, token, ssl_context=ssl_context))
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return data.get("targets") or data.get("items") or []
    return []


def target_matches(t: dict[str, Any], site_id: int, ip: str, port: int) -> bool:
    """Return True when target *t* points at (*site_id*, *ip*, *port*).

    A target whose ``siteId`` or ``port`` is not numeric is treated as a non-match.
    """
    try:
        t_site = int(t.get("siteId") or 0)
        t_port = int(t.get("port") or 0)
    except (TypeError, ValueError):
        return False
    return t_site == site_id and str(t.get("ip") or "") == ip and t_port == port


def resolve_site_id(
    api_base: str,
    org_id: str,
    token: str,
    site_ref: str,
    ssl_context: ssl.SSLContext | None,
) -> int:
    """Pangolin targets need numeric siteId. Accept digits or site **niceId** (UI slug).

    A non-numeric reference is first looked up via ``GET /org/{org}/site/{ref}``;
    if that yields no ``siteId``, the full site list is searched for a matching
    ``niceId``. Exits with guidance when neither lookup resolves the reference.
    """
    ref = site_ref.strip()
    if not ref:
        raise SystemExit("NOBLE_PANGOLIN_SITE_ID is empty")
    if ref.isdigit():
        return int(ref)

    slug = urllib.parse.quote(ref, safe="")
    url = f"{api_base.rstrip('/')}/org/{org_id}/site/{slug}"
    # http_exchange (not api_request) so a 4xx here can fall through to List Sites.
    code, envelope = http_exchange("GET", url, token, None, ssl_context)
    sid: int | None = None
    if code == 200 and isinstance(envelope, dict):
        if not envelope.get("error") and envelope.get("success") is not False:
            data = envelope.get("data")
            if isinstance(data, dict) and data.get("siteId") is not None:
                sid = int(data["siteId"])
    if sid is not None:
        return sid

    sites = list_all_sites(api_base, org_id, token, ssl_context)
    ref_l = ref.lower()
    for s in sites:
        nid = s.get("niceId") or s.get("nice_id")
        if isinstance(nid, str) and nid.lower() == ref_l:
            out = s.get("siteId") if s.get("siteId") is not None else s.get("id")
            if out is not None:
                print(f"[site] resolved niceId {ref!r} via List Sites -> siteId={int(out)}")
                return int(out)

    if code in (401, 403):
        msg = (
            f"HTTP {code} on GET {url} and no site with niceId {ref!r} in List Sites. "
            "Grant **Site → Get Site** and **Site → List Sites** on the organization API key, "
            "or set **NOBLE_PANGOLIN_SITE_ID** to the numeric **siteId** from Pangolin **Sites**. "
            "Remember: **Bearer** must be **apiKeyId.apiKeySecret**; a browser tab uses **session cookies** instead."
        )
        if isinstance(envelope, dict) and envelope.get("message"):
            msg += f" Server message: {envelope.get('message')!r}."
        raise SystemExit(msg)

    detail = json.dumps(envelope) if isinstance(envelope, (dict, list)) else str(envelope)
    raise SystemExit(
        f"No site matched {ref!r}. GET {url} returned HTTP {code}.\n{detail}\n"
        "Check **NOBLE_PANGOLIN_API_BASE** (Integration API host, e.g. https://api.example.com/v1 — match "
        "Swagger **servers**), **NOBLE_PANGOLIN_ORG_ID**, and API key permissions. "
        "Or set **NOBLE_PANGOLIN_SITE_ID** to the numeric **siteId**."
    )


def cli_or_env(cli_value: str | None, env_name: str) -> str:
    """Return *cli_value* when it was given and is non-blank, else ``os.environ[env_name]`` (or "").

    A blank CLI value counts as "not given" so that an empty templated flag does
    not mask a value supplied through the environment or --env-file.
    """
    if cli_value is not None and str(cli_value).strip():
        return str(cli_value)
    return os.environ.get(env_name, "")


def parse_port(raw: str | int | None, default: int = 443) -> int:
    """Parse a TCP port; blank/None yields *default*. Exits on non-integers or values outside 1-65535."""
    text = "" if raw is None else str(raw).strip()
    if not text:
        return default
    try:
        port = int(text)
    except ValueError:
        raise SystemExit(f"Invalid port {raw!r}: expected an integer between 1 and 65535") from None
    if not 1 <= port <= 65535:
        raise SystemExit(f"Invalid port {raw!r}: expected an integer between 1 and 65535")
    return port


def main() -> None:
    """CLI entry point: ensure one HTTP resource + Traefik target exists per requested FQDN."""
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--env-file", help="Parse KEY=value lines (optional overrides for env)")
    # Defaults are None so an explicit flag can be told apart from "not given";
    # environment values are resolved after --env-file has been loaded.
    ap.add_argument("--api-base", default=None)
    ap.add_argument("--org-id", default=None)
    ap.add_argument("--token", default=None)
    ap.add_argument("--site-id", default=None)
    ap.add_argument("--traefik-ip", default=None)
    ap.add_argument("--traefik-port", type=int, default=None)
    ap.add_argument(
        "--fqdns",
        required=True,
        help="Comma-separated public FQDNs (must match linked Pangolin domains / SANs)",
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Print actions only; do not call mutating endpoints",
    )
    ap.add_argument(
        "--ca-bundle",
        default=None,
        help="Path to PEM CA bundle for the Integration API (preferred over --insecure-skip-tls-verify)",
    )
    ap.add_argument(
        "--insecure-skip-tls-verify",
        action="store_true",
        help="Disable TLS certificate verification (homelab self-signed API only)",
    )
    args = ap.parse_args()

    if args.env_file:
        try:
            file_env = load_dotenv(args.env_file)
        except (OSError, UnicodeDecodeError) as e:
            raise SystemExit(f"Cannot read --env-file {args.env_file!r}: {e}") from e
        for k, v in file_env.items():
            if k.startswith("NOBLE_PANGOLIN_"):
                # Assign (not setdefault): a stale or empty **NOBLE_*** in the parent environment
                # must not hide values from **.env** when Ansible runs this script.
                os.environ[k] = v

    args.api_base = cli_or_env(args.api_base, "NOBLE_PANGOLIN_API_BASE")
    args.org_id = cli_or_env(args.org_id, "NOBLE_PANGOLIN_ORG_ID")
    args.token = cli_or_env(args.token, "NOBLE_PANGOLIN_API_TOKEN")
    args.site_id = cli_or_env(args.site_id, "NOBLE_PANGOLIN_SITE_ID")
    args.traefik_ip = cli_or_env(args.traefik_ip, "NOBLE_PANGOLIN_TRAEFIK_IP")
    args.ca_bundle = cli_or_env(args.ca_bundle, "NOBLE_PANGOLIN_CA_BUNDLE")
    args.insecure_skip_tls_verify = bool(args.insecure_skip_tls_verify) or env_truthy(
        os.environ.get("NOBLE_PANGOLIN_INSECURE_SKIP_TLS_VERIFY", "")
    )
    port_source = (
        args.traefik_port
        if args.traefik_port is not None
        else os.environ.get("NOBLE_PANGOLIN_TRAEFIK_PORT", "")
    )
    traefik_port = parse_port(port_source, 443)

    # Validate before merging the key id into the token: otherwise an empty
    # secret plus a key id would become "kid." and slip past this check.
    missing = [
        n
        for n, v in [
            ("NOBLE_PANGOLIN_API_BASE", args.api_base),
            ("NOBLE_PANGOLIN_ORG_ID", args.org_id),
            ("NOBLE_PANGOLIN_API_TOKEN", normalize_api_token(str(args.token or ""))),
            ("NOBLE_PANGOLIN_SITE_ID", args.site_id),
            ("NOBLE_PANGOLIN_TRAEFIK_IP", args.traefik_ip),
        ]
        if not str(v).strip()
    ]
    if missing:
        raise SystemExit(f"Missing required settings: {', '.join(missing)}")

    api_key_id = os.environ.get("NOBLE_PANGOLIN_API_KEY_ID", "").strip()
    args.token = pangolin_bearer_credential(str(args.token or ""), api_key_id)
    # Print the key ID (never the secret) so mismatches are easy to spot.
    used_key_id = args.token.split(".")[0] if "." in args.token else "(none)"
    print(f"[auth] using apiKeyId={used_key_id!r} — verify this exists in Pangolin → Organization → API keys")
    if "." not in args.token:
        print(
            "[auth] WARNING: NOBLE_PANGOLIN_API_TOKEN has no '.' — Pangolin requires **apiKeyId.apiKeySecret**. "
            "Set **NOBLE_PANGOLIN_API_KEY_ID** + secret, or paste **id.secret** as a single value in **NOBLE_PANGOLIN_API_TOKEN**.",
            file=sys.stderr,
        )

    fqdns = [normalize_fqdn(x) for x in args.fqdns.split(",") if x.strip()]
    if not fqdns:
        raise SystemExit("No FQDNs in --fqdns")

    api_base = str(args.api_base).strip().rstrip("/")
    if not api_base.lower().startswith(("http://", "https://")):
        # urllib rejects scheme-less URLs with a bare ValueError; fail early and clearly.
        raise SystemExit(f"NOBLE_PANGOLIN_API_BASE must start with http:// or https://: {api_base!r}")
    org_id = str(args.org_id).strip()
    token = str(args.token).strip()
    ssl_ctx = tls_ssl_context(str(args.ca_bundle).strip(), bool(args.insecure_skip_tls_verify))
    if str(args.ca_bundle).strip():
        print(f"[tls] using CA bundle {args.ca_bundle!r}")
    elif args.insecure_skip_tls_verify:
        print("[tls] WARNING: certificate verification disabled (NOBLE_PANGOLIN_INSECURE_SKIP_TLS_VERIFY)")

    site_id = resolve_site_id(api_base, org_id, token, str(args.site_id).strip(), ssl_ctx)
    print(f"[site] siteId={site_id} (NOBLE_PANGOLIN_SITE_ID={args.site_id!r})")
    traefik_ip = str(args.traefik_ip).strip()

    dom_url = f"{api_base}/org/{org_id}/domains"
    domains_raw = unwrap(api_request("GET", dom_url, token, ssl_context=ssl_ctx))
    domains: list[dict[str, Any]] = []
    if isinstance(domains_raw, list):
        domains = domains_raw
    elif isinstance(domains_raw, dict):
        domains = domains_raw.get("domains") or []
    if not isinstance(domains, list):
        raise SystemExit(f"Unexpected domains response: {domains_raw!r}")

    resources = list_all_resources(api_base, org_id, token, ssl_ctx)

    for fqdn in fqdns:
        domain_id, subdomain = resolve_domain(fqdn, domains)
        res = find_resource_for_fqdn(resources, fqdn)
        if res is None:
            body = {
                "name": f"noble {fqdn}",
                "http": True,
                "protocol": "tcp",
                "domainId": domain_id,
                "subdomain": subdomain,  # None for an apex domain
            }
            print(f"[create] resource {fqdn!r} domainId={domain_id} subdomain={subdomain!r}")
            if args.dry_run:
                continue
            created = unwrap(
                api_request(
                    "PUT",
                    f"{api_base}/org/{org_id}/resource",
                    token,
                    body,
                    ssl_context=ssl_ctx,
                )
            )
            rid = resource_id_of(created)
            if not rid:
                raise SystemExit(f"Create resource response missing resourceId: {created!r}")
            print(f" -> resourceId={rid}")
            # Remember the new resource so a repeated FQDN in --fqdns is not created twice.
            resources.append(
                {
                    "resourceId": rid,
                    "http": True,
                    "fullDomain": fqdn,
                    "subdomain": subdomain,
                }
            )
        else:
            rid = resource_id_of(res)
            if not rid:
                raise SystemExit(f"Resource missing resourceId: {res!r}")
            print(f"[exists] resource {fqdn!r} resourceId={rid}")

        targets = list_targets(api_base, rid, token, ssl_ctx)
        if any(target_matches(t, site_id, traefik_ip, traefik_port) for t in targets):
            print(f" -> target OK site={site_id} {traefik_ip}:{traefik_port}")
            continue
        tbody = {
            "siteId": site_id,
            "ip": traefik_ip,
            "port": traefik_port,
            "method": "http",
        }
        print(f"[target] PUT /resource/{rid}/target {tbody}")
        if args.dry_run:
            continue
        # unwrap so an envelope-level failure is not reported as success.
        unwrap(
            api_request(
                "PUT",
                f"{api_base}/resource/{rid}/target",
                token,
                tbody,
                ssl_context=ssl_ctx,
            )
        )
        print(" -> target created")

    print("Done.")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(130)