Files
home-server/ansible/roles/noble_authentik/files/configure_authentik.py

260 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
Create / update Authentik OAuth2/OpenID providers + applications + groups (stdlib only).
Environment:
AUTHENTIK_API_BASE e.g. https://auth.apps.example.com/api/v3
AUTHENTIK_TOKEN bootstrap token (Bearer)
BOOTSTRAP_EMAIL initial admin email (added to noble-admins)
CLIENT_JSON path to JSON file: { "argocd": {"client_id","client_secret","redirect_uri"}, ... }
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
def req(method: str, url: str, body: dict | None = None) -> tuple[int, dict | list]:
data = None
headers = {"Authorization": f"Bearer {os.environ['AUTHENTIK_TOKEN']}", "Accept": "application/json"}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
r = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(r, timeout=120) as resp:
raw = resp.read().decode("utf-8")
code = resp.getcode()
if not raw:
return code, {}
return code, json.loads(raw)
except urllib.error.HTTPError as e:
err = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"{method} {url} -> HTTP {e.code}: {err}") from e
def collect(base: str, path: str) -> list:
out: list = []
url = base + path
origin = base.split("/api/v3", 1)[0] if "/api/v3" in base else base
while url:
code, payload = req("GET", url)
if code != 200:
raise RuntimeError(f"GET {url} unexpected {code}")
if isinstance(payload, dict) and "results" in payload:
out.extend(payload["results"])
nxt = payload.get("next") or ""
if not nxt:
url = ""
elif nxt.startswith("http"):
url = nxt
else:
url = origin + nxt
else:
break
return out
def find_flow_pk(base: str, slug: str) -> str:
code, payload = req("GET", f"{base}/flows/instances/?slug={urllib.parse.quote(slug)}")
if code != 200 or not isinstance(payload, dict):
raise RuntimeError(f"flow lookup failed for {slug}")
results = payload.get("results") or []
if not results:
raise RuntimeError(f"flow slug not found: {slug}")
return str(results[0]["pk"])
def find_flow_pk_first(base: str, slugs: list[str]) -> str:
last_err: Exception | None = None
for s in slugs:
try:
return find_flow_pk(base, s)
except RuntimeError as e:
last_err = e
raise RuntimeError(f"no flow matched {slugs}: {last_err}")
def signing_key_pk(base: str) -> int:
code, payload = req("GET", f"{base}/crypto/certificatekeypairs/?ordering=pk&page_size=1")
if code != 200 or not isinstance(payload, dict) or not payload.get("results"):
raise RuntimeError("no signing certificate keypairs returned")
return int(payload["results"][0]["pk"])
def scope_mapping_pks(base: str) -> list[int]:
mappings = collect(base, "/propertymappings/oauthscope/?page_size=500")
want_scopes = {"openid", "email", "profile", "offline_access", "groups"}
pks: list[int] = []
for m in mappings:
sn = (m.get("scope_name") or "").strip()
if sn in want_scopes:
pks.append(int(m["pk"]))
# de-dupe preserve order
seen: set[int] = set()
uniq: list[int] = []
for p in pks:
if p not in seen:
seen.add(p)
uniq.append(p)
if len(uniq) >= 3:
return uniq
# Fallback: include managed OpenID mappings by name heuristics
for m in mappings:
name = (m.get("name") or "").lower()
if "openid" in name and any(x in name for x in ("openid", "email", "profile", "groups", "offline")):
pk = int(m["pk"])
if pk not in seen:
seen.add(pk)
uniq.append(pk)
if len(uniq) < 3:
raise RuntimeError(f"too few oauth scope mappings resolved ({uniq}); inspect /propertymappings/oauthscope/")
return uniq
def upsert_oauth_provider(
base: str,
slug: str,
name: str,
client_id: str,
client_secret: str,
redirect_uri: str,
auth_flow: str,
invalidation_flow: str,
signing_key: int,
property_mappings: list[int],
) -> int:
code, payload = req("GET", f"{base}/providers/oauth2/?slug={urllib.parse.quote(slug)}")
if code != 200 or not isinstance(payload, dict):
raise RuntimeError("provider list failed")
results = payload.get("results") or []
body = {
"name": name,
"slug": slug,
"authorization_flow": auth_flow,
"invalidation_flow": invalidation_flow,
"property_mappings": property_mappings,
"client_type": "confidential",
"client_id": client_id,
"client_secret": client_secret,
"redirect_uris": redirect_uri.strip() + "\n",
"signing_key": signing_key,
}
if results:
pk = int(results[0]["pk"])
code2, _ = req("PATCH", f"{base}/providers/oauth2/{pk}/", body)
if code2 not in (200, 204):
raise RuntimeError(f"PATCH provider {slug} -> {code2}")
return pk
code3, created = req("POST", f"{base}/providers/oauth2/", body)
if code3 not in (200, 201) or not isinstance(created, dict):
raise RuntimeError(f"POST provider {slug} -> {code3} {created}")
return int(created["pk"])
def upsert_application(base: str, slug: str, name: str, provider_pk: int) -> None:
code, payload = req("GET", f"{base}/core/applications/?slug={urllib.parse.quote(slug)}")
if code != 200 or not isinstance(payload, dict):
raise RuntimeError("application list failed")
results = payload.get("results") or []
body = {"name": name, "slug": slug, "provider": provider_pk}
if results:
pk = int(results[0]["pk"])
code2, _ = req("PATCH", f"{base}/core/applications/{pk}/", body)
if code2 not in (200, 204):
raise RuntimeError(f"PATCH application {slug} -> {code2}")
return
code3, _ = req("POST", f"{base}/core/applications/", body)
if code3 not in (200, 201):
raise RuntimeError(f"POST application {slug} -> {code3}")
def ensure_group(base: str, name: str) -> int:
code, payload = req("GET", f"{base}/core/groups/?name={urllib.parse.quote(name)}")
if code != 200 or not isinstance(payload, dict):
raise RuntimeError("group list failed")
results = payload.get("results") or []
if results:
return int(results[0]["pk"])
code2, created = req("POST", f"{base}/core/groups/", {"name": name})
if code2 not in (200, 201) or not isinstance(created, dict):
raise RuntimeError(f"POST group {name} -> {code2}")
return int(created["pk"])
def add_user_to_groups(base: str, email: str, group_pks: list[int]) -> None:
code, payload = req("GET", f"{base}/core/users/?email={urllib.parse.quote(email)}")
if code != 200 or not isinstance(payload, dict):
raise RuntimeError("user list failed")
results = payload.get("results") or []
if not results:
print(f"WARN: no user with email {email}; skip group membership", file=sys.stderr)
return
user = results[0]
upk = int(user["pk"])
existing = [int(g["pk"]) for g in user.get("groups", []) if isinstance(g, dict) and "pk" in g]
merged = sorted(set(existing + group_pks))
if merged == sorted(set(existing)):
return
code2, _ = req("PATCH", f"{base}/core/users/{upk}/", {"groups": merged})
if code2 not in (200, 204):
raise RuntimeError(f"PATCH user groups -> {code2}")
def main() -> int:
base = os.environ.get("AUTHENTIK_API_BASE", "").rstrip("/")
tok = os.environ.get("AUTHENTIK_TOKEN", "")
cfg_path = os.environ.get("CLIENT_JSON", "")
email = os.environ.get("BOOTSTRAP_EMAIL", "")
if not base or not tok or not cfg_path:
print("AUTHENTIK_API_BASE, AUTHENTIK_TOKEN, CLIENT_JSON required", file=sys.stderr)
return 2
with open(cfg_path, encoding="utf-8") as f:
clients = json.load(f)
if not isinstance(clients, dict):
print("CLIENT_JSON must be an object keyed by provider slug", file=sys.stderr)
return 2
auth_flow = find_flow_pk(base, "default-provider-authorization-implicit-consent")
invalidation_flow = find_flow_pk_first(
base,
["default-invalidation-flow", "default-provider-invalidation-flow"],
)
signing_key = signing_key_pk(base)
pmap = scope_mapping_pks(base)
admins_pk = ensure_group(base, "noble-admins")
editors_pk = ensure_group(base, "noble-editors")
for slug, meta in clients.items():
name = meta.get("name") or slug
cid = meta["client_id"]
csec = meta["client_secret"]
redir = meta["redirect_uri"]
ppk = upsert_oauth_provider(
base,
slug,
name,
cid,
csec,
redir,
auth_flow,
invalidation_flow,
signing_key,
pmap,
)
upsert_application(base, slug, name, ppk)
if email:
add_user_to_groups(base, email, [admins_pk, editors_pk])
print("authentik: providers + applications configured", flush=True)
return 0
if __name__ == "__main__":
raise SystemExit(main())