316 lines
9.6 KiB
Python
316 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Idempotent dev bootstrap: OpenLDAP IdP + OIDC Web app for Gateway (k6 / dev-up).
|
|
|
|
Writes deploy/zitadel/machinekey/dev-bootstrap.env with:
  ZITADEL_DEFAULT_ORG_ID, ZITADEL_OAUTH_CLIENT_ID, ZITADEL_OAUTH_CLIENT_SECRET,
  ZITADEL_LDAP_IDP_ID, ZITADEL_GOOGLE_IDP_ID (empty)

Requires: ZITADEL up, PAT at deploy/zitadel/machinekey/zitadel-admin-sa.token
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
# The third ancestor directory of this file's resolved path is treated as the
# repository root; the token and env file paths below are relative to it.
ROOT = Path(__file__).resolve().parents[2]
# Admin personal access token used to authenticate every API call.
PAT_FILE = ROOT / "deploy/zitadel/machinekey/zitadel-admin-sa.token"
# Generated env file; also read back on later runs to reuse the client secret.
OUT_FILE = ROOT / "deploy/zitadel/machinekey/dev-bootstrap.env"

# Base URL of the ZITADEL instance, overridable via the environment. Trailing
# slashes are dropped because request paths are appended with a leading "/".
ZITADEL_BASE = os.getenv("ZITADEL_BASE", "http://localhost:8080").rstrip("/")

# Display names used to look up (and, if absent, create) each resource.
LDAP_IDP_NAME = "GatewayDevLDAP"
PROJECT_NAME = "Gateway"
APP_NAME = "Gateway Backend"

# Redirect targets registered on the OIDC application.
REDIRECT_URIS = [
    "http://localhost:5173/auth/callback/login",
    "http://localhost:5173/auth/callback/register",
]
POST_LOGOUT_URIS = ["http://localhost:5173/"]

# Mapping from ZITADEL user fields to LDAP entry attributes.
_LDAP_ATTRIBUTES = {
    "idAttribute": "uid",
    "emailAttribute": "mail",
    "firstNameAttribute": "givenName",
    "lastNameAttribute": "sn",
    "displayNameAttribute": "cn",
    "nickNameAttribute": "uid",
}

# Request payload used to create the LDAP identity provider.
LDAP_BODY = {
    "name": LDAP_IDP_NAME,
    "servers": ["ldap://openldap:389"],
    "startTls": False,
    "baseDn": "dc=gateway,dc=local",
    "bindDn": "cn=admin,dc=gateway,dc=local",
    "bindPassword": "admin",
    "userBase": "ou=people,dc=gateway,dc=local",
    "userObjectClasses": ["inetOrgPerson"],
    "userFilters": ["(uid=%s)"],
    "attributes": _LDAP_ATTRIBUTES,
    "creationAllowed": True,
    "linkingAllowed": True,
    "autoCreation": True,
    "autoUpdate": True,
}
|
|
|
|
|
|
class BootstrapError(RuntimeError):
    """Raised when a bootstrap step fails; ``main`` logs it and exits with status 1."""
|
|
|
|
|
|
def log(msg: str) -> None:
    """Write *msg* to stderr, prefixed with the ``[zitadel-bootstrap]`` tag."""
    print("[zitadel-bootstrap]", msg, file=sys.stderr)
