#!/usr/bin/env python3 """Idempotent dev bootstrap: OpenLDAP IdP + OIDC Web app for Gateway (k6 / dev-up). Writes deploy/zitadel/machinekey/dev-bootstrap.env with: ZITADEL_DEFAULT_ORG_ID, ZITADEL_OAUTH_CLIENT_ID, ZITADEL_OAUTH_CLIENT_SECRET, ZITADEL_LDAP_IDP_ID, ZITADEL_GOOGLE_IDP_ID (empty) Requires: ZITADEL up, PAT at deploy/zitadel/machinekey/zitadel-admin-sa.token """ from __future__ import annotations import json import os import sys import urllib.error import urllib.request from pathlib import Path ROOT = Path(__file__).resolve().parents[2] PAT_FILE = ROOT / "deploy/zitadel/machinekey/zitadel-admin-sa.token" OUT_FILE = ROOT / "deploy/zitadel/machinekey/dev-bootstrap.env" ZITADEL_BASE = os.environ.get("ZITADEL_BASE", "http://localhost:8080").rstrip("/") LDAP_IDP_NAME = "GatewayDevLDAP" PROJECT_NAME = "Gateway" APP_NAME = "Gateway Backend" REDIRECT_URIS = [ "http://localhost:5173/auth/callback/login", "http://localhost:5173/auth/callback/register", ] POST_LOGOUT_URIS = ["http://localhost:5173/"] LDAP_BODY = { "name": LDAP_IDP_NAME, "servers": ["ldap://openldap:389"], "startTls": False, "baseDn": "dc=gateway,dc=local", "bindDn": "cn=admin,dc=gateway,dc=local", "bindPassword": "admin", "userBase": "ou=people,dc=gateway,dc=local", "userObjectClasses": ["inetOrgPerson"], "userFilters": ["(uid=%s)"], "attributes": { "idAttribute": "uid", "emailAttribute": "mail", "firstNameAttribute": "givenName", "lastNameAttribute": "sn", "displayNameAttribute": "cn", "nickNameAttribute": "uid", }, "creationAllowed": True, "linkingAllowed": True, "autoCreation": True, "autoUpdate": True, } class BootstrapError(RuntimeError): pass def log(msg: str) -> None: print(f"[zitadel-bootstrap] {msg}", file=sys.stderr) def api(method: str, path: str, body: dict | None = None) -> dict: url = f"{ZITADEL_BASE}{path}" data = None if body is None else json.dumps(body).encode() req = urllib.request.Request( url, data=data, method=method, headers={ "Authorization": f"Bearer {read_pat()}", "Content-Type": "application/json", "Accept": "application/json", }, ) try: with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read().decode() if not raw.strip(): return {} return json.loads(raw) except urllib.error.HTTPError as e: detail = e.read().decode(errors="replace") raise BootstrapError(f"{method} {path} -> HTTP {e.code}: {detail}") from e def read_pat() -> str: if not PAT_FILE.is_file(): raise BootstrapError(f"PAT missing: {PAT_FILE} (run make k6-wait)") pat = PAT_FILE.read_text().strip() if not pat: raise BootstrapError(f"PAT empty: {PAT_FILE}") return pat def load_saved() -> dict[str, str]: if not OUT_FILE.is_file(): return {} out: dict[str, str] = {} for line in OUT_FILE.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") if k.startswith("export "): k = k[len("export ") :] out[k.strip()] = v.strip().strip('"').strip("'") return out def write_env(org_id: str, client_id: str, client_secret: str, ldap_idp_id: str) -> None: OUT_FILE.parent.mkdir(parents=True, exist_ok=True) content = f"""# Auto-generated by deploy/zitadel/bootstrap_dev.py — do not commit export ZITADEL_DEFAULT_ORG_ID={org_id} export ZITADEL_OAUTH_CLIENT_ID={client_id} export ZITADEL_OAUTH_CLIENT_SECRET={client_secret} export ZITADEL_LDAP_IDP_ID={ldap_idp_id} export ZITADEL_GOOGLE_IDP_ID= """ OUT_FILE.write_text(content) log(f"wrote {OUT_FILE.relative_to(ROOT)}") def org_id() -> str: data = api("GET", "/management/v1/orgs/me") oid = (data.get("org") or {}).get("id") or "" if not oid: raise BootstrapError("could not resolve org id") return oid def login_policy() -> tuple[dict, bool]: data = api("GET", "/management/v1/policies/login") return data.get("policy") or {}, bool(data.get("isDefault")) def find_ldap_idp_in_policy(policy: dict) -> str: for item in policy.get("idps") or []: if item.get("idpName") == LDAP_IDP_NAME: return item.get("idpId") or "" return "" def ensure_ldap_idp() -> str: policy, is_default = login_policy() existing = find_ldap_idp_in_policy(policy) if existing: log(f"LDAP IdP already linked: {existing}") return existing created = api("POST", "/management/v1/idps/ldap", LDAP_BODY) idp_id = created.get("id") or "" if not idp_id: raise BootstrapError(f"create LDAP IdP: unexpected response {created}") log(f"created LDAP IdP {idp_id}") if is_default or not policy.get("allowExternalIdp"): api( "POST", "/management/v1/policies/login", { "allowExternalIdp": True, "allowUsernamePassword": True, "allowRegister": True, "passwordlessType": "PASSWORDLESS_TYPE_ALLOWED", "idps": [{"idpId": idp_id, "ownerType": "IDP_OWNER_TYPE_ORG"}], }, ) log("created org login policy with LDAP IdP") else: api("POST", "/management/v1/policies/login/idps", {"idpId": idp_id}) log("linked LDAP IdP to existing login policy") return idp_id def find_project() -> str: data = api( "POST", "/management/v1/projects/_search", { "queries": [ { "nameQuery": { "name": PROJECT_NAME, "method": "TEXT_QUERY_METHOD_EQUALS", } } ] }, ) for item in data.get("result") or []: if item.get("name") == PROJECT_NAME: return item.get("id") or "" created = api( "POST", "/management/v1/projects", { "name": PROJECT_NAME, "projectRoleAssertion": True, "projectRoleCheck": False, "hasProjectCheck": False, "privateLabelingSetting": "PRIVATE_LABELING_SETTING_UNSPECIFIED", }, ) pid = created.get("id") or "" if not pid: raise BootstrapError(f"create project: unexpected response {created}") log(f"created project {PROJECT_NAME} ({pid})") return pid def find_app(project_id: str) -> tuple[str, str]: data = api( "POST", f"/management/v1/projects/{project_id}/apps/_search", { "queries": [ { "nameQuery": { "name": APP_NAME, "method": "TEXT_QUERY_METHOD_EQUALS", } } ] }, ) for item in data.get("result") or []: if item.get("name") != APP_NAME: continue app_id = item.get("id") or "" client_id = (item.get("oidcConfig") or {}).get("clientId") or "" return app_id, client_id return "", "" def create_app(project_id: str) -> tuple[str, str, str]: created = api( "POST", f"/management/v1/projects/{project_id}/apps/oidc", { "name": APP_NAME, "redirectUris": REDIRECT_URIS, "responseTypes": ["OIDC_RESPONSE_TYPE_CODE"], "grantTypes": [ "OIDC_GRANT_TYPE_AUTHORIZATION_CODE", "OIDC_GRANT_TYPE_REFRESH_TOKEN", ], "appType": "OIDC_APP_TYPE_WEB", "authMethodType": "OIDC_AUTH_METHOD_TYPE_BASIC", "postLogoutRedirectUris": POST_LOGOUT_URIS, "devMode": True, "accessTokenType": "OIDC_TOKEN_TYPE_BEARER", }, ) app_id = created.get("appId") or "" client_id = created.get("clientId") or "" client_secret = created.get("clientSecret") or "" if not app_id or not client_id or not client_secret: raise BootstrapError(f"create OIDC app: unexpected response {created}") log(f"created OIDC app {APP_NAME} client_id={client_id}") return app_id, client_id, client_secret def regenerate_secret(project_id: str, app_id: str) -> str: data = api( "POST", f"/management/v1/projects/{project_id}/apps/{app_id}/oidc_config/_generate_client_secret", {}, ) secret = data.get("clientSecret") or "" if not secret: raise BootstrapError(f"regenerate client secret: unexpected response {data}") log("regenerated OIDC client secret") return secret def ensure_oidc_app(saved: dict[str, str]) -> tuple[str, str]: project_id = find_project() app_id, client_id = find_app(project_id) if not app_id: app_id, client_id, client_secret = create_app(project_id) return client_id, client_secret log(f"OIDC app exists client_id={client_id}") saved_id = saved.get("ZITADEL_OAUTH_CLIENT_ID", "") saved_secret = saved.get("ZITADEL_OAUTH_CLIENT_SECRET", "") if saved_id == client_id and saved_secret: return client_id, saved_secret return client_id, regenerate_secret(project_id, app_id) def main() -> int: try: oid = org_id() ldap_idp_id = ensure_ldap_idp() saved = load_saved() client_id, client_secret = ensure_oidc_app(saved) write_env(oid, client_id, client_secret, ldap_idp_id) print(f"LDAP IdP={ldap_idp_id} OAuth client={client_id}") return 0 except BootstrapError as e: log(f"error: {e}") return 1 if __name__ == "__main__": raise SystemExit(main())