#!/usr/bin/env python3
"""
Create / update Authentik OAuth2/OpenID providers + applications + groups (stdlib only).

Environment:
  AUTHENTIK_API_BASE   e.g. https://auth.apps.example.com/api/v3
  AUTHENTIK_TOKEN      bootstrap API token (Bearer) for user **akadmin** (see README if flows return 403)
  BOOTSTRAP_EMAIL      initial admin email (added to noble-admins and noble-editors)
  CLIENT_JSON          path to JSON file: { "argocd": {"client_id","client_secret","redirect_uri"}, ... }

Optional (skip flow API discovery; Authentik 2026+ flow **pk** UUID strings):
  AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK
  AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK

Exit status of main(): 0 on success, 1 on an API/runtime failure, 2 on bad configuration.
"""

from __future__ import annotations

import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request

# List endpoints for OAuth2 scope PropertyMapping (Authentik version-dependent).
_OAUTH_SCOPE_MAPPING_LIST_PATHS = (
    "/propertymappings/provider/scope/?page_size=500",
    "/propertymappings/scope/?page_size=500",
    "/propertymappings/oauthscope/?page_size=500",
)

# Seconds to wait for any single API call.
_HTTP_TIMEOUT = 120


class ApiHTTPError(RuntimeError):
    """HTTP error response from the API; ``code`` holds the numeric status.

    Subclasses RuntimeError so existing ``except RuntimeError`` handlers keep working.
    """

    def __init__(self, message: str, code: int) -> None:
        super().__init__(message)
        self.code = code


def primary_key(value) -> int | str:
    """Authentik API pk: integer (legacy) or UUID string (2026+).

    Raises ValueError for bools (which are ints in Python) and for any other type.
    """
    if isinstance(value, bool):
        raise ValueError(f"invalid pk (bool): {value!r}")
    if isinstance(value, (int, str)):
        return value
    raise ValueError(f"invalid pk type {type(value).__name__}: {value!r}")


def dedupe_pks_preserve_order(items: list[int | str]) -> list[int | str]:
    """Return *items* without duplicates, keeping first occurrences in order.

    Items are compared by their ``str()`` form, so ``1`` and ``"1"`` count as the same pk.
    """
    out: list[int | str] = []
    seen: set[str] = set()
    for item in items:
        key = str(item)
        if key not in seen:
            seen.add(key)
            out.append(item)
    return out


def _build_request(method: str, url: str, body: dict | None) -> urllib.request.Request:
    """Build an authenticated JSON request; the bearer token is read from AUTHENTIK_TOKEN."""
    headers = {
        "Authorization": f"Bearer {os.environ['AUTHENTIK_TOKEN']}",
        "Accept": "application/json",
    }
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(url, data=data, headers=headers, method=method)


def _read_response(resp) -> tuple[int, dict | list]:
    """Return (status, decoded JSON body); an empty body decodes to ``{}``."""
    raw = resp.read().decode("utf-8")
    code = resp.getcode()
    if not raw:
        return code, {}
    return code, json.loads(raw)


def req(method: str, url: str, body: dict | None = None) -> tuple[int, dict | list]:
    """Send a JSON API request and return (status, decoded body).

    Raises:
        ApiHTTPError: (a RuntimeError) when the server answers with an HTTP error status.
    """
    request = _build_request(method, url, body)
    try:
        with urllib.request.urlopen(request, timeout=_HTTP_TIMEOUT) as resp:
            return _read_response(resp)
    except urllib.error.HTTPError as e:
        err = e.read().decode("utf-8", errors="replace")
        raise ApiHTTPError(f"{method} {url} -> HTTP {e.code}: {err}", e.code) from e


def req_any(method: str, url: str, body: dict | None = None) -> tuple[int, dict | list | str]:
    """Like req() but returns HTTP status and body (or error text) instead of raising on HTTP errors."""
    request = _build_request(method, url, body)
    try:
        with urllib.request.urlopen(request, timeout=_HTTP_TIMEOUT) as resp:
            return _read_response(resp)
    except urllib.error.HTTPError as e:
        err = e.read().decode("utf-8", errors="replace")
        try:
            return e.code, json.loads(err)
        except json.JSONDecodeError:
            return e.code, err


