feat(local-identity): Stage 2 — Keycloak export & bootstrap integration (NK-WP-0002-T02)

export.py:
  - split_fullname(): last-token strategy (Bernd Worsch → firstName/lastName)
  - _deterministic_id(): uuid5(DNS, "local-identity.{realm}.{username}") for stable,
    re-import-idempotent Keycloak IDs
  - user_to_keycloak(): full Keycloak Admin REST API user representation;
    production_identity mapping applied to username + realm; isolation attributes
    (local_identity_environment, local_identity_generated) always present;
    validate_keycloak_user() called on every conversion to catch schema drift
  - bulk_export_body(): partial import body (ifResourceExists/realm/users)

cli.py: add `export` subcommand
  - export <username>         single user, prints Keycloak JSON
  - export (no args)          bulk; primary users only; stderr note on skipped test users
  - export --include-test     bulk; all users including generated
  - --realm / --if-resource-exists flags

docs/LocalIdentity.md: add two new sections
  - Keycloak import procedure: export → partialImport API → password reset → retire
  - Isolation guarantee: attribute schema, Keycloak Condition authenticator config,
    production_identity mapping walkthrough

tests/test_export.py: 34 new tests (88 total, all passing)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-02 00:23:39 +01:00
parent 666a56f4ed
commit dad8365e6a
5 changed files with 523 additions and 6 deletions

View File

