""" Keycloak export: convert UserRecords to Keycloak Admin REST API representations. Single-user export: user_to_keycloak(user, realm="net-kingdom") → dict Bulk export (Keycloak partial import body): bulk_export_body(users, realm="net-kingdom") → dict POST /admin/realms/{realm}/partialImport Deterministic IDs: UUIDs are generated with uuid5(DNS, "local-identity.{realm}.{username}") so repeated exports of the same user produce the same Keycloak ID. This makes re-imports idempotent when ifResourceExists="SKIP". Isolation markers: All exported users carry: attributes.local_identity_environment = ["local"] Generated test users additionally carry: attributes.local_identity_generated = ["true"] Configure Keycloak to deny authentication for users with local_identity_environment == "local" as a defence-in-depth measure. See docs/LocalIdentity.md — "Isolation guarantee" section. """ from __future__ import annotations import uuid from typing import Sequence from .user import UserRecord # ------------------------------------------------------------------ # # Name splitting # # ------------------------------------------------------------------ # def split_fullname(fullname: str) -> tuple[str, str]: """ Split a display name into (firstName, lastName). Strategy: the last whitespace-separated token is lastName; everything before it is firstName. A single-word name maps to (name, ''). Examples: "Bernd Worsch" → ("Bernd", "Worsch") "Jean Claude Damme" → ("Jean Claude", "Damme") "Madonna" → ("Madonna", "") """ parts = fullname.strip().split() if len(parts) >= 2: return " ".join(parts[:-1]), parts[-1] return fullname.strip(), "" # ------------------------------------------------------------------ # # Deterministic ID # # ------------------------------------------------------------------ # def _deterministic_id(realm: str, username: str) -> str: """ Stable UUID5 for a (realm, username) pair. Using NAMESPACE_DNS keeps it domain-agnostic and collision-resistant. """ return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"local-identity.{realm}.{username}")) # ------------------------------------------------------------------ # # Schema validation # # ------------------------------------------------------------------ # _REQUIRED_FIELDS: dict[str, type] = { "username": str, "firstName": str, "lastName": str, "email": str, "enabled": bool, "emailVerified": bool, "attributes": dict, "credentials": list, } def validate_keycloak_user(d: dict) -> None: """ Minimal structural validation against the expected Keycloak user schema. Raises ValueError listing every missing or wrongly-typed field, so that changes to user_to_keycloak() are caught before they reach a live realm. """ errors: list[str] = [] for field, expected in _REQUIRED_FIELDS.items(): if field not in d: errors.append(f"missing required field '{field}'") elif not isinstance(d[field], expected): actual = type(d[field]).__name__ errors.append( f"field '{field}': expected {expected.__name__}, got {actual}" ) if errors: raise ValueError( "Keycloak user schema validation failed:\n" + "\n".join(f" - {e}" for e in errors) ) # ------------------------------------------------------------------ # # Conversion # # ------------------------------------------------------------------ # def user_to_keycloak(user: UserRecord, realm: str = "net-kingdom") -> dict: """ Convert a UserRecord to a Keycloak Admin REST API user representation. production_identity mapping: If the user carries a production_identity block, the Keycloak username and realm are taken from it. This allows a local username of 'tegwick' to map to a different production username/realm before import. """ first_name, last_name = split_fullname(user.fullname) kc_username = user.username kc_realm = realm if user.production_identity: kc_username = user.production_identity.username if user.production_identity.realm: kc_realm = user.production_identity.realm attrs: dict[str, list[str]] = { "local_identity_environment": [user.environment], } if user.generated: attrs["local_identity_generated"] = ["true"] d: dict = { "id": _deterministic_id(kc_realm, kc_username), "username": kc_username, "firstName": first_name, "lastName": last_name, "email": user.email, "enabled": True, "emailVerified": False, "attributes": attrs, "credentials": [], "requiredActions": [], "groups": [], "realmRoles": [], "clientRoles": {}, } validate_keycloak_user(d) return d # ------------------------------------------------------------------ # # Bulk export # # ------------------------------------------------------------------ # def bulk_export_body( users: Sequence[UserRecord], realm: str = "net-kingdom", if_resource_exists: str = "SKIP", ) -> dict: """ Produce a Keycloak partial import request body. POST /admin/realms/{realm}/partialImport Content-Type: application/json ifResourceExists values: SKIP | OVERWRITE | FAIL SKIP is the safe default — existing users are left unchanged. """ return { "ifResourceExists": if_resource_exists, "realm": realm, "users": [user_to_keycloak(u, realm=realm) for u in users], }