def _api_origin(base: str) -> str:
    """Return *base* with everything from the first ``/api/v3`` removed (used for relative ``next`` links)."""
    return base.split("/api/v3", 1)[0]


def next_page_url(url: str, origin: str, payload: dict) -> str:
    """Return the URL of the page after *url*, or ``""`` when there is none.

    Two response shapes are understood:

    * a top-level ``next`` link, absolute or relative to *origin*;
    * a ``pagination`` object whose ``next`` is a page number (0 / missing = last page).
      NOTE(review): the numeric shape is assumed from the Authentik API schema - confirm
      against the deployed version.
    """
    nxt = payload.get("next") or ""
    if isinstance(nxt, str) and nxt:
        return nxt if nxt.startswith("http") else origin + nxt

    pagination = payload.get("pagination")
    if not isinstance(pagination, dict):
        return ""
    page = pagination.get("next")
    if isinstance(page, bool) or not isinstance(page, int) or page <= 0:
        return ""
    current = pagination.get("current")
    # Never step backwards or stay in place: guards against an endless loop.
    if isinstance(current, int) and not isinstance(current, bool) and page <= current:
        return ""
    parts = urllib.parse.urlsplit(url)
    query = [
        (k, v)
        for k, v in urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
        if k != "page"
    ]
    query.append(("page", str(page)))
    return urllib.parse.urlunsplit(parts._replace(query=urllib.parse.urlencode(query)))


def collect(base: str, path: str) -> list:
    """GET ``base + path`` and follow pagination, returning all ``results`` rows.

    Stops (returning what was gathered so far) when a page is not a dict with ``results``.

    Raises:
        RuntimeError: on an HTTP error or a non-200 success status.
    """
    out: list = []
    url = base + path
    origin = _api_origin(base)
    while url:
        code, payload = req("GET", url)
        if code != 200:
            raise RuntimeError(f"GET {url} unexpected {code}")
        if not (isinstance(payload, dict) and "results" in payload):
            break
        out.extend(payload["results"])
        url = next_page_url(url, origin, payload)
    return out


def find_flow_pk(base: str, slug: str) -> str:
    """Resolve a Flow primary key. Prefer detail-by-slug (REST); fall back to list filter.

    Raises:
        RuntimeError: when neither lookup yields a flow; the message lists every attempt
            plus a hint (token problem vs. permissions) derived from the error text.
    """
    slug_path = urllib.parse.quote(slug, safe="")
    attempts = (
        f"{base}/flows/instances/{slug_path}/",
        f"{base}/flows/instances/?slug={urllib.parse.quote(slug)}",
    )
    errors: list[str] = []
    for url in attempts:
        code, payload = req_any("GET", url)
        if code != 200:
            errors.append(f"{url} -> HTTP {code}: {payload!r}")
            continue
        if not isinstance(payload, dict):
            errors.append(f"{url} -> unexpected body type")
            continue
        if "results" in payload:
            rows = payload.get("results") or []
            # Do not trust list ordering: take the row whose slug matches. Rows that
            # carry no slug field are accepted as-is.
            match = next(
                (r for r in rows if isinstance(r, dict) and r.get("slug", slug) == slug),
                None,
            )
            if match is not None:
                return str(primary_key(match["pk"]))
        elif payload.get("pk") is not None:
            return str(primary_key(payload["pk"]))
        errors.append(f"{url} -> empty")

    joined = "; ".join(errors)
    low = joined.lower()
    if (
        "token invalid" in low
        or "invalid/expired" in low
        or ("invalid" in low and "expired" in low)
    ):
        extra = (
            " The API rejected the bearer token (invalid/expired). After a fresh install, wait until "
            "the worker has finished bootstrap (re-run this playbook; Ansible now waits for HTTP 200). "
            "Confirm NOBLE_AUTHENTIK_BOOTSTRAP_TOKEN in .env matches what Helm was given. "
            "If you changed the token in .env without a matching helm upgrade, run the authentik tag again "
            "or create a new API token for akadmin."
        )
    else:
        extra = (
            " Authentik 2026+ RBAC: the API user must be a superuser (member of **authentik Admins**) "
            "or use a token for **akadmin** created from that account. "
            "Alternatively set AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK and "
            "AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK (flow UUIDs from Admin → Flows → open flow → URL / API)."
        )
    raise RuntimeError(f"cannot resolve flow slug {slug!r} ({joined}).{extra}")