@@ -8,16 +8,21 @@ Commands:
resolved flag > config > system derivation.
list List all users in the store.
show <username> Display a user's YAML record.
export [<username>] Export a single user as Keycloak JSON.
export --all [--realm R] Bulk partial-import body (primary users only).
Add --include-test to include generated users.
Environment:
LOCAL_IDENTITY_HOME Override the store directory (default: ~/.local-identity).
"""
import argparse
import json
import sys
from .gecos import current_username, get_gecos_fullname
from .user import UserRecord, make_test_user
from . import export as export_mod
from . import store
@@ -87,6 +92,38 @@ def cmd_list(args: argparse.Namespace) -> None:
print(f"{u.username:<20} {u.fullname:<30} {u.email:<40} {utype}")
def cmd_export(args: argparse.Namespace) -> None:
if args.username:
# Single-user export
try:
user = store.read_user(args.username)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
kc = export_mod.user_to_keycloak(user, realm=args.realm)
print(json.dumps(kc, indent=2))
else:
# Bulk export
all_users = store.list_users()
if args.include_test:
users = all_users
else:
users = [u for u in all_users if not u.generated]
skipped = len(all_users) - len(users)
if skipped:
print(
f"Note: skipping {skipped} test user(s). "
"Use --include-test to export them.",
file=sys.stderr,
)
body = export_mod.bulk_export_body(
users,
realm=args.realm,
if_resource_exists=args.if_resource_exists,
)
print(json.dumps(body, indent=2))
def cmd_show(args: argparse.Namespace) -> None:
try:
user = store.read_user(args.username)
@@ -129,6 +166,29 @@ def main() -> None:
)
p_init.set_defaults(func=cmd_init)
p_export = sub.add_parser(
"export",
help="Export user(s) as Keycloak-compatible JSON",
)
p_export.add_argument(
"username", nargs="?",
help="Export a single user (omit for bulk export)",
)
p_export.add_argument(
"--realm", default="net-kingdom",
help="Target Keycloak realm name (default: net-kingdom)",
)
p_export.add_argument(
"--include-test", action="store_true",
help="Include generated test users in bulk export",
)
p_export.add_argument(
"--if-resource-exists", default="SKIP",
choices=["SKIP", "OVERWRITE", "FAIL"],
help="Conflict strategy for bulk import (default: SKIP)",
)
p_export.set_defaults(func=cmd_export)
p_list = sub.add_parser("list", help="List all users in the store")
p_list.set_defaults(func=cmd_list)

View File

@@ -0,0 +1,175 @@
"""
Keycloak export: convert UserRecords to Keycloak Admin REST API representations.
Single-user export:
user_to_keycloak(user, realm="net-kingdom") → dict
Bulk export (Keycloak partial import body):
bulk_export_body(users, realm="net-kingdom") → dict
POST /admin/realms/{realm}/partialImport
Deterministic IDs:
UUIDs are generated with uuid5(DNS, "local-identity.{realm}.{username}") so
repeated exports of the same user produce the same Keycloak ID. This makes
re-imports idempotent when ifResourceExists="SKIP".
Isolation markers:
All exported users carry:
attributes.local_identity_environment = ["local"]
Generated test users additionally carry:
attributes.local_identity_generated = ["true"]
Configure Keycloak to deny authentication for users with
local_identity_environment == "local" as a defence-in-depth measure.
See docs/LocalIdentity.md — "Isolation guarantee" section.
"""
from __future__ import annotations
import uuid
from typing import Sequence
from .user import UserRecord
# ------------------------------------------------------------------ #
# Name splitting #
# ------------------------------------------------------------------ #
def split_fullname(fullname: str) -> tuple[str, str]:
"""
Split a display name into (firstName, lastName).
Strategy: the last whitespace-separated token is lastName; everything
before it is firstName. A single-word name maps to (name, '').
Examples:
"Bernd Worsch" → ("Bernd", "Worsch")
"Jean Claude Damme" → ("Jean Claude", "Damme")
"Madonna" → ("Madonna", "")
"""
parts = fullname.strip().split()
if len(parts) >= 2:
return " ".join(parts[:-1]), parts[-1]
return fullname.strip(), ""
# ------------------------------------------------------------------ #
# Deterministic ID #
# ------------------------------------------------------------------ #
def _deterministic_id(realm: str, username: str) -> str:
"""
Stable UUID5 for a (realm, username) pair.
Using NAMESPACE_DNS keeps it domain-agnostic and collision-resistant.
"""
return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"local-identity.{realm}.{username}"))
# ------------------------------------------------------------------ #
# Schema validation #
# ------------------------------------------------------------------ #
_REQUIRED_FIELDS: dict[str, type] = {
"username": str,
"firstName": str,
"lastName": str,
"email": str,
"enabled": bool,
"emailVerified": bool,
"attributes": dict,
"credentials": list,
}
def validate_keycloak_user(d: dict) -> None:
"""
Minimal structural validation against the expected Keycloak user schema.
Raises ValueError listing every missing or wrongly-typed field, so that
changes to user_to_keycloak() are caught before they reach a live realm.
"""
errors: list[str] = []
for field, expected in _REQUIRED_FIELDS.items():
if field not in d:
errors.append(f"missing required field '{field}'")
elif not isinstance(d[field], expected):
actual = type(d[field]).__name__
errors.append(
f"field '{field}': expected {expected.__name__}, got {actual}"
)
if errors:
raise ValueError(
"Keycloak user schema validation failed:\n"
+ "\n".join(f" - {e}" for e in errors)
)
# ------------------------------------------------------------------ #
# Conversion #
# ------------------------------------------------------------------ #
def user_to_keycloak(user: UserRecord, realm: str = "net-kingdom") -> dict:
"""
Convert a UserRecord to a Keycloak Admin REST API user representation.
production_identity mapping:
If the user carries a production_identity block, the Keycloak username
and realm are taken from it. This allows a local username of 'tegwick'
to map to a different production username/realm before import.
"""
first_name, last_name = split_fullname(user.fullname)
kc_username = user.username
kc_realm = realm
if user.production_identity:
kc_username = user.production_identity.username
if user.production_identity.realm:
kc_realm = user.production_identity.realm
attrs: dict[str, list[str]] = {
"local_identity_environment": [user.environment],
}
if user.generated:
attrs["local_identity_generated"] = ["true"]
d: dict = {
"id": _deterministic_id(kc_realm, kc_username),
"username": kc_username,
"firstName": first_name,
"lastName": last_name,
"email": user.email,
"enabled": True,
"emailVerified": False,
"attributes": attrs,
"credentials": [],
"requiredActions": [],
"groups": [],
"realmRoles": [],
"clientRoles": {},
}
validate_keycloak_user(d)
return d
# ------------------------------------------------------------------ #
# Bulk export #
# ------------------------------------------------------------------ #
def bulk_export_body(
users: Sequence[UserRecord],
realm: str = "net-kingdom",
if_resource_exists: str = "SKIP",
) -> dict:
"""
Produce a Keycloak partial import request body.
POST /admin/realms/{realm}/partialImport
Content-Type: application/json
ifResourceExists values: SKIP | OVERWRITE | FAIL
SKIP is the safe default — existing users are left unchanged.
"""
return {
"ifResourceExists": if_resource_exists,
"realm": realm,
"users": [user_to_keycloak(u, realm=realm) for u in users],
}