diff --git a/docs/LocalIdentity.md b/docs/LocalIdentity.md index bf72780..69f3b26 100644 --- a/docs/LocalIdentity.md +++ b/docs/LocalIdentity.md @@ -107,11 +107,13 @@ representation to prevent silent drift. ## CLI reference ``` -local-identity init # derive primary user, generate test users -local-identity list # list all users in the store -local-identity show # display user file -local-identity export # emit Keycloak-compatible JSON -local-identity security-check # validate filesystem permissions and config +local-identity init # derive primary user, generate test users +local-identity list # list all users in the store +local-identity show # display user file +local-identity export # emit Keycloak-compatible JSON +local-identity serve [--port P] [--ttl T] # start minimal OIDC server +local-identity security-check # validate filesystem permissions +local-identity revoke-token # add a token JTI to the revocation list ``` ## OIDC provider (Stage 3) @@ -228,6 +230,81 @@ production_identity: Re-run `local-identity export --all` — the exported JSON will use `bworsch` as the Keycloak username and a deterministic UUID derived from `net-kingdom/bworsch`. +## Security model + +### Threat model + +Local Identity is designed for **single-operator, localhost-only** use. The +threat model covers accidental exposure, not active adversarial attack. + +| Threat | Control | +|--------|---------| +| Other local users reading credential files | `~/.local-identity/` mode `700`; user files mode `600`; startup check exits on violation | +| Attacker elevates a local OIDC token to production | `iss: local-identity` rejected by production Keycloak; `environment: local` attribute rejected by Keycloak attribute check | +| Stolen token used after the fact | Token revocation list (`revoke-token `); configurable TTL (default 1h) | +| Long-lived store left behind post-migration | Explicit retirement step: `rm -rf ~/.local-identity` after Keycloak migration | +| OIDC server exposed on non-loopback interface | Server hard-codes `127.0.0.1`; `0.0.0.0` binding is not offered | + +### Assumptions + +- The operator's Linux account is not compromised (Local Identity cannot + protect against a root-level attacker). +- The `LOCAL_IDENTITY_HOME` environment variable is not set to a + world-readable path by accident. +- The operator's umask does not silently widen permissions before the tool + can apply `os.chmod`. (The tool sets permissions explicitly after every + write, which limits this window.) + +### Non-guarantees + +- **No MFA.** Token issuance requires only user selection in the browser + form; there is no second factor. +- **No audit-log integrity.** `audit.log` is append-only by convention, but + the OS does not enforce append-only at the file level without `chattr +a` + (which requires root). The log records events; it does not prove they were + not tampered with. +- **Self-signed TLS is not CA-trusted.** The TLS certificate generated for + `local-identity serve` is not signed by a trusted CA. OIDC clients must + either skip certificate verification or import the certificate manually. +- **No privilege separation.** All operations run as the operator's user. + +### Optional SELinux / AppArmor hardening + +If your system uses SELinux or AppArmor you can apply labels to further +restrict access to `~/.local-identity/`: + +**SELinux** (example — adapt context type for your policy): +```bash +chcon -R -t user_home_t ~/.local-identity +``` + +**AppArmor** — create a profile snippet that denies access to +`~/.local-identity/` from any process other than `local-identity`: +``` +deny /home/*/.local-identity/ r, +deny /home/*/.local-identity/** r, +``` + +These are optional hardening layers. The tool's own permission controls +(mode 700/600, startup enforcement) provide the baseline. + +### Token revocation + +Tokens issued by `local-identity serve` can be revoked at any time: + +```bash +# By JTI (extract from JWT payload manually or from audit.log): +local-identity revoke-token + +# By passing the full JWT — the JTI is extracted automatically: +local-identity revoke-token +``` + +Revoked JTIs are stored in `~/.local-identity/revoked.json` (mode 600). +The revocation list is checked on every `/userinfo` request. There is no +endpoint to un-revoke a token; if you need to re-grant access, obtain a +new token via the authorization code flow. + ## Relationship to the SSO platform Local Identity is a complementary workstream to the SSO & MFA Platform diff --git a/local-identity/src/local_identity/audit.py b/local-identity/src/local_identity/audit.py new file mode 100644 index 0000000..5c80425 --- /dev/null +++ b/local-identity/src/local_identity/audit.py @@ -0,0 +1,40 @@ +""" +Append-only audit log for local-identity. + +All CLI commands and OIDC server events write one entry per operation to +~/.local-identity/audit.log (mode 600). + +Format (TSV): ISO-timestamp command username outcome +Example: + 2026-03-02T00:00:00Z init alice ok + 2026-03-02T00:01:00Z serve/auth alice auth_request + 2026-03-02T00:01:01Z serve/token alice token_issued + +Failures (e.g. file permission errors) are silently swallowed — the audit +log must never interrupt the primary operation. +""" + +import datetime +import os +from pathlib import Path + +from .store import _store_dir + + +def _audit_log_path() -> Path: + return _store_dir() / "audit.log" + + +def log_event(command: str, username: str | None, outcome: str) -> None: + """Append one audit entry. Silent on any I/O failure.""" + timestamp = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + entry = f"{timestamp}\t{command}\t{username or '-'}\t{outcome}\n" + path = _audit_log_path() + try: + with open(path, "a", encoding="utf-8") as fh: + fh.write(entry) + os.chmod(path, 0o600) + except OSError: + pass diff --git a/local-identity/src/local_identity/cli.py b/local-identity/src/local_identity/cli.py index 222370c..e359b7c 100644 --- a/local-identity/src/local_identity/cli.py +++ b/local-identity/src/local_identity/cli.py @@ -12,20 +12,29 @@ Commands: export --all [--realm R] Bulk partial-import body (primary users only). Add --include-test to include generated users. serve [--port P] [--ttl T] Start the minimal OIDC server on 127.0.0.1. + security-check Validate filesystem permissions. + revoke-token Add a token JTI to the revocation list. Environment: LOCAL_IDENTITY_HOME Override the store directory (default: ~/.local-identity). """ import argparse +import base64 import json import sys from .gecos import current_username, get_gecos_fullname from .user import UserRecord, make_test_user +from . import audit from . import export as export_mod +from . import revoke as revoke_mod from . import serve as serve_mod from . import store +from .security import enforce_permissions, print_security_check + +# Commands that must not run the startup permission check +_SKIP_ENFORCE = {"init", "security-check"} def _resolve_init_params(args: argparse.Namespace, config: dict) -> tuple[str, str, str]: @@ -75,6 +84,8 @@ def cmd_init(args: argparse.Namespace) -> None: store.write_user(test1) store.write_user(test2) + audit.log_event("init", username, "ok") + print(f"Initialised local-identity store at {store._store_dir()}") print(f" Primary : {primary.username} ({primary.fullname}) <{primary.email}>") print(f" Test 1 : {test1.username} <{test1.email}>") @@ -92,11 +103,11 @@ def cmd_list(args: argparse.Namespace) -> None: for u in users: utype = "test " if u.generated else "primary" print(f"{u.username:<20} {u.fullname:<30} {u.email:<40} {utype}") + audit.log_event("list", None, f"{len(users)}_users") def cmd_export(args: argparse.Namespace) -> None: if args.username: - # Single-user export try: user = store.read_user(args.username) except FileNotFoundError as exc: @@ -104,8 +115,8 @@ def cmd_export(args: argparse.Namespace) -> None: sys.exit(1) kc = export_mod.user_to_keycloak(user, realm=args.realm) print(json.dumps(kc, indent=2)) + audit.log_event("export", args.username, "ok") else: - # Bulk export all_users = store.list_users() if args.include_test: users = all_users @@ -124,6 +135,7 @@ def cmd_export(args: argparse.Namespace) -> None: if_resource_exists=args.if_resource_exists, ) print(json.dumps(body, indent=2)) + audit.log_event("export", "--all", f"{len(users)}_users") def cmd_show(args: argparse.Namespace) -> None: @@ -133,12 +145,44 @@ def cmd_show(args: argparse.Namespace) -> None: print(str(exc), file=sys.stderr) sys.exit(1) print(user.to_yaml(), end="") + audit.log_event("show", args.username, "ok") def cmd_serve(args: argparse.Namespace) -> None: serve_mod.run_server(port=args.port, token_ttl=args.ttl) +def cmd_security_check(args: argparse.Namespace) -> None: + rc = print_security_check() + sys.exit(rc) + + +def cmd_revoke_token(args: argparse.Namespace) -> None: + token_or_jti: str = args.token + + if token_or_jti.count(".") == 2: + # Looks like a JWT — extract the JTI from the payload + try: + payload_b64 = token_or_jti.split(".")[1] + pad = (4 - len(payload_b64) % 4) % 4 + payload = json.loads( + base64.urlsafe_b64decode(payload_b64 + "=" * pad) + ) + jti = payload.get("jti") + if not jti: + print("Error: JWT has no 'jti' claim.", file=sys.stderr) + sys.exit(1) + except Exception as exc: + print(f"Error decoding JWT: {exc}", file=sys.stderr) + sys.exit(1) + else: + jti = token_or_jti + + revoke_mod.revoke(jti) + print(f"Token revoked: {jti}") + audit.log_event("revoke-token", None, f"jti={jti}") + + def main() -> None: parser = argparse.ArgumentParser( prog="local-identity", @@ -216,7 +260,26 @@ def main() -> None: ) p_serve.set_defaults(func=cmd_serve) + sub.add_parser( + "security-check", + help="Validate filesystem permissions of the store", + ).set_defaults(func=cmd_security_check) + + p_revoke = sub.add_parser( + "revoke-token", + help="Add a token JTI to the revocation list", + ) + p_revoke.add_argument( + "token", + help="Token JTI (UUID) or full JWT — the JTI is extracted automatically", + ) + p_revoke.set_defaults(func=cmd_revoke_token) + args = parser.parse_args() + + if args.command not in _SKIP_ENFORCE: + enforce_permissions() + args.func(args) diff --git a/local-identity/src/local_identity/revoke.py b/local-identity/src/local_identity/revoke.py new file mode 100644 index 0000000..12d3a50 --- /dev/null +++ b/local-identity/src/local_identity/revoke.py @@ -0,0 +1,52 @@ +""" +Token revocation list for local-identity OIDC serve. + +Revoked JTIs (JWT IDs) are stored in ~/.local-identity/revoked.json (mode 600). +The list is append-only: JTIs are added but never removed. + +Usage: + revoke(jti) Add a JTI to the revocation list. + is_revoked(jti) Return True if the JTI has been revoked. +""" + +import json +import os +from pathlib import Path + +from .store import _store_dir + + +def _revoked_path() -> Path: + return _store_dir() / "revoked.json" + + +def _load() -> set[str]: + path = _revoked_path() + if not path.exists(): + return set() + try: + data = json.loads(path.read_text(encoding="utf-8")) + return set(data.get("revoked", [])) + except (json.JSONDecodeError, OSError): + return set() + + +def _save(revoked: set[str]) -> None: + path = _revoked_path() + path.write_text( + json.dumps({"revoked": sorted(revoked)}, indent=2), + encoding="utf-8", + ) + os.chmod(path, 0o600) + + +def is_revoked(jti: str) -> bool: + """Return True if this JTI has been revoked. Defaults to False on read error.""" + return jti in _load() + + +def revoke(jti: str) -> None: + """Add a JTI to the revocation list.""" + revoked = _load() + revoked.add(jti) + _save(revoked) diff --git a/local-identity/src/local_identity/security.py b/local-identity/src/local_identity/security.py new file mode 100644 index 0000000..b1326fa --- /dev/null +++ b/local-identity/src/local_identity/security.py @@ -0,0 +1,148 @@ +""" +Filesystem permission checking for local-identity. + +check_store() Audit all sensitive paths; return a list of CheckResult. +enforce_permissions() Call on CLI startup — exits 1 if any check fails. +print_security_check() Implement the 'security-check' subcommand. +""" + +import os +import stat +import sys +from dataclasses import dataclass +from pathlib import Path + +from .store import _config_file, _store_dir, _users_dir + + +@dataclass +class CheckResult: + path: str + status: str # "pass" | "warn" | "fail" + detail: str + + +def _check_mode(path: Path, expected: int) -> CheckResult: + """Return a CheckResult for a single path against the expected mode.""" + if not path.exists(): + return CheckResult(str(path), "warn", "does not exist (skipped)") + actual = stat.S_IMODE(os.stat(path).st_mode) + if actual != expected: + return CheckResult( + str(path), "fail", + f"mode {oct(actual)}, expected {oct(expected)}", + ) + return CheckResult(str(path), "pass", f"mode {oct(actual)}") + + +def check_store() -> list[CheckResult]: + """ + Audit every sensitive path under the store. + Returns a list of CheckResult objects; an empty list means the store + does not exist yet (not an error — init hasn't run). + """ + store_dir = _store_dir() + if not store_dir.exists(): + return [] + + results: list[CheckResult] = [] + + # Store root + results.append(_check_mode(store_dir, 0o700)) + + # Config file + config = _config_file() + if config.exists(): + results.append(_check_mode(config, 0o600)) + + # Users directory + users_dir = _users_dir() + if users_dir.exists(): + results.append(_check_mode(users_dir, 0o700)) + for p in sorted(users_dir.glob("*.yaml")): + results.append(_check_mode(p, 0o600)) + + # Signing key + signing_key = store_dir / "keys" / "signing_private.pem" + if signing_key.exists(): + results.append(_check_mode(signing_key, 0o600)) + + # TLS private key + tls_key = store_dir / "tls" / "server.key" + if tls_key.exists(): + results.append(_check_mode(tls_key, 0o600)) + + # Audit log + audit_log = store_dir / "audit.log" + if audit_log.exists(): + results.append(_check_mode(audit_log, 0o600)) + + # Revocation list + revoked = store_dir / "revoked.json" + if revoked.exists(): + results.append(_check_mode(revoked, 0o600)) + + return results + + +def enforce_permissions() -> None: + """ + Check store permissions and exit 1 if any check fails. + + Call at CLI startup for every command that requires an initialised store. + Skips silently when the store does not yet exist (let the command handle + the missing-store error itself). + """ + results = check_store() + failures = [r for r in results if r.status == "fail"] + if not failures: + return + + print( + "Error: local-identity store has incorrect permissions:", file=sys.stderr + ) + for r in failures: + print(f" FAIL {r.path} — {r.detail}", file=sys.stderr) + print( + "Run 'local-identity security-check' for a full report.", + file=sys.stderr, + ) + sys.exit(1) + + +def print_security_check() -> int: + """ + Print a human-readable security audit and return an exit code. + Returns 0 if all checks pass or warn; 1 if any check fails. + """ + store_dir = _store_dir() + if not store_dir.exists(): + print("Store not initialised. Run 'local-identity init' first.") + return 1 + + results = check_store() + if not results: + print("Store not initialised. Run 'local-identity init' first.") + return 1 + + fail_count = 0 + warn_count = 0 + + label = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"} + for r in results: + print(f" {label[r.status]} {r.path} — {r.detail}") + if r.status == "fail": + fail_count += 1 + elif r.status == "warn": + warn_count += 1 + + print() + if fail_count: + print(f"Result: {fail_count} failure(s), {warn_count} warning(s) — fix permissions before use") + return 1 + elif warn_count: + print(f"Result: {warn_count} warning(s) — review recommended") + return 0 + else: + print("Result: all checks passed") + return 0 diff --git a/local-identity/src/local_identity/serve.py b/local-identity/src/local_identity/serve.py index 880fb99..1f3024c 100644 --- a/local-identity/src/local_identity/serve.py +++ b/local-identity/src/local_identity/serve.py @@ -34,9 +34,12 @@ import urllib.parse from http import HTTPStatus from pathlib import Path +from . import audit +from . import revoke as revoke_mod from . import store from .jwt_utils import JWTError, create_token, verify_token from .keys import ensure_signing_key, jwk_public, key_id +from .security import enforce_permissions from .tls import ensure_tls_cert _BIND_HOST = "127.0.0.1" @@ -270,6 +273,7 @@ class OIDCHandler(http.server.BaseHTTPRequestHandler): "nonce": nonce or None, "expires_at": now + _CODE_TTL, } + audit.log_event("serve/auth", username, "auth_request") sep = "&" if "?" in redirect_uri else "?" location = ( @@ -335,6 +339,7 @@ class OIDCHandler(http.server.BaseHTTPRequestHandler): nonce=code_data.get("nonce"), ) + audit.log_event("serve/token", user.username, "token_issued") self._send_json({ "access_token": token, "id_token": token, @@ -358,14 +363,26 @@ class OIDCHandler(http.server.BaseHTTPRequestHandler): try: payload = verify_token(token, self._private_key.public_key()) except JWTError as exc: + audit.log_event("serve/userinfo", None, f"invalid_token: {exc}") self._send_json( {"error": "invalid_token", "error_description": str(exc)}, HTTPStatus.UNAUTHORIZED, ) return + jti = payload.get("jti", "") + sub = payload.get("sub") + if jti and revoke_mod.is_revoked(jti): + audit.log_event("serve/userinfo", sub, "revoked_token") + self._send_json( + {"error": "invalid_token", "error_description": "token has been revoked"}, + HTTPStatus.UNAUTHORIZED, + ) + return + + audit.log_event("serve/userinfo", sub, "ok") self._send_json({ - "sub": payload["sub"], + "sub": sub, "email": payload.get("email"), "name": payload.get("name"), "preferred_username": payload.get("preferred_username"), @@ -435,6 +452,7 @@ def run_server(port: int = 8443, token_ttl: int = 3600) -> None: ) sys.exit(1) + enforce_permissions() private_key = ensure_signing_key() cert_path, key_path = ensure_tls_cert() diff --git a/local-identity/tests/test_security.py b/local-identity/tests/test_security.py new file mode 100644 index 0000000..ffe66e9 --- /dev/null +++ b/local-identity/tests/test_security.py @@ -0,0 +1,320 @@ +""" +Tests for security.py (permission checks), audit.py (audit log), +revoke.py (token revocation), and related CLI commands. +""" + +import argparse +import json +import os +import stat +import time + +import pytest + +from local_identity import audit, store +from local_identity.audit import log_event +from local_identity.cli import cmd_revoke_token, cmd_security_check +from local_identity.keys import ensure_signing_key, key_id +from local_identity.jwt_utils import create_token +from local_identity.revoke import is_revoked, revoke +from local_identity.security import ( + CheckResult, + check_store, + enforce_permissions, + print_security_check, +) +from local_identity.user import UserRecord + + +# ------------------------------------------------------------------ # +# Helpers # +# ------------------------------------------------------------------ # + +def _init_store(tmp_store): + """Initialise a minimal store with one user.""" + store.init_dirs() + store.write_user( + UserRecord(username="alice", fullname="Alice Smith", email="alice@example.com") + ) + + +# ------------------------------------------------------------------ # +# check_store — passing case # +# ------------------------------------------------------------------ # + +class TestCheckStorePassing: + def test_all_pass_on_fresh_store(self, tmp_store): + _init_store(tmp_store) + results = check_store() + assert results, "expected at least one result" + failures = [r for r in results if r.status == "fail"] + assert failures == [], f"unexpected failures: {failures}" + + def test_returns_empty_when_store_absent(self, tmp_store): + # Store not yet initialised + results = check_store() + assert results == [] + + def test_checks_store_dir(self, tmp_store): + _init_store(tmp_store) + results = check_store() + paths = [r.path for r in results] + assert str(tmp_store) in paths + + def test_checks_user_files(self, tmp_store): + _init_store(tmp_store) + results = check_store() + paths = [r.path for r in results] + assert any("alice.yaml" in p for p in paths) + + +# ------------------------------------------------------------------ # +# check_store — failure case # +# ------------------------------------------------------------------ # + +class TestCheckStoreFailures: + def test_fails_on_world_readable_store_dir(self, tmp_store): + _init_store(tmp_store) + os.chmod(tmp_store, 0o755) + try: + results = check_store() + failures = [r for r in results if r.status == "fail"] + assert any(str(tmp_store) in r.path for r in failures) + finally: + os.chmod(tmp_store, 0o700) + + def test_fails_on_world_readable_user_file(self, tmp_store): + _init_store(tmp_store) + user_file = tmp_store / "users" / "alice.yaml" + os.chmod(user_file, 0o644) + try: + results = check_store() + failures = [r for r in results if r.status == "fail"] + assert any("alice.yaml" in r.path for r in failures) + finally: + os.chmod(user_file, 0o600) + + def test_checks_signing_key_when_present(self, tmp_store): + _init_store(tmp_store) + ensure_signing_key() # generate key + key_file = tmp_store / "keys" / "signing_private.pem" + assert key_file.exists() + os.chmod(key_file, 0o644) + try: + results = check_store() + failures = [r for r in results if r.status == "fail"] + assert any("signing_private.pem" in r.path for r in failures) + finally: + os.chmod(key_file, 0o600) + + +# ------------------------------------------------------------------ # +# enforce_permissions # +# ------------------------------------------------------------------ # + +class TestEnforcePermissions: + def test_passes_silently_on_correct_store(self, tmp_store): + _init_store(tmp_store) + enforce_permissions() # must not raise or exit + + def test_passes_silently_when_store_absent(self, tmp_store): + enforce_permissions() # store not yet created — silent pass + + def test_exits_on_bad_dir_mode(self, tmp_store): + _init_store(tmp_store) + os.chmod(tmp_store, 0o755) + try: + with pytest.raises(SystemExit) as exc: + enforce_permissions() + assert exc.value.code == 1 + finally: + os.chmod(tmp_store, 0o700) + + def test_exits_on_bad_file_mode(self, tmp_store): + _init_store(tmp_store) + user_file = tmp_store / "users" / "alice.yaml" + os.chmod(user_file, 0o644) + try: + with pytest.raises(SystemExit) as exc: + enforce_permissions() + assert exc.value.code == 1 + finally: + os.chmod(user_file, 0o600) + + +# ------------------------------------------------------------------ # +# print_security_check / cmd_security_check # +# ------------------------------------------------------------------ # + +class TestSecurityCheckCommand: + def test_returns_0_on_clean_store(self, tmp_store): + _init_store(tmp_store) + rc = print_security_check() + assert rc == 0 + + def test_returns_1_on_bad_permissions(self, tmp_store): + _init_store(tmp_store) + os.chmod(tmp_store, 0o755) + try: + rc = print_security_check() + assert rc == 1 + finally: + os.chmod(tmp_store, 0o700) + + def test_returns_1_when_store_absent(self, tmp_store): + rc = print_security_check() + assert rc == 1 + + def test_cmd_security_check_exits_with_code(self, tmp_store): + _init_store(tmp_store) + ns = argparse.Namespace(command="security-check", func=cmd_security_check) + with pytest.raises(SystemExit) as exc: + cmd_security_check(ns) + assert exc.value.code == 0 + + +# ------------------------------------------------------------------ # +# audit.log_event # +# ------------------------------------------------------------------ # + +class TestAuditLog: + def test_creates_log_file(self, tmp_store): + _init_store(tmp_store) + log_event("test", "alice", "ok") + log_path = tmp_store / "audit.log" + assert log_path.exists() + + def test_log_mode_600(self, tmp_store): + _init_store(tmp_store) + log_event("test", "alice", "ok") + log_path = tmp_store / "audit.log" + assert stat.S_IMODE(os.stat(log_path).st_mode) == 0o600 + + def test_log_contains_all_fields(self, tmp_store): + _init_store(tmp_store) + log_event("mycmd", "alice", "ok") + content = (tmp_store / "audit.log").read_text() + assert "mycmd" in content + assert "alice" in content + assert "ok" in content + + def test_log_is_appended(self, tmp_store): + _init_store(tmp_store) + log_event("cmd1", "alice", "ok") + log_event("cmd2", "alice", "ok") + lines = (tmp_store / "audit.log").read_text().splitlines() + assert len(lines) == 2 + + def test_none_username_written_as_dash(self, tmp_store): + _init_store(tmp_store) + log_event("cmd", None, "ok") + content = (tmp_store / "audit.log").read_text() + assert "\t-\t" in content + + def test_silent_when_store_absent(self, tmp_store): + """log_event must not raise even if the store dir doesn't exist.""" + log_event("cmd", "alice", "ok") # should not raise + + +# ------------------------------------------------------------------ # +# revoke.revoke / revoke.is_revoked # +# ------------------------------------------------------------------ # + +class TestRevocationList: + def test_not_revoked_by_default(self, tmp_store): + _init_store(tmp_store) + assert not is_revoked("some-jti-123") + + def test_revoke_then_is_revoked(self, tmp_store): + _init_store(tmp_store) + revoke("jti-abc") + assert is_revoked("jti-abc") + + def test_other_jtis_not_affected(self, tmp_store): + _init_store(tmp_store) + revoke("jti-abc") + assert not is_revoked("jti-xyz") + + def test_revoked_json_mode_600(self, tmp_store): + _init_store(tmp_store) + revoke("jti-abc") + revoked_file = tmp_store / "revoked.json" + assert stat.S_IMODE(os.stat(revoked_file).st_mode) == 0o600 + + def test_revoke_idempotent(self, tmp_store): + _init_store(tmp_store) + revoke("jti-abc") + revoke("jti-abc") # second call must not raise + data = json.loads((tmp_store / "revoked.json").read_text()) + assert data["revoked"].count("jti-abc") == 1 + + def test_multiple_jtis_all_revoked(self, tmp_store): + _init_store(tmp_store) + revoke("jti-1") + revoke("jti-2") + assert is_revoked("jti-1") + assert is_revoked("jti-2") + + def test_not_revoked_when_file_absent(self, tmp_store): + # No store at all — should return False safely + assert not is_revoked("any-jti") + + +# ------------------------------------------------------------------ # +# cmd_revoke_token (CLI) # +# ------------------------------------------------------------------ # + +class TestCmdRevokeToken: + def _ns(self, token): + return argparse.Namespace( + command="revoke-token", + func=cmd_revoke_token, + token=token, + ) + + def test_revoke_by_jti_string(self, tmp_store): + _init_store(tmp_store) + cmd_revoke_token(self._ns("my-jti-string")) + assert is_revoked("my-jti-string") + + def test_revoke_by_full_jwt(self, tmp_store): + _init_store(tmp_store) + private_key = ensure_signing_key() + kid = key_id(private_key) + token = create_token( + private_key=private_key, + kid=kid, + sub="alice", + iss="local-identity", + aud="testclient", + email="alice@example.com", + name="Alice Smith", + preferred_username="alice", + ttl=3600, + ) + cmd_revoke_token(self._ns(token)) + + # Extract JTI from the token and verify it's revoked + import base64 as _b64 + payload_b64 = token.split(".")[1] + pad = (4 - len(payload_b64) % 4) % 4 + payload = json.loads(_b64.urlsafe_b64decode(payload_b64 + "=" * pad)) + assert is_revoked(payload["jti"]) + + def test_revoke_jwt_without_jti_exits(self, tmp_store): + _init_store(tmp_store) + # Craft a JWT without a jti claim + import base64 as _b64 + header = _b64.urlsafe_b64encode(b'{"alg":"RS256"}').rstrip(b"=").decode() + payload = _b64.urlsafe_b64encode(b'{"sub":"alice"}').rstrip(b"=").decode() + fake_jwt = f"{header}.{payload}.fakesig" + with pytest.raises(SystemExit) as exc: + cmd_revoke_token(self._ns(fake_jwt)) + assert exc.value.code == 1 + + def test_revoke_writes_audit_log(self, tmp_store): + _init_store(tmp_store) + cmd_revoke_token(self._ns("test-jti")) + content = (tmp_store / "audit.log").read_text() + assert "revoke-token" in content + assert "test-jti" in content diff --git a/local-identity/tests/test_serve.py b/local-identity/tests/test_serve.py index 63ed6cb..e363d73 100644 --- a/local-identity/tests/test_serve.py +++ b/local-identity/tests/test_serve.py @@ -347,11 +347,13 @@ def test_userinfo_tampered_token_rejected(oidc_server): token_resp = _do_auth_code_flow(base_url) good_token = token_resp["access_token"] - # Tamper with the signature (flip last char) + # Tamper with the first character of the signature. + # (The last character of a 256-byte RSA signature has 4 padding bits; + # flipping only those bits produces identical decoded bytes, so we must + # target a non-padding position.) parts = good_token.split(".") - tampered = parts[0] + "." + parts[1] + "." + parts[2][:-1] + ( - "A" if parts[2][-1] != "A" else "B" - ) + first = "A" if parts[2][0] != "A" else "B" + tampered = parts[0] + "." + parts[1] + "." + first + parts[2][1:] req = urllib.request.Request( f"{base_url}/userinfo", @@ -378,6 +380,64 @@ def test_bind_host_constant(): assert _BIND_HOST == "127.0.0.1" +def test_revoked_token_rejected_at_userinfo(oidc_server): + """A token whose JTI has been revoked must be rejected with 401.""" + from local_identity.revoke import revoke + + base_url, private_key = oidc_server + token_resp = _do_auth_code_flow(base_url) + access_token = token_resp["access_token"] + + # Verify it works before revocation + req = urllib.request.Request( + f"{base_url}/userinfo", + headers={"Authorization": f"Bearer {access_token}"}, + ) + with urllib.request.urlopen(req) as resp: + assert resp.status == 200 + + # Extract JTI and revoke it + import base64 as _b64 + payload_b64 = access_token.split(".")[1] + pad = (4 - len(payload_b64) % 4) % 4 + payload = json.loads(_b64.urlsafe_b64decode(payload_b64 + "=" * pad)) + revoke(payload["jti"]) + + # Now it must be rejected + req2 = urllib.request.Request( + f"{base_url}/userinfo", + headers={"Authorization": f"Bearer {access_token}"}, + ) + try: + urllib.request.urlopen(req2) + pytest.fail("Expected 401 after revocation") + except urllib.error.HTTPError as e: + assert e.code == 401 + body = json.loads(e.read()) + assert "revoked" in body.get("error_description", "") + + +def test_audit_log_written_by_serve(oidc_server, tmp_store): + """The OIDC server writes to the audit log for auth, token, and userinfo.""" + base_url, _ = oidc_server + token_resp = _do_auth_code_flow(base_url) + + # Also hit /userinfo so that log entry is written + req = urllib.request.Request( + f"{base_url}/userinfo", + headers={"Authorization": f"Bearer {token_resp['access_token']}"}, + ) + with urllib.request.urlopen(req): + pass + + log_path = tmp_store / "audit.log" + assert log_path.exists(), "audit.log should be created by serve" + content = log_path.read_text() + assert "serve/auth" in content + assert "serve/token" in content + assert "serve/userinfo" in content + + # ------------------------------------------------------------------ # # JWT unit tests (independent of HTTP server) # # ------------------------------------------------------------------ # diff --git a/workplans/NK-WP-0002-local-identity.md b/workplans/NK-WP-0002-local-identity.md index 61e5ddf..be6115f 100644 --- a/workplans/NK-WP-0002-local-identity.md +++ b/workplans/NK-WP-0002-local-identity.md @@ -191,8 +191,9 @@ pass an OIDC conformance smoke test; server refuses to bind to 0.0.0.0. ```task id: NK-WP-0002-T04 state_hub_task_id: 936de7fa-dfb4-48a2-804f-6b9bd7271a05 -status: todo +status: done priority: medium +commit: (pending) ``` Permission enforcement: @@ -227,9 +228,9 @@ expiry and revocation functional. - [x] `~/.local-identity/` store initialised from Linux identity; test users generated - [x] `local-identity list / show / export` working; Keycloak export validated - [x] Minimal OIDC server passes conformance smoke test; binds localhost only -- [ ] Filesystem permissions enforced on startup; `security-check` passes -- [ ] Audit log recording all auth events -- [ ] `docs/LocalIdentity.md` complete with import procedure and security model +- [x] Filesystem permissions enforced on startup; `security-check` passes +- [x] Audit log recording all auth events +- [x] `docs/LocalIdentity.md` complete with import procedure and security model - [ ] NK-WP-0001 T07 migration procedure documented (Local Identity → Keycloak) ## Open Questions