def find_flow_pk_first(base: str, slugs: list[str]) -> str:
    """Return the pk of the first slug in *slugs* that resolves.

    Raises:
        RuntimeError: when none resolves; carries the last lookup error as its cause.
    """
    last_err: Exception | None = None
    for s in slugs:
        try:
            return find_flow_pk(base, s)
        except RuntimeError as e:
            last_err = e
    raise RuntimeError(f"no flow matched {slugs}: {last_err}") from last_err


def signing_key_pk(base: str):
    """Return the pk of the first certificate keypair (ordered by pk).

    Raises:
        RuntimeError: when the API returns no keypairs.
    """
    code, payload = req("GET", f"{base}/crypto/certificatekeypairs/?ordering=pk&page_size=1")
    if code != 200 or not isinstance(payload, dict) or not payload.get("results"):
        raise RuntimeError("no signing certificate keypairs returned")
    return payload["results"][0]["pk"]


def _collect_scope_mappings(base: str) -> list:
    """Try known Authentik API paths for OAuth2 scope property mappings (version-dependent)."""
    last_err: Exception | None = None
    for path in _OAUTH_SCOPE_MAPPING_LIST_PATHS:
        try:
            return collect(base, path)
        except ApiHTTPError as exc:
            # Only "endpoint does not exist" moves on to the next candidate path;
            # every other failure is real and propagates.
            if exc.code != 404:
                raise
            last_err = exc
    raise RuntimeError(f"could not list OAuth scope property mappings; last error: {last_err}")


def scope_mapping_pks(base: str) -> list[int | str]:
    """Return pks of the scope mappings to attach to every provider.

    Mappings are matched on ``scope_name`` first; when fewer than three match, mappings
    whose name contains "openid" are added as a fallback.

    Raises:
        RuntimeError: when fewer than three mappings can be resolved.
    """
    mappings = _collect_scope_mappings(base)
    want_scopes = {"openid", "email", "profile", "offline_access", "groups"}
    uniq = dedupe_pks_preserve_order(
        [
            primary_key(m["pk"])
            for m in mappings
            if (m.get("scope_name") or "").strip() in want_scopes
        ]
    )
    if len(uniq) >= 3:
        return uniq

    # Fallback: include managed OpenID mappings by name heuristics.
    seen = {str(p) for p in uniq}
    for m in mappings:
        if "openid" not in (m.get("name") or "").lower():
            continue
        pk_val = primary_key(m["pk"])
        key = str(pk_val)
        if key not in seen:
            seen.add(key)
            uniq.append(pk_val)
    if len(uniq) < 3:
        # Each listed path already ends in "/" once the query string is cut off.
        hints = ", ".join(
            f"GET {base}{p.split('?', 1)[0]}" for p in _OAUTH_SCOPE_MAPPING_LIST_PATHS
        )
        raise RuntimeError(f"too few oauth scope mappings resolved ({uniq}); inspect one of: {hints}")
    return uniq


def find_oauth2_provider_by_client_id(base: str, client_id: str) -> dict | None:
    """OAuth2 provider list has no reliable **slug** filter; match on **client_id**.

    Walks every page of the provider list and returns the first matching row, or None.

    Raises:
        RuntimeError: when a page cannot be fetched or is not a JSON object.
    """
    url = f"{base}/providers/oauth2/?page_size=100"
    origin = _api_origin(base)
    while url:
        code, payload = req("GET", url)
        if code != 200 or not isinstance(payload, dict):
            raise RuntimeError(f"provider list failed: GET {url} -> {code}")
        for row in payload.get("results") or []:
            if row.get("client_id") == client_id:
                return row
        url = next_page_url(url, origin, payload)
    return None