|
|
|
|
|
|
def api(method: str, path: str, body: dict | None = None) -> dict:
    """Send an authenticated JSON request to ZITADEL and return the parsed reply.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Request path; it is appended verbatim to ``ZITADEL_BASE``.
        body: Optional payload, serialised as JSON. ``None`` sends no body.

    Returns:
        The decoded JSON document, or ``{}`` when the response body is blank.

    Raises:
        BootstrapError: If the PAT cannot be read, the server answers with an
            HTTP error status, the server cannot be reached (connection
            refused, DNS failure, timeout), or the reply is not valid JSON.
    """
    url = f"{ZITADEL_BASE}{path}"
    data = None if body is None else json.dumps(body).encode()
    req = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={
            "Authorization": f"Bearer {read_pat()}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            payload = resp.read()
    except urllib.error.HTTPError as e:
        # Must come before the OSError handler: HTTPError is a URLError,
        # which in turn is an OSError.
        detail = e.read().decode(errors="replace")
        raise BootstrapError(f"{method} {path} -> HTTP {e.code}: {detail}") from e
    except OSError as e:
        # URLError and socket-level timeouts end up here. Converting them keeps
        # the caller's single `except BootstrapError` handler sufficient.
        raise BootstrapError(f"{method} {path} -> request to {url} failed: {e}") from e

    # Undecodable bytes are replaced so that a bad body surfaces as a JSON
    # error below instead of a bare UnicodeDecodeError.
    raw = payload.decode(errors="replace")
    if not raw.strip():
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError as e:
        raise BootstrapError(f"{method} {path} -> invalid JSON response: {raw[:200]!r}") from e
|
|
|
|
|
|
def read_pat() -> str:
    """Return the admin personal access token stored in ``PAT_FILE``.

    Returns:
        The file content with surrounding whitespace removed.

    Raises:
        BootstrapError: If the file does not exist or holds only whitespace.
    """
    if PAT_FILE.is_file():
        token = PAT_FILE.read_text().strip()
        if token:
            return token
        raise BootstrapError(f"PAT empty: {PAT_FILE}")
    raise BootstrapError(f"PAT missing: {PAT_FILE} (run make k6-wait)")
|
|
|
|
|
|
def load_saved() -> dict[str, str]:
    """Parse ``OUT_FILE`` from a previous run into a ``{name: value}`` mapping.

    Blank lines, ``#`` comments and lines without ``=`` are ignored. A leading
    ``export `` on the name and surrounding quotes on the value are removed.

    Returns:
        The parsed variables, or an empty dict when the file does not exist.
    """
    saved: dict[str, str] = {}
    if not OUT_FILE.is_file():
        return saved

    export_prefix = "export "
    for raw_line in OUT_FILE.read_text().splitlines():
        entry = raw_line.strip()
        name, sep, value = entry.partition("=")
        # An empty separator covers both blank lines and lines without "=".
        if not sep or entry.startswith("#"):
            continue
        if name.startswith(export_prefix):
            name = name[len(export_prefix):]
        saved[name.strip()] = value.strip().strip('"').strip("'")
    return saved
|
|
|
|
|
|
def write_env(org_id: str, client_id: str, client_secret: str, ldap_idp_id: str) -> None:
    """Write the bootstrap results to ``OUT_FILE`` as shell ``export`` lines.

    The parent directory is created if needed, and any existing file is
    overwritten. ``ZITADEL_GOOGLE_IDP_ID`` is always written empty.

    Args:
        org_id: Value for ``ZITADEL_DEFAULT_ORG_ID``.
        client_id: Value for ``ZITADEL_OAUTH_CLIENT_ID``.
        client_secret: Value for ``ZITADEL_OAUTH_CLIENT_SECRET``.
        ldap_idp_id: Value for ``ZITADEL_LDAP_IDP_ID``.
    """
    OUT_FILE.parent.mkdir(parents=True, exist_ok=True)
    lines = (
        "# Auto-generated by deploy/zitadel/bootstrap_dev.py — do not commit",
        f"export ZITADEL_DEFAULT_ORG_ID={org_id}",
        f"export ZITADEL_OAUTH_CLIENT_ID={client_id}",
        f"export ZITADEL_OAUTH_CLIENT_SECRET={client_secret}",
        f"export ZITADEL_LDAP_IDP_ID={ldap_idp_id}",
        "export ZITADEL_GOOGLE_IDP_ID=",
        "",  # makes the joined text end with a newline
    )
    OUT_FILE.write_text("\n".join(lines))
    log(f"wrote {OUT_FILE.relative_to(ROOT)}")
|
|
|
|
|
|
def org_id() -> str:
    """Return the id of the organisation the PAT belongs to.

    Raises:
        BootstrapError: If the ``/orgs/me`` response has no ``org.id``.
    """
    response = api("GET", "/management/v1/orgs/me")
    org = response.get("org") or {}
    resolved = org.get("id") or ""
    if resolved:
        return resolved
    raise BootstrapError("could not resolve org id")
|
|
|
|
|
|
def login_policy() -> tuple[dict, bool]:
    """Fetch the login policy.

    Returns:
        ``(policy, is_default)``: the ``policy`` object from the response (an
        empty dict if absent) and the truthiness of its ``isDefault`` flag.
    """
    response = api("GET", "/management/v1/policies/login")
    policy = response.get("policy") or {}
    is_default = bool(response.get("isDefault"))
    return policy, is_default
|
|
|
|
|
|
def find_ldap_idp_in_policy(policy: dict) -> str:
    """Return the id of the IdP named ``LDAP_IDP_NAME`` among the policy's links.

    Args:
        policy: Login-policy document; its ``idps`` list is searched.

    Returns:
        The ``idpId`` of the first entry whose ``idpName`` matches, or ``""``
        when there is no match or the match carries no id.
    """
    links = policy.get("idps") or []
    match = next((link for link in links if link.get("idpName") == LDAP_IDP_NAME), None)
    if match is None:
        return ""
    return match.get("idpId") or ""
|
|
|
|
|
|
def _login_policy_settings(policy: dict) -> dict:
    """Copy the tunable settings out of a login-policy document.

    Only keys actually present in *policy* are copied, so response metadata
    and the ``idps`` links are left behind.

    NOTE(review): the key list follows the ZITADEL management v1 login-policy
    fields as understood at review time; confirm against the deployed version.
    """
    setting_keys = (
        "allowUsernamePassword",
        "allowRegister",
        "allowExternalIdp",
        "forceMfa",
        "forceMfaLocalOnly",
        "passwordlessType",
        "hidePasswordReset",
        "ignoreUnknownUsernames",
        "defaultRedirectUri",
        "passwordCheckLifetime",
        "externalLoginCheckLifetime",
        "mfaInitSkipLifetime",
        "secondFactorCheckLifetime",
        "multiFactorCheckLifetime",
        "allowDomainDiscovery",
        "disableLoginWithEmail",
        "disableLoginWithPhone",
    )
    return {key: policy[key] for key in setting_keys if key in policy}


def ensure_ldap_idp() -> str:
    """Make sure the dev LDAP IdP exists and is linked to the login policy.

    If the current login policy already links an IdP named ``LDAP_IDP_NAME``,
    its id is returned and nothing is changed. Otherwise the IdP is created
    from ``LDAP_BODY`` and attached in one of three ways:

    * default policy in effect: create an org policy that includes the link;
    * org policy with external IdPs enabled: add the link;
    * org policy with external IdPs disabled: enable them on the existing
      policy, then add the link.

    NOTE(review): the existence check only looks at the policy's links. If an
    earlier run created the IdP but failed before linking it, this run creates
    another IdP with the same name.

    Returns:
        The id of the linked LDAP IdP.

    Raises:
        BootstrapError: If an API call fails or the create response lacks ``id``.
    """
    policy, is_default = login_policy()
    existing = find_ldap_idp_in_policy(policy)
    if existing:
        log(f"LDAP IdP already linked: {existing}")
        return existing

    created = api("POST", "/management/v1/idps/ldap", LDAP_BODY)
    idp_id = created.get("id") or ""
    if not idp_id:
        raise BootstrapError(f"create LDAP IdP: unexpected response {created}")
    log(f"created LDAP IdP {idp_id}")

    if is_default:
        # No org-level policy yet: create one that already carries the link.
        api(
            "POST",
            "/management/v1/policies/login",
            {
                "allowExternalIdp": True,
                "allowUsernamePassword": True,
                "allowRegister": True,
                "passwordlessType": "PASSWORDLESS_TYPE_ALLOWED",
                "idps": [{"idpId": idp_id, "ownerType": "IDP_OWNER_TYPE_ORG"}],
            },
        )
        log("created org login policy with LDAP IdP")
        return idp_id

    if not policy.get("allowExternalIdp"):
        # An org policy already exists, so it is updated in place rather than
        # created again (ZITADEL is expected to reject a second create;
        # NOTE(review): confirm). Current settings are carried over so that
        # only `allowExternalIdp` changes.
        api(
            "PUT",
            "/management/v1/policies/login",
            {**_login_policy_settings(policy), "allowExternalIdp": True},
        )
        log("enabled external IdPs on existing login policy")

    api("POST", "/management/v1/policies/login/idps", {"idpId": idp_id})
    log("linked LDAP IdP to existing login policy")
    return idp_id
|
|
|
|
|
|
def find_project() -> str:
    """Return the id of the project named ``PROJECT_NAME``, creating it if absent.

    Returns:
        The id of the first search hit with an exactly matching name (``""``
        if that hit has no id), otherwise the id of the newly created project.

    Raises:
        BootstrapError: If an API call fails or the create response lacks ``id``.
    """
    name_filter = {
        "nameQuery": {
            "name": PROJECT_NAME,
            "method": "TEXT_QUERY_METHOD_EQUALS",
        }
    }
    found = api("POST", "/management/v1/projects/_search", {"queries": [name_filter]})
    hit = next(
        (p for p in found.get("result") or [] if p.get("name") == PROJECT_NAME),
        None,
    )
    if hit is not None:
        return hit.get("id") or ""

    new_project = {
        "name": PROJECT_NAME,
        "projectRoleAssertion": True,
        "projectRoleCheck": False,
        "hasProjectCheck": False,
        "privateLabelingSetting": "PRIVATE_LABELING_SETTING_UNSPECIFIED",
    }
    created = api("POST", "/management/v1/projects", new_project)
    pid = created.get("id") or ""
    if pid:
        log(f"created project {PROJECT_NAME} ({pid})")
        return pid
    raise BootstrapError(f"create project: unexpected response {created}")
|
|
|
|
|
|
def find_app(project_id: str) -> tuple[str, str]:
    """Look up the application named ``APP_NAME`` inside a project.

    Args:
        project_id: Id of the project whose applications are searched.

    Returns:
        ``(app_id, client_id)`` of the first hit with an exactly matching
        name, or ``("", "")`` when there is none. Either element is ``""``
        if the hit lacks ``id`` or ``oidcConfig.clientId`` respectively.
    """
    name_filter = {
        "nameQuery": {
            "name": APP_NAME,
            "method": "TEXT_QUERY_METHOD_EQUALS",
        }
    }
    found = api(
        "POST",
        f"/management/v1/projects/{project_id}/apps/_search",
        {"queries": [name_filter]},
    )
    hit = next(
        (a for a in found.get("result") or [] if a.get("name") == APP_NAME),
        None,
    )
    if hit is None:
        return "", ""
    oidc = hit.get("oidcConfig") or {}
    return hit.get("id") or "", oidc.get("clientId") or ""
|
|
|
|
|
|
def create_app(project_id: str) -> tuple[str, str, str]:
    """Create the OIDC web application ``APP_NAME`` in the given project.

    Args:
        project_id: Id of the project that will own the application.

    Returns:
        ``(app_id, client_id, client_secret)`` from the create response.

    Raises:
        BootstrapError: If the API call fails or any of the three values is
            missing from the response.
    """
    spec = {
        "name": APP_NAME,
        "redirectUris": REDIRECT_URIS,
        "responseTypes": ["OIDC_RESPONSE_TYPE_CODE"],
        "grantTypes": [
            "OIDC_GRANT_TYPE_AUTHORIZATION_CODE",
            "OIDC_GRANT_TYPE_REFRESH_TOKEN",
        ],
        "appType": "OIDC_APP_TYPE_WEB",
        "authMethodType": "OIDC_AUTH_METHOD_TYPE_BASIC",
        "postLogoutRedirectUris": POST_LOGOUT_URIS,
        "devMode": True,
        "accessTokenType": "OIDC_TOKEN_TYPE_BEARER",
    }
    created = api("POST", f"/management/v1/projects/{project_id}/apps/oidc", spec)

    app_id, client_id, client_secret = (
        created.get(field) or "" for field in ("appId", "clientId", "clientSecret")
    )
    if not (app_id and client_id and client_secret):
        raise BootstrapError(f"create OIDC app: unexpected response {created}")
    log(f"created OIDC app {APP_NAME} client_id={client_id}")
    return app_id, client_id, client_secret
|
|
|
|
|
|
def regenerate_secret(project_id: str, app_id: str) -> str:
    """Ask ZITADEL for a new client secret for an OIDC application.

    Args:
        project_id: Id of the project that owns the application.
        app_id: Id of the application whose secret is regenerated.

    Returns:
        The ``clientSecret`` value from the response.

    Raises:
        BootstrapError: If the API call fails or the response has no secret.
    """
    endpoint = (
        f"/management/v1/projects/{project_id}/apps/{app_id}"
        "/oidc_config/_generate_client_secret"
    )
    data = api("POST", endpoint, {})
    secret = data.get("clientSecret") or ""
    if secret:
        log("regenerated OIDC client secret")
        return secret
    raise BootstrapError(f"regenerate client secret: unexpected response {data}")
|
|
|
|
|
|
def ensure_oidc_app(saved: dict[str, str]) -> tuple[str, str]:
    """Make sure the OIDC application exists and return usable credentials.

    A missing application is created. For an existing one, the saved secret is
    reused when it was saved for the same client id; otherwise a new secret is
    generated.

    Args:
        saved: Variables loaded from a previous run's env file.

    Returns:
        ``(client_id, client_secret)``.

    Raises:
        BootstrapError: If any underlying API step fails.
    """
    project = find_project()
    existing_app, client = find_app(project)
    if not existing_app:
        _, client, secret = create_app(project)
        return client, secret

    log(f"OIDC app exists client_id={client}")
    secret = saved.get("ZITADEL_OAUTH_CLIENT_SECRET", "")
    reusable = bool(secret) and saved.get("ZITADEL_OAUTH_CLIENT_ID", "") == client
    if not reusable:
        secret = regenerate_secret(project, existing_app)
    return client, secret
|
|
|
|
|
|
def main() -> int:
    """Run every bootstrap step and write the resulting env file.

    Returns:
        ``0`` on success; ``1`` if a step raised ``BootstrapError`` (the error
        is logged to stderr).
    """
    try:
        organisation = org_id()
        idp = ensure_ldap_idp()
        client, secret = ensure_oidc_app(load_saved())
        write_env(organisation, client, secret, idp)
        print(f"LDAP IdP={idp} OAuth client={client}")
    except BootstrapError as exc:
        log(f"error: {exc}")
        return 1
    return 0
|
|
|
|
|
|
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())
|