521 lines
21 KiB
Python
521 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Create / update Authentik OAuth2/OpenID providers + applications + groups (stdlib only).
|
|
Environment:
|
|
AUTHENTIK_API_BASE e.g. https://auth.apps.example.com/api/v3
|
|
AUTHENTIK_TOKEN bootstrap API token (Bearer) for user **akadmin** (see README if flows return 403)
|
|
BOOTSTRAP_EMAIL initial admin email (added to noble-admins)
|
|
CLIENT_JSON path to JSON file: { "argocd": {"client_id","client_secret","redirect_uri"}, ... }
|
|
Optional (skip API discovery; Authentik 2026+ RBAC — use UUID strings from worker shell or Admin UI):
|
|
AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK
|
|
AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK
|
|
AUTHENTIK_OAUTH_SIGNING_KEY_PK
|
|
AUTHENTIK_OAUTH_SCOPE_MAPPING_PKS comma-separated ScopeMapping UUIDs (openid,email,profile,offline_access[,groups])
|
|
AUTHENTIK_NOBLE_ADMINS_GROUP_PK noble-admins Group UUID (skip GET /core/groups/)
|
|
AUTHENTIK_NOBLE_EDITORS_GROUP_PK noble-editors Group UUID
|
|
AUTHENTIK_SKIP_OIDC_REST if 1/true/yes, skip OAuth2 provider + application REST calls (Ansible
|
|
uses **worker_upsert_oauth_oidc.py** when bootstrap tokens cannot **GET …/providers/oauth2/**)
|
|
AUTHENTIK_SKIP_USER_GROUP_REST if 1/true/yes, skip **GET/PATCH …/core/users/** for bootstrap group membership (Ansible uses
|
|
**worker_add_bootstrap_user_groups.py** when the token cannot **view_user**)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
|
|
# Candidate list endpoints for OAuth2 scope PropertyMappings, tried in this
# order; which one exists depends on the Authentik version.
_OAUTH_SCOPE_MAPPING_LIST_PATHS = tuple(
    f"/propertymappings/{segment}/?page_size=500"
    for segment in ("provider/scope", "scope", "oauthscope")
)
|
|
|
|
|
|
def primary_key(value) -> int | str:
    """Validate an Authentik API primary key and return it unchanged.

    Accepts an ``int`` (legacy) or a ``str`` (UUID, 2026+).

    Raises:
        ValueError: if *value* is a ``bool`` or any other type.
    """
    # bool is a subclass of int, so it has to be rejected before the int check.
    if isinstance(value, bool):
        raise ValueError(f"invalid pk (bool): {value!r}")
    if not isinstance(value, (int, str)):
        raise ValueError(f"invalid pk type {type(value).__name__}: {value!r}")
    return value
|
|
|
|
|
|
def dedupe_pks_preserve_order(items: list[int | str]) -> list[int | str]:
    """Return *items* without duplicates, keeping first-occurrence order.

    Two entries count as duplicates when their ``str()`` forms are equal, so
    ``1`` and ``"1"`` collapse to whichever appeared first.
    """
    first_by_key: dict[str, int | str] = {}
    for pk in items:
        # setdefault keeps the first value seen for a key; dicts keep insertion order.
        first_by_key.setdefault(str(pk), pk)
    return list(first_by_key.values())
|
|
|
|
|
|
def req(method: str, url: str, body: dict | None = None) -> tuple[int, dict | list]:
    """Send one bearer-authenticated JSON request and return ``(status, parsed_body)``.

    The token is read from the ``AUTHENTIK_TOKEN`` environment variable. When
    *body* is given it is sent as UTF-8 JSON. An empty response body is
    returned as ``{}``.

    Raises:
        RuntimeError: on an HTTP error status; the message carries the method,
            URL, status code and response text.
    """
    headers = {
        "Authorization": f"Bearer {os.environ['AUTHENTIK_TOKEN']}",
        "Accept": "application/json",
    }
    encoded = None
    if body is not None:
        encoded = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url, data=encoded, headers=headers, method=method)
    try:
        with urllib.request.urlopen(request, timeout=120) as response:
            text = response.read().decode("utf-8")
            status = response.getcode()
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"{method} {url} -> HTTP {e.code}: {detail}") from e
    return status, (json.loads(text) if text else {})
|
|
|
|
|
|
def req_any(method: str, url: str, body: dict | None = None) -> tuple[int, dict | list | str]:
    """Like req() but returns HTTP status and body (or error text) instead of raising on HTTP errors.

    For an HTTP error status the response body is returned parsed as JSON when
    possible, otherwise as the raw decoded text. An empty success body is
    returned as ``{}``.
    """
    headers = {
        "Authorization": f"Bearer {os.environ['AUTHENTIK_TOKEN']}",
        "Accept": "application/json",
    }
    encoded = None
    if body is not None:
        encoded = json.dumps(body).encode("utf-8")
        headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url, data=encoded, headers=headers, method=method)
    try:
        with urllib.request.urlopen(request, timeout=120) as response:
            text = response.read().decode("utf-8")
            status = response.getcode()
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        try:
            return e.code, json.loads(detail)
        except json.JSONDecodeError:
            # Non-JSON error page: hand back the raw text.
            return e.code, detail
    return status, (json.loads(text) if text else {})
|
|
|
|
|
|
def _next_page_url(current_url: str, payload: dict, origin: str) -> str:
    """Return the URL of the page after *payload*, or ``""`` when there is none.

    Two pagination shapes are understood:

    * a top-level ``next`` link (absolute, or relative to *origin*);
    * a ``pagination`` object whose ``next`` is the next page number
      (``0`` meaning "no further page"), which is turned into a ``page=``
      query parameter on *current_url*.
    """
    link = payload.get("next")
    if isinstance(link, str) and link:
        return link if link.startswith("http") else origin + link

    pagination = payload.get("pagination")
    page = pagination.get("next") if isinstance(pagination, dict) else None
    if isinstance(page, bool) or not isinstance(page, int) or page <= 0:
        return ""

    parts = urllib.parse.urlsplit(current_url)
    query = [
        (key, value)
        for key, value in urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
        if key != "page"
    ]
    query.append(("page", str(page)))
    return urllib.parse.urlunsplit(parts._replace(query=urllib.parse.urlencode(query)))


def collect(base: str, path: str) -> list:
    """GET ``base + path`` and return the concatenated ``results`` of every page.

    Pagination is followed via a top-level ``next`` link or via
    ``pagination.next`` (a page number). Previously only the ``next`` link was
    honoured, so responses that paginate with ``pagination.next`` were
    truncated to their first page.

    Collection stops quietly at the first payload that is not a dict with a
    ``results`` key; whatever was gathered so far is returned.

    Raises:
        RuntimeError: if a page answers with a status other than 200 (or if
            ``req`` itself raises for an HTTP error).
    """
    out: list = []
    url = base + path
    # Relative "next" links are resolved against the server origin, i.e. the
    # part of *base* before "/api/v3".
    origin = base.split("/api/v3", 1)[0] if "/api/v3" in base else base
    while url:
        code, payload = req("GET", url)
        if code != 200:
            raise RuntimeError(f"GET {url} unexpected {code}")
        if not (isinstance(payload, dict) and "results" in payload):
            break
        out.extend(payload["results"])
        url = _next_page_url(url, payload, origin)
    return out
|
|
|
|
|
|
def find_flow_pk(base: str, slug: str) -> str:
    """Resolve a Flow primary key. Prefer detail-by-slug (REST); fall back to list filter.

    In the list fallback, a row is only accepted when its ``slug`` field (if
    it has one) equals *slug*, so an unrelated row returned first by the
    filter is not mistaken for the requested flow.

    Args:
        base: API base URL without a trailing slash.
        slug: Flow slug to resolve.

    Returns:
        The flow primary key as a string.

    Raises:
        RuntimeError: if neither attempt yields a matching flow. The message
            lists each attempt's outcome plus a troubleshooting hint.
    """
    slug_path = urllib.parse.quote(slug, safe="")
    attempts = (
        f"{base}/flows/instances/{slug_path}/",
        f"{base}/flows/instances/?slug={urllib.parse.quote(slug)}",
    )
    errors: list[str] = []
    for url in attempts:
        code, payload = req_any("GET", url)
        if code != 200:
            errors.append(f"{url} -> HTTP {code}: {payload!r}")
            continue
        if not isinstance(payload, dict):
            errors.append(f"{url} -> unexpected body type")
            continue
        if "results" in payload:
            for row in payload.get("results") or []:
                # Rows without a slug field are accepted as before; rows with
                # a different slug are skipped instead of blindly trusted.
                if (
                    isinstance(row, dict)
                    and row.get("pk") is not None
                    and row.get("slug", slug) == slug
                ):
                    return str(primary_key(row["pk"]))
        elif payload.get("pk") is not None:
            return str(primary_key(payload["pk"]))
        errors.append(f"{url} -> empty")

    joined = "; ".join(errors)
    low = joined.lower()
    # Heuristic on the collected error text: does it look like a rejected token?
    token_rejected = (
        "token invalid" in low
        or "invalid/expired" in low
        or ("invalid" in low and "expired" in low)
    )
    if token_rejected:
        extra = (
            " The API rejected the bearer token (invalid/expired). After a fresh install, wait until "
            "the worker has finished bootstrap (re-run this playbook; Ansible now waits for HTTP 200). "
            "Confirm NOBLE_AUTHENTIK_BOOTSTRAP_TOKEN in .env matches what Helm was given. "
            "If you changed the token in .env without a matching helm upgrade, run the authentik tag again "
            "or create a new API token for akadmin."
        )
    else:
        extra = (
            " Authentik 2026+ RBAC: the API user must be a superuser (member of **authentik Admins**) "
            "or use a token for **akadmin** created from that account. "
            "Alternatively set AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK and "
            "AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK (flow UUIDs from Admin → Flows → open flow → URL / API)."
        )
    raise RuntimeError(f"cannot resolve flow slug {slug!r} ({joined}).{extra}")
|
|
|
|
|
|
def find_flow_pk_first(base: str, slugs: list[str]) -> str:
    """Return the flow pk for the first slug in *slugs* that resolves.

    Raises:
        RuntimeError: if no slug resolves; the message includes the list of
            slugs and the last lookup error (``None`` when *slugs* is empty).
    """
    failure: Exception | None = None
    for candidate in slugs:
        try:
            resolved = find_flow_pk(base, candidate)
        except RuntimeError as exc:
            failure = exc
        else:
            return resolved
    raise RuntimeError(f"no flow matched {slugs}: {failure}")
|
|
|
|
|
|
def signing_key_pk(base: str):
    """Return the primary key of the certificate keypair used for OAuth2 signing.

    ``AUTHENTIK_OAUTH_SIGNING_KEY_PK`` wins when set (after stripping).
    Otherwise the first keypair from the API list, ordered by pk, is used.

    Raises:
        RuntimeError: if the list request does not return 200, or returns no
            results.
    """
    override = (os.environ.get("AUTHENTIK_OAUTH_SIGNING_KEY_PK") or "").strip()
    if override:
        return primary_key(override)

    status, listing = req_any("GET", f"{base}/crypto/certificatekeypairs/?ordering=pk&page_size=1")
    if status != 200:
        raise RuntimeError(
            f"GET certificatekeypairs -> HTTP {status}: {listing!r}. "
            "Authentik 2026+ RBAC: set AUTHENTIK_OAUTH_SIGNING_KEY_PK (CertificateKeyPair UUID) or let "
            "Ansible resolve it from the worker DB (noble_authentik_oauth_signing_key_pk)."
        )
    keypairs = listing.get("results") if isinstance(listing, dict) else None
    if not keypairs:
        raise RuntimeError("no signing certificate keypairs returned")
    return primary_key(keypairs[0]["pk"])
|
|
|
|
|
|
def _collect_scope_mappings(base: str) -> list:
    """Try known Authentik API paths for OAuth2 scope property mappings (version-dependent).

    Returns the result of the first path that can be listed. A failure whose
    message mentions 404 or 403 moves on to the next path; any other
    ``RuntimeError`` is re-raised immediately.

    Raises:
        RuntimeError: if every path failed with 404/403.
    """
    skippable_markers = (" 404", "404:", " 403", "403:")
    failure: Exception | None = None
    for candidate in _OAUTH_SCOPE_MAPPING_LIST_PATHS:
        try:
            return collect(base, candidate)
        except RuntimeError as exc:
            text = str(exc)
            if not any(marker in text for marker in skippable_markers):
                raise
            failure = exc
    raise RuntimeError(f"could not list OAuth scope property mappings; last error: {failure}")
|
|
|
|
|
|
def scope_mapping_pks(base: str) -> list[int | str]:
    """Return the ScopeMapping primary keys to attach to each OAuth2 provider.

    Resolution order:

    1. ``AUTHENTIK_OAUTH_SCOPE_MAPPING_PKS`` (comma-separated), if set.
    2. Mappings from the API whose ``scope_name`` is one of openid, email,
       profile, offline_access, groups.
    3. If that yields fewer than three, mappings whose ``name`` contains
       "openid" are added as well.

    Duplicates are removed while keeping first-occurrence order.

    Raises:
        RuntimeError: if fewer than three distinct keys can be resolved.
    """
    raw = (os.environ.get("AUTHENTIK_OAUTH_SCOPE_MAPPING_PKS") or "").strip()
    if raw:
        parts = [p.strip() for p in raw.split(",") if p.strip()]
        uniq = dedupe_pks_preserve_order([primary_key(p) for p in parts])
        if len(uniq) < 3:
            raise RuntimeError(
                f"AUTHENTIK_OAUTH_SCOPE_MAPPING_PKS must list at least 3 UUIDs (got {len(uniq)}); "
                "use comma-separated ScopeMapping primary keys in order "
                "openid,email,profile,offline_access,groups from the worker helper or Admin UI."
            )
        return uniq

    mappings = _collect_scope_mappings(base)
    want_scopes = {"openid", "email", "profile", "offline_access", "groups"}
    uniq = dedupe_pks_preserve_order(
        [
            primary_key(m["pk"])
            for m in mappings
            if (m.get("scope_name") or "").strip() in want_scopes
        ]
    )
    if len(uniq) >= 3:
        return uniq

    # Fallback: include managed OpenID mappings by name heuristics. The
    # previous extra any(...) test listed "openid" itself, so it was always
    # true once "openid" was in the name; the single check is equivalent.
    by_name = [
        primary_key(m["pk"])
        for m in mappings
        if "openid" in (m.get("name") or "").lower()
    ]
    uniq = dedupe_pks_preserve_order([*uniq, *by_name])
    if len(uniq) < 3:
        # Each list path already ends in "/" before its query string, so no
        # extra slash is appended (the old hint URLs ended in "//").
        hints = ", ".join(
            f"GET {base}{p.split('?', 1)[0]}" for p in _OAUTH_SCOPE_MAPPING_LIST_PATHS
        )
        raise RuntimeError(
            f"too few oauth scope mappings resolved ({uniq}); inspect one of: {hints}. "
            "Authentik 2026+ RBAC: set AUTHENTIK_OAUTH_SCOPE_MAPPING_PKS (comma-separated UUIDs) or "
            "let Ansible resolve them from the worker DB."
        )
    return uniq
|
|
|
|
|
|
def _provider_list_next_url(current_url: str, payload: dict, origin: str) -> str:
    """Return the URL of the next provider-list page, or ``""`` when done.

    Understands a top-level ``next`` link (absolute, or relative to *origin*)
    and a ``pagination`` object whose ``next`` is the next page number
    (``0`` meaning "no further page").
    """
    link = payload.get("next")
    if isinstance(link, str) and link:
        return link if link.startswith("http") else origin + link

    pagination = payload.get("pagination")
    page = pagination.get("next") if isinstance(pagination, dict) else None
    if isinstance(page, bool) or not isinstance(page, int) or page <= 0:
        return ""

    parts = urllib.parse.urlsplit(current_url)
    query = [
        (key, value)
        for key, value in urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
        if key != "page"
    ]
    query.append(("page", str(page)))
    return urllib.parse.urlunsplit(parts._replace(query=urllib.parse.urlencode(query)))


def find_oauth2_provider_by_client_id(base: str, client_id: str) -> dict | None:
    """OAuth2 provider list has no reliable **slug** filter; match on **client_id**.

    Walks the provider list page by page (100 per page) and returns the first
    row whose ``client_id`` equals *client_id*, or ``None`` when no page
    contains it. Pages are followed via a top-level ``next`` link or via
    ``pagination.next`` (a page number); previously only the former was
    honoured, so a provider beyond the first page could be missed and then
    re-created.

    Raises:
        RuntimeError: if a page request does not return 200 with a JSON object.
    """
    url = f"{base}/providers/oauth2/?page_size=100"
    # Relative "next" links are resolved against the part of *base* before "/api/v3".
    origin = base.split("/api/v3", 1)[0] if "/api/v3" in base else base
    while url:
        code, payload = req_any("GET", url)
        if code != 200 or not isinstance(payload, dict):
            raise RuntimeError(
                f"provider list failed: GET {url} -> {code}: {payload!r}. "
                "Authentik 2026+ RBAC: the token may not list OAuth2 providers; use an **akadmin** API token "
                "with provider permissions, or grant **view_oauth2provider** (see Admin → Roles)."
            )
        for row in payload.get("results") or []:
            if isinstance(row, dict) and row.get("client_id") == client_id:
                return row
        url = _provider_list_next_url(url, payload, origin)
    return None
|
|
|
|
|
|
def upsert_oauth_provider(
    base: str,
    slug: str,
    name: str,
    client_id: str,
    client_secret: str,
    redirect_uri: str,
    auth_flow: str,
    invalidation_flow: str,
    signing_key,
    property_mappings: list[int | str],
) -> int | str:
    """Create or update the OAuth2 provider identified by *client_id*.

    An existing provider (looked up by ``client_id``) is PATCHed with the full
    desired state; otherwise a new one is POSTed. *slug* is accepted for call
    symmetry but is not used here.

    Returns:
        The provider's primary key.

    Raises:
        RuntimeError: if the PATCH/POST answers with an unexpected status or
            the POST response is not a JSON object.
    """
    # Authentik 2025+ expects redirect_uris as structured JSON (not newline-separated text).
    redirect_entry = {
        "matching_mode": "strict",
        "url": redirect_uri.strip(),
        "redirect_uri_type": "authorization",
    }
    current = find_oauth2_provider_by_client_id(base, client_id)
    desired = {
        "name": name,
        "authorization_flow": auth_flow,
        "invalidation_flow": invalidation_flow,
        "property_mappings": property_mappings,
        "client_type": "confidential",
        "client_id": client_id,
        "client_secret": client_secret,
        "redirect_uris": [redirect_entry],
        "signing_key": signing_key,
        "grant_types": ["authorization_code"],
    }

    if not current:
        post_status, created = req("POST", f"{base}/providers/oauth2/", desired)
        if post_status not in (200, 201) or not isinstance(created, dict):
            raise RuntimeError(f"POST provider client_id={client_id} -> {post_status} {created}")
        return primary_key(created["pk"])

    provider_pk = primary_key(current["pk"])
    patch_status, _ = req("PATCH", f"{base}/providers/oauth2/{provider_pk}/", desired)
    if patch_status not in (200, 204):
        raise RuntimeError(f"PATCH provider client_id={client_id} -> {patch_status}")
    return provider_pk
|
|
|
|
|
|
def upsert_application(base: str, slug: str, name: str, provider_pk: int | str) -> None:
    """Create the application *slug*, or point the existing one at *provider_pk*.

    Raises:
        RuntimeError: if the lookup, PATCH or POST answers with an unexpected
            status, or the lookup body is not a JSON object.
    """
    # Detail URL is /core/applications/{slug}/ (lookup_field slug, 2026+). Do not trust list
    # ordering: ?slug= filters can still return unrelated rows first → PATCH wrong app + 400.
    slug = str(slug).strip()
    detail_url = f"{base}/core/applications/{urllib.parse.quote(slug, safe='')}/"
    lookup_status, lookup_body = req_any("GET", detail_url)

    if lookup_status == 404:
        # Not there yet: create it.
        new_app = {"name": name, "slug": slug, "provider": provider_pk}
        post_status, post_body = req_any("POST", f"{base}/core/applications/", new_app)
        if post_status not in (200, 201):
            raise RuntimeError(f"POST application {slug} -> {post_status}: {post_body!r}")
        return

    if lookup_status != 200:
        raise RuntimeError(f"GET application {slug} -> {lookup_status}: {lookup_body!r}")

    if not isinstance(lookup_body, dict):
        raise RuntimeError(f"GET application {slug}: unexpected response")

    # Omit slug on update — sending the same slug/provider can trip UniqueValidator on PATCH.
    patch_status, patch_body = req_any("PATCH", detail_url, {"name": name, "provider": provider_pk})
    if patch_status not in (200, 204):
        raise RuntimeError(f"PATCH application {slug} -> {patch_status}: {patch_body!r}")
|
|
|
|
|
|
def _group_pk_from_env(name: str) -> str | None:
    """Return the env-provided pk for a noble group, or ``None``.

    Only ``noble-admins`` and ``noble-editors`` have an override variable. An
    unset or blank variable also yields ``None``.
    """
    if name == "noble-admins":
        variable = "AUTHENTIK_NOBLE_ADMINS_GROUP_PK"
    elif name == "noble-editors":
        variable = "AUTHENTIK_NOBLE_EDITORS_GROUP_PK"
    else:
        return None
    value = (os.environ.get(variable) or "").strip()
    return value if value else None
|
|
|
|
|
|
def ensure_group(base: str, name: str) -> int | str:
    """Return the pk of group *name*, creating the group when it is missing.

    An env override (see ``_group_pk_from_env``) short-circuits all API calls.
    Otherwise the group is looked up by name and, if the list is empty,
    created.

    Raises:
        RuntimeError: if the lookup or the create request fails.
    """
    override = _group_pk_from_env(name)
    if override:
        return primary_key(override)

    list_status, listing = req_any("GET", f"{base}/core/groups/?name={urllib.parse.quote(name)}")
    if list_status != 200:
        raise RuntimeError(
            f"GET groups name={name!r} -> HTTP {list_status}: {listing!r}. "
            "Authentik 2026+ RBAC: set AUTHENTIK_NOBLE_ADMINS_GROUP_PK and "
            "AUTHENTIK_NOBLE_EDITORS_GROUP_PK (Group UUIDs), or let Ansible resolve/create them via the worker DB."
        )
    if not isinstance(listing, dict):
        raise RuntimeError("group list failed")

    found = listing.get("results") or []
    if found:
        return primary_key(found[0]["pk"])

    create_status, created = req_any("POST", f"{base}/core/groups/", {"name": name})
    if create_status in (200, 201) and isinstance(created, dict):
        return primary_key(created["pk"])
    raise RuntimeError(
        f"POST group {name} -> {create_status}: {created!r}. "
        "If the API user cannot create groups, run the noble_authentik worker helper or create the group in Admin."
    )
|
|
|
|
|
|
def add_user_to_groups(base: str, email: str, group_pks: list[int | str]) -> None:
    """Add the user with *email* to *group_pks*, keeping existing memberships.

    The user's current ``groups`` may be bare primary keys or objects with a
    ``pk`` field; both are read. Previously only the object form was read, so
    with bare keys the current memberships looked empty and the PATCH replaced
    them instead of extending them.

    Nothing is sent when the user is already in every requested group. A
    missing user only produces a warning on stderr.

    Raises:
        RuntimeError: if the user lookup or the PATCH fails.
    """
    code, payload = req_any("GET", f"{base}/core/users/?email={urllib.parse.quote(email)}")
    if code != 200:
        raise RuntimeError(
            f"GET users email={email!r} -> HTTP {code}: {payload!r}. "
            "Authentik 2026+ RBAC: the token may not list users; add the bootstrap user to **noble-admins** / "
            "**noble-editors** in Admin, or use a token with **view_user** permission."
        )
    if not isinstance(payload, dict):
        raise RuntimeError("user list failed")
    results = payload.get("results") or []
    if not results:
        print(f"WARN: no user with email {email}; skip group membership", file=sys.stderr)
        return

    user = results[0]
    upk = primary_key(user["pk"])

    existing: list[int | str] = []
    for entry in user.get("groups") or []:
        if isinstance(entry, dict):
            if "pk" in entry:
                existing.append(primary_key(entry["pk"]))
        elif isinstance(entry, (int, str)) and not isinstance(entry, bool):
            # Bare primary key (UUID string or legacy integer).
            existing.append(primary_key(entry))

    merged = dedupe_pks_preserve_order([*existing, *group_pks])
    # Compare as strings so 1 and "1" count as the same membership.
    if {str(x) for x in merged} == {str(x) for x in existing}:
        return

    code2, err = req_any("PATCH", f"{base}/core/users/{upk}/", {"groups": merged})
    if code2 not in (200, 204):
        raise RuntimeError(f"PATCH user groups -> {code2}: {err!r}")
|
|
|
|
|
|
def _truthy_env(name: str) -> bool:
    """Return True when env var *name* is 1/true/yes (case-insensitive, stripped)."""
    value = os.environ.get(name)
    if not value:
        return False
    return value.strip().lower() in {"1", "true", "yes"}
|
|
|
|
|
|
def main() -> int:
    """Configure Authentik from environment variables and return an exit code.

    Steps: validate configuration, resolve flows / signing key / scope
    mappings (unless ``AUTHENTIK_SKIP_OIDC_REST``), ensure the noble groups
    exist, upsert one provider + application per ``CLIENT_JSON`` entry, and add
    the bootstrap user to the groups (unless
    ``AUTHENTIK_SKIP_USER_GROUP_REST``).

    Returns:
        0 on success; 2 for invalid or missing configuration; 1 when an API
        step fails. Failures are reported as one line on stderr.
    """
    base = os.environ.get("AUTHENTIK_API_BASE", "").rstrip("/")
    tok = os.environ.get("AUTHENTIK_TOKEN", "")
    if not base or not tok:
        print("AUTHENTIK_API_BASE and AUTHENTIK_TOKEN required", file=sys.stderr)
        return 2

    cfg_path = os.environ.get("CLIENT_JSON", "")
    email = os.environ.get("BOOTSTRAP_EMAIL", "")
    skip_oidc_rest = _truthy_env("AUTHENTIK_SKIP_OIDC_REST")
    skip_user_group_rest = _truthy_env("AUTHENTIK_SKIP_USER_GROUP_REST")
    if not skip_oidc_rest and not cfg_path:
        print("CLIENT_JSON required unless AUTHENTIK_SKIP_OIDC_REST is set", file=sys.stderr)
        return 2

    clients: dict = {}
    auth_pk = ""
    inv_pk = ""
    if not skip_oidc_rest:
        try:
            with open(cfg_path, encoding="utf-8") as f:
                clients = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"cannot read CLIENT_JSON {cfg_path!r}: {exc}", file=sys.stderr)
            return 2
        if not isinstance(clients, dict):
            print("CLIENT_JSON must be an object keyed by provider slug", file=sys.stderr)
            return 2

        # Validate every entry up front so a malformed one is reported before
        # any provider or application has been touched.
        required = ("client_id", "client_secret", "redirect_uri")
        for slug, meta in clients.items():
            if isinstance(meta, dict):
                missing = [key for key in required if key not in meta]
            else:
                missing = list(required)
            if missing:
                print(
                    f"CLIENT_JSON entry {slug!r} is missing: {', '.join(missing)}",
                    file=sys.stderr,
                )
                return 2

        auth_pk = (os.environ.get("AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK") or "").strip()
        inv_pk = (os.environ.get("AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK") or "").strip()
        if bool(auth_pk) != bool(inv_pk):
            print(
                "Set both AUTHENTIK_OAUTH_AUTHORIZATION_FLOW_PK and AUTHENTIK_OAUTH_INVALIDATION_FLOW_PK "
                "or neither",
                file=sys.stderr,
            )
            return 2

    # Every API call sits inside this try so that the RuntimeError hints raised
    # by the helpers are printed as one line instead of surfacing as a traceback.
    try:
        auth_flow = ""
        invalidation_flow = ""
        signing_key = None
        pmap: list[int | str] = []

        if not skip_oidc_rest:
            if auth_pk:
                auth_flow, invalidation_flow = auth_pk, inv_pk
            else:
                auth_flow = find_flow_pk(base, "default-provider-authorization-implicit-consent")
                invalidation_flow = find_flow_pk_first(
                    base,
                    ["default-invalidation-flow", "default-provider-invalidation-flow"],
                )
            signing_key = signing_key_pk(base)
            pmap = scope_mapping_pks(base)

        admins_pk = ensure_group(base, "noble-admins")
        editors_pk = ensure_group(base, "noble-editors")

        if not skip_oidc_rest:
            for slug, meta in clients.items():
                name = meta.get("name") or slug
                ppk = upsert_oauth_provider(
                    base,
                    slug,
                    name,
                    meta["client_id"],
                    meta["client_secret"],
                    meta["redirect_uri"],
                    auth_flow,
                    invalidation_flow,
                    signing_key,
                    pmap,
                )
                upsert_application(base, slug, name, ppk)

        if email and not skip_user_group_rest:
            add_user_to_groups(base, email, [admins_pk, editors_pk])

        if skip_oidc_rest and skip_user_group_rest:
            print("authentik: noble groups verified; OAuth2 apps + bootstrap group membership via worker", flush=True)
        elif skip_oidc_rest:
            print("authentik: groups verified; OAuth2 apps via worker (user membership via API)", flush=True)
        else:
            print("authentik: providers + applications configured", flush=True)
        return 0
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 1
    except Exception as exc:
        # Top-level boundary: report anything unexpected (e.g. network errors)
        # with its type name and exit non-zero.
        print(f"{type(exc).__name__}: {exc}", file=sys.stderr)
        return 1
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|