def upsert_oauth_provider(
    base: str,
    slug: str,
    name: str,
    client_id: str,
    client_secret: str,
    redirect_uri: str,
    auth_flow: str,
    invalidation_flow: str,
    signing_key,
    property_mappings: list[int | str],
) -> int | str:
    """Create the OAuth2 provider for *client_id*, or PATCH it when it already exists.

    *slug* is accepted for call-site symmetry but not used; providers are matched on
    *client_id*. Returns the provider pk.

    Raises:
        RuntimeError: when the API rejects the PATCH / POST.
    """
    # Authentik 2025+ expects redirect_uris as structured JSON (not newline-separated text).
    redirect_uris = [
        {
            "matching_mode": "strict",
            "url": redirect_uri.strip(),
            "redirect_uri_type": "authorization",
        }
    ]
    existing = find_oauth2_provider_by_client_id(base, client_id)
    body = {
        "name": name,
        "authorization_flow": auth_flow,
        "invalidation_flow": invalidation_flow,
        "property_mappings": property_mappings,
        "client_type": "confidential",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uris": redirect_uris,
        "signing_key": signing_key,
        "grant_types": ["authorization_code"],
    }
    if existing:
        pk = primary_key(existing["pk"])
        code2, _ = req("PATCH", f"{base}/providers/oauth2/{pk}/", body)
        if code2 not in (200, 204):
            raise RuntimeError(f"PATCH provider client_id={client_id} -> {code2}")
        return pk
    code3, created = req("POST", f"{base}/providers/oauth2/", body)
    if code3 not in (200, 201) or not isinstance(created, dict):
        raise RuntimeError(f"POST provider client_id={client_id} -> {code3} {created}")
    return primary_key(created["pk"])


def upsert_application(base: str, slug: str, name: str, provider_pk: int | str) -> None:
    """Create the application *slug*, or PATCH its name/provider when it already exists.

    Raises:
        RuntimeError: on any unexpected API status.
    """
    # Detail URL is /core/applications/{slug}/ (lookup_field slug, 2026+). Do not trust list
    # ordering: ?slug= filters can still return unrelated rows first → PATCH wrong app + 400.
    slug = str(slug).strip()
    seg = urllib.parse.quote(slug, safe="")
    detail = f"{base}/core/applications/{seg}/"
    code, payload = req_any("GET", detail)
    if code == 200:
        if not isinstance(payload, dict):
            raise RuntimeError(f"GET application {slug}: unexpected response")
        # Omit slug on update — sending the same slug/provider can trip UniqueValidator on PATCH.
        code2, err = req_any("PATCH", detail, {"name": name, "provider": provider_pk})
        if code2 not in (200, 204):
            raise RuntimeError(f"PATCH application {slug} -> {code2}: {err!r}")
        return
    if code == 404:
        body = {"name": name, "slug": slug, "provider": provider_pk}
        code3, err = req_any("POST", f"{base}/core/applications/", body)
        if code3 not in (200, 201):
            raise RuntimeError(f"POST application {slug} -> {code3}: {err!r}")
        return
    raise RuntimeError(f"GET application {slug} -> {code}: {payload!r}")


def ensure_group(base: str, name: str) -> int | str:
    """Return the pk of group *name*, creating the group when the lookup finds none.

    Raises:
        RuntimeError: when the lookup or the creation fails.
    """
    code, payload = req("GET", f"{base}/core/groups/?name={urllib.parse.quote(name)}")
    if code != 200 or not isinstance(payload, dict):
        raise RuntimeError("group list failed")
    results = payload.get("results") or []
    if results:
        return primary_key(results[0]["pk"])
    code2, created = req("POST", f"{base}/core/groups/", {"name": name})
    if code2 not in (200, 201) or not isinstance(created, dict):
        raise RuntimeError(f"POST group {name} -> {code2}")
    return primary_key(created["pk"])


def user_group_pks(user: dict) -> list[int | str]:
    """Return the group pks listed under ``user["groups"]``.

    Entries may be plain pks or objects carrying a ``pk`` key; anything else is ignored.
    NOTE(review): which of the two shapes the API emits is version-dependent - both are
    accepted so existing memberships are never mistaken for "none".
    """
    pks: list[int | str] = []
    for entry in user.get("groups") or []:
        if isinstance(entry, dict):
            if "pk" in entry:
                pks.append(primary_key(entry["pk"]))
        elif isinstance(entry, (int, str)) and not isinstance(entry, bool):
            pks.append(entry)
    return pks


