Files
net-kingdom/local-identity/src/local_identity/cli.py
2026-05-02 16:58:44 +02:00

383 lines
13 KiB
Python

"""
local-identity CLI — entry point.
Commands:
init [--force] [--username U] [--fullname N] [--email E]
Derive primary user, generate test users,
write store. All three identity fields are
resolved flag > config > system derivation.
list List all users in the store.
show <username> Display a user's YAML record.
export [<username>] Export a single user as Keycloak JSON.
export --all [--realm R] Bulk partial-import body (primary users only).
Add --include-test to include generated users.
bootstrap-oidc Persist and print local OIDC client settings.
serve [--port P] [--ttl T] Start the minimal OIDC server on 127.0.0.1.
security-check Validate filesystem permissions.
revoke-token <jti-or-jwt> Add a token JTI to the revocation list.
Environment:
LOCAL_IDENTITY_HOME Override the store directory (default: ~/.local-identity).
"""
import argparse
import json
import shlex
import sys
import urllib.parse
from .gecos import current_username, get_gecos_fullname
from .jwt_utils import JWTError, extract_unverified_payload
from .user import UserRecord, make_test_user
from . import audit
from . import export as export_mod
from . import revoke as revoke_mod
from . import serve as serve_mod
from . import store
from .security import enforce_permissions, print_security_check
# Commands that must not run the startup permission check
_SKIP_ENFORCE = {"init", "security-check"}
_LOCAL_OIDC_HOSTS = {"127.0.0.1", "localhost", "::1"}
def _resolve_init_params(args: argparse.Namespace, config: dict) -> tuple[str, str, str]:
"""
Resolve (username, fullname, email) for init from three sources in order:
1. CLI flags (--username, --fullname, --email)
2. Persisted config (~/.local-identity/config.yaml)
3. System derivation ($USER / /etc/passwd GECOS) — username + fullname only
Email has no system default; missing email falls through to prompt in cmd_init.
"""
username: str = args.username or config.get("username") or current_username()
fullname: str = args.fullname or config.get("fullname") or get_gecos_fullname(username)
email: str = args.email or config.get("email") or ""
return username, fullname, email
def cmd_init(args: argparse.Namespace) -> None:
if store.store_exists() and not args.force:
print(
f"Store already exists at {store._store_dir()}. "
"Use --force to reinitialise.",
file=sys.stderr,
)
sys.exit(1)
config = store.read_config() if store.store_exists() else {}
username, fullname, email = _resolve_init_params(args, config)
if not email:
try:
email = input(f"Email address for {username}: ").strip()
except EOFError:
email = ""
if not email:
print("Error: email address is required.", file=sys.stderr)
sys.exit(1)
store.init_dirs()
config.update({"username": username, "fullname": fullname, "email": email})
store.write_config(config)
primary = UserRecord(username=username, fullname=fullname, email=email)
store.write_user(primary)
test1 = make_test_user(primary, 1)
test2 = make_test_user(primary, 2)
store.write_user(test1)
store.write_user(test2)
audit.log_event("init", username, "ok")
print(f"Initialised local-identity store at {store._store_dir()}")
print(f" Primary : {primary.username} ({primary.fullname}) <{primary.email}>")
print(f" Test 1 : {test1.username} <{test1.email}>")
print(f" Test 2 : {test2.username} <{test2.email}>")
def cmd_list(args: argparse.Namespace) -> None:
users = store.list_users()
if not users:
print("No users found. Run 'local-identity init' first.")
return
header = f"{'USERNAME':<20} {'FULLNAME':<30} {'EMAIL':<40} TYPE"
print(header)
print("-" * len(header))
for u in users:
utype = "test " if u.generated else "primary"
print(f"{u.username:<20} {u.fullname:<30} {u.email:<40} {utype}")
audit.log_event("list", None, f"{len(users)}_users")
def cmd_export(args: argparse.Namespace) -> None:
if args.username:
try:
user = store.read_user(args.username)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
kc = export_mod.user_to_keycloak(user, realm=args.realm)
print(json.dumps(kc, indent=2))
audit.log_event("export", args.username, "ok")
else:
all_users = store.list_users()
if args.include_test:
users = all_users
else:
users = [u for u in all_users if not u.generated]
skipped = len(all_users) - len(users)
if skipped:
print(
f"Note: skipping {skipped} test user(s). "
"Use --include-test to export them.",
file=sys.stderr,
)
body = export_mod.bulk_export_body(
users,
realm=args.realm,
if_resource_exists=args.if_resource_exists,
)
print(json.dumps(body, indent=2))
audit.log_event("export", "--all", f"{len(users)}_users")
def cmd_show(args: argparse.Namespace) -> None:
try:
user = store.read_user(args.username)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
print(user.to_yaml(), end="")
audit.log_event("show", args.username, "ok")
def cmd_serve(args: argparse.Namespace) -> None:
serve_mod.run_server(port=args.port, token_ttl=args.ttl)
def _validate_loopback_redirect_uri(uri: str) -> None:
parsed = urllib.parse.urlparse(uri)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise ValueError("redirect URI must be an absolute http(s) URL")
if parsed.hostname not in _LOCAL_OIDC_HOSTS:
raise ValueError("redirect URI must use localhost or a loopback address")
def _oidc_bootstrap_payload(args: argparse.Namespace) -> dict:
_validate_loopback_redirect_uri(args.redirect_uri)
issuer = f"{args.scheme}://127.0.0.1:{args.port}"
return {
"issuer": issuer,
"discovery_url": f"{issuer}/.well-known/openid-configuration",
"client_id": args.client_id,
"redirect_uri": args.redirect_uri,
"scope": args.scope,
"token_endpoint_auth_method": "none",
}
def cmd_bootstrap_oidc(args: argparse.Namespace) -> None:
if not store.store_exists():
print(
"Error: store not initialised. Run 'local-identity init' first.",
file=sys.stderr,
)
sys.exit(1)
try:
payload = _oidc_bootstrap_payload(args)
except ValueError as exc:
print(f"Error: {exc}.", file=sys.stderr)
sys.exit(1)
config = store.read_config()
clients = config.setdefault("oidc_clients", {})
clients[args.client_id] = payload
config["last_oidc_bootstrap"] = args.client_id
store.write_config(config)
audit.log_event("bootstrap-oidc", args.client_id, "ok")
if args.output == "json":
print(json.dumps(payload, indent=2))
return
for key, value in {
"OIDC_ISSUER": payload["issuer"],
"OIDC_DISCOVERY_URL": payload["discovery_url"],
"OIDC_CLIENT_ID": payload["client_id"],
"OIDC_REDIRECT_URI": payload["redirect_uri"],
"OIDC_SCOPE": payload["scope"],
"OIDC_TOKEN_ENDPOINT_AUTH_METHOD": payload["token_endpoint_auth_method"],
}.items():
print(f"{key}={shlex.quote(value)}")
def cmd_security_check(args: argparse.Namespace) -> None:
rc = print_security_check()
sys.exit(rc)
def cmd_revoke_token(args: argparse.Namespace) -> None:
token_or_jti: str = args.token
if token_or_jti.count(".") == 2:
# Looks like a JWT — extract the JTI from the payload
try:
payload = extract_unverified_payload(token_or_jti)
jti = payload.get("jti")
if not jti:
print("Error: JWT has no 'jti' claim.", file=sys.stderr)
sys.exit(1)
except JWTError as exc:
print(f"Error decoding JWT: {exc}", file=sys.stderr)
sys.exit(1)
else:
jti = token_or_jti
revoke_mod.revoke(jti)
print(f"Token revoked: {jti}")
audit.log_event("revoke-token", None, f"jti={jti}")
def main() -> None:
parser = argparse.ArgumentParser(
prog="local-identity",
description="Zero-dependency bootstrap user store for net-kingdom environments.",
epilog=(
"Store location: ~/.local-identity "
"(override with LOCAL_IDENTITY_HOME)"
),
)
sub = parser.add_subparsers(dest="command", required=True)
p_init = sub.add_parser(
"init",
help="Initialise store from Linux identity",
)
p_init.add_argument(
"--force", action="store_true",
help="Reinitialise even if the store already exists",
)
p_init.add_argument(
"--username",
help="Bootstrap username (default: $USER / $LOGNAME)",
)
p_init.add_argument(
"--fullname",
help="Full display name (default: /etc/passwd GECOS field)",
)
p_init.add_argument(
"--email",
help="Email address (default: interactive prompt)",
)
p_init.set_defaults(func=cmd_init)
p_export = sub.add_parser(
"export",
help="Export user(s) as Keycloak-compatible JSON",
)
p_export.add_argument(
"username", nargs="?",
help="Export a single user (omit for bulk export)",
)
p_export.add_argument(
"--realm", default="net-kingdom",
help="Target Keycloak realm name (default: net-kingdom)",
)
p_export.add_argument(
"--include-test", action="store_true",
help="Include generated test users in bulk export",
)
p_export.add_argument(
"--if-resource-exists", default="SKIP",
choices=["SKIP", "OVERWRITE", "FAIL"],
help="Conflict strategy for bulk import (default: SKIP)",
)
p_export.set_defaults(func=cmd_export)
p_list = sub.add_parser("list", help="List all users in the store")
p_list.set_defaults(func=cmd_list)
p_show = sub.add_parser("show", help="Display a user record")
p_show.add_argument("username", help="Username to display")
p_show.set_defaults(func=cmd_show)
p_serve = sub.add_parser(
"serve",
help="Start the minimal OIDC server (127.0.0.1 only)",
)
p_serve.add_argument(
"--port", type=int, default=8443,
help="Port to listen on (default: 8443)",
)
p_serve.add_argument(
"--ttl", type=int, default=3600,
help="Token TTL in seconds (default: 3600)",
)
p_serve.set_defaults(func=cmd_serve)
p_bootstrap_oidc = sub.add_parser(
"bootstrap-oidc",
help="Persist and print localhost OIDC client bootstrap settings",
)
p_bootstrap_oidc.add_argument(
"--client-id",
default="local-dev",
help="OIDC client ID to advertise (default: local-dev)",
)
p_bootstrap_oidc.add_argument(
"--redirect-uri",
default="http://127.0.0.1:3000/callback",
help="Loopback redirect URI for the local client",
)
p_bootstrap_oidc.add_argument(
"--port", type=int, default=8443,
help="Port used by local-identity serve (default: 8443)",
)
p_bootstrap_oidc.add_argument(
"--scheme",
choices=["https", "http"],
default="https",
help="Issuer URL scheme (default: https)",
)
p_bootstrap_oidc.add_argument(
"--scope",
default="openid profile email",
help="OIDC scope string (default: openid profile email)",
)
p_bootstrap_oidc.add_argument(
"--output",
choices=["env", "json"],
default="env",
help="Output format (default: env)",
)
p_bootstrap_oidc.set_defaults(func=cmd_bootstrap_oidc)
sub.add_parser(
"security-check",
help="Validate filesystem permissions of the store",
).set_defaults(func=cmd_security_check)
p_revoke = sub.add_parser(
"revoke-token",
help="Add a token JTI to the revocation list",
)
p_revoke.add_argument(
"token",
help="Token JTI (UUID) or full JWT — the JTI is extracted automatically",
)
p_revoke.set_defaults(func=cmd_revoke_token)
args = parser.parse_args()
if args.command not in _SKIP_ENFORCE:
enforce_permissions()
args.func(args)
if __name__ == "__main__":
main()