def add_user_to_groups(base: str, email: str, group_pks: list[int | str]) -> None:
    """Add the user with *email* to *group_pks*, keeping the memberships already present.

    Prints a warning and returns when no user has that email; does nothing when the user
    is already in every group.

    Raises:
        RuntimeError: when the user lookup or the membership PATCH fails.
    """
    code, payload = req("GET", f"{base}/core/users/?email={urllib.parse.quote(email)}")
    if code != 200 or not isinstance(payload, dict):
        raise RuntimeError("user list failed")
    results = payload.get("results") or []
    if not results:
        print(f"WARN: no user with email {email}; skip group membership", file=sys.stderr)
        return
    user = results[0]
    upk = primary_key(user["pk"])
    existing = user_group_pks(user)
    # PATCH sends the complete list, so current memberships must be part of it.
    merged = dedupe_pks_preserve_order([*existing, *group_pks])
    if {str(x) for x in merged} == {str(x) for x in existing}:
        return
    code2, _ = req("PATCH", f"{base}/core/users/{upk}/", {"groups": merged})
    if code2 not in (200, 204):
        raise RuntimeError(f"PATCH user groups -> {code2}")


def main() -> int:
    """Configure providers, applications and groups from the environment.

    Returns:
        0 on success, 1 when an API call or client entry fails, 2 on bad configuration
        (missing variables, unreadable / malformed CLIENT_JSON, only one flow pk set).
    """
    base = os.environ.get("AUTHENTIK_API_BASE", "").rstrip("/")
    tok = os.environ.get("AUTHENTIK_TOKEN", "")
    cfg_path = os.environ.get("CLIENT_JSON", "")
    email = os.environ.get("BOOTSTRAP_EMAIL", "")
    if not base or not tok or not cfg_path:
        print("AUTHENTIK_API_BASE, AUTHENTIK_TOKEN, CLIENT_JSON required", file=sys.stderr)
        return 2

    try:
        with open(cfg_path, encoding="utf-8") as f:
            clients = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and undecodable bytes.
        print(f"CLIENT_JSON {cfg_path!r} could not be loaded: {exc}", file=sys.stderr)
        return 2
    if not isinstance(clients, dict):
        print("CLIENT_JSON must be an object keyed by provider slug", file=sys.stderr)
        return 2

    auth_pk = (os.environ.get("AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK") or "").strip()
    inv_pk = (os.environ.get("AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK") or "").strip()
    if bool(auth_pk) != bool(inv_pk):
        print(
            "Set both AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK and AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK "
            "or neither",
            file=sys.stderr,
        )
        return 2

    # Every API call lives inside the try so that discovery failures are reported the
    # same way as provisioning failures (one line on stderr, exit status 1).
    try:
        if auth_pk:
            auth_flow, invalidation_flow = auth_pk, inv_pk
        else:
            auth_flow = find_flow_pk(base, "default-provider-authorization-implicit-consent")
            invalidation_flow = find_flow_pk_first(
                base,
                ["default-invalidation-flow", "default-provider-invalidation-flow"],
            )
        signing_key = signing_key_pk(base)
        pmap = scope_mapping_pks(base)
        admins_pk = ensure_group(base, "noble-admins")
        editors_pk = ensure_group(base, "noble-editors")

        for slug, meta in clients.items():
            name = meta.get("name") or slug
            cid = meta["client_id"]
            csec = meta["client_secret"]
            redir = meta["redirect_uri"]
            ppk = upsert_oauth_provider(
                base,
                slug,
                name,
                cid,
                csec,
                redir,
                auth_flow,
                invalidation_flow,
                signing_key,
                pmap,
            )
            upsert_application(base, slug, name, ppk)
        if email:
            add_user_to_groups(base, email, [admins_pk, editors_pk])
        print("authentik: providers + applications configured", flush=True)
        return 0
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 1
    except Exception as exc:
        print(f"{type(exc).__name__}: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())