feat(local-identity): Stage 4 — security hardening (NK-WP-0002-T04)

Permission enforcement on startup: enforce_permissions() checks store dir
(700), user files (600), signing key, TLS key, audit.log, revoked.json.
CLI and run_server() call it before any sensitive operation.

New modules:
  security.py  check_store(), enforce_permissions(), print_security_check()
  audit.py     log_event() — append-only TSV audit log (mode 600)
  revoke.py    revoke(jti), is_revoked(jti) — revocation list (mode 600)

New CLI commands:
  security-check          Print per-check pass/warn/fail report; exit 1 on failure
  revoke-token <jti|jwt>  Add JTI to revocation list; accepts raw JTI or full JWT

Serve integration:
  Audit log written for auth request, token issuance, and userinfo calls
  Revocation checked at /userinfo; revoked tokens return 401

Docs: security model section in LocalIdentity.md — threat model,
assumptions, non-guarantees, SELinux/AppArmor guidance, revocation usage.

138 tests passing (34 new for Stage 4).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-02 08:06:56 +01:00
parent ae348d0e54
commit e7bafd69fc
9 changed files with 795 additions and 16 deletions

View File

@@ -0,0 +1,320 @@
"""
Tests for security.py (permission checks), audit.py (audit log),
revoke.py (token revocation), and related CLI commands.
"""
import argparse
import json
import os
import stat
import time
import pytest
from local_identity import audit, store
from local_identity.audit import log_event
from local_identity.cli import cmd_revoke_token, cmd_security_check
from local_identity.keys import ensure_signing_key, key_id
from local_identity.jwt_utils import create_token
from local_identity.revoke import is_revoked, revoke
from local_identity.security import (
CheckResult,
check_store,
enforce_permissions,
print_security_check,
)
from local_identity.user import UserRecord
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def _init_store(tmp_store):
"""Initialise a minimal store with one user."""
store.init_dirs()
store.write_user(
UserRecord(username="alice", fullname="Alice Smith", email="alice@example.com")
)
# ------------------------------------------------------------------ #
# check_store — passing case #
# ------------------------------------------------------------------ #
class TestCheckStorePassing:
def test_all_pass_on_fresh_store(self, tmp_store):
_init_store(tmp_store)
results = check_store()
assert results, "expected at least one result"
failures = [r for r in results if r.status == "fail"]
assert failures == [], f"unexpected failures: {failures}"
def test_returns_empty_when_store_absent(self, tmp_store):
# Store not yet initialised
results = check_store()
assert results == []
def test_checks_store_dir(self, tmp_store):
_init_store(tmp_store)
results = check_store()
paths = [r.path for r in results]
assert str(tmp_store) in paths
def test_checks_user_files(self, tmp_store):
_init_store(tmp_store)
results = check_store()
paths = [r.path for r in results]
assert any("alice.yaml" in p for p in paths)
# ------------------------------------------------------------------ #
# check_store — failure case #
# ------------------------------------------------------------------ #
class TestCheckStoreFailures:
def test_fails_on_world_readable_store_dir(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any(str(tmp_store) in r.path for r in failures)
finally:
os.chmod(tmp_store, 0o700)
def test_fails_on_world_readable_user_file(self, tmp_store):
_init_store(tmp_store)
user_file = tmp_store / "users" / "alice.yaml"
os.chmod(user_file, 0o644)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any("alice.yaml" in r.path for r in failures)
finally:
os.chmod(user_file, 0o600)
def test_checks_signing_key_when_present(self, tmp_store):
_init_store(tmp_store)
ensure_signing_key() # generate key
key_file = tmp_store / "keys" / "signing_private.pem"
assert key_file.exists()
os.chmod(key_file, 0o644)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any("signing_private.pem" in r.path for r in failures)
finally:
os.chmod(key_file, 0o600)
# ------------------------------------------------------------------ #
# enforce_permissions #
# ------------------------------------------------------------------ #
class TestEnforcePermissions:
def test_passes_silently_on_correct_store(self, tmp_store):
_init_store(tmp_store)
enforce_permissions() # must not raise or exit
def test_passes_silently_when_store_absent(self, tmp_store):
enforce_permissions() # store not yet created — silent pass
def test_exits_on_bad_dir_mode(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
with pytest.raises(SystemExit) as exc:
enforce_permissions()
assert exc.value.code == 1
finally:
os.chmod(tmp_store, 0o700)
def test_exits_on_bad_file_mode(self, tmp_store):
_init_store(tmp_store)
user_file = tmp_store / "users" / "alice.yaml"
os.chmod(user_file, 0o644)
try:
with pytest.raises(SystemExit) as exc:
enforce_permissions()
assert exc.value.code == 1
finally:
os.chmod(user_file, 0o600)
# ------------------------------------------------------------------ #
# print_security_check / cmd_security_check #
# ------------------------------------------------------------------ #
class TestSecurityCheckCommand:
def test_returns_0_on_clean_store(self, tmp_store):
_init_store(tmp_store)
rc = print_security_check()
assert rc == 0
def test_returns_1_on_bad_permissions(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
rc = print_security_check()
assert rc == 1
finally:
os.chmod(tmp_store, 0o700)
def test_returns_1_when_store_absent(self, tmp_store):
rc = print_security_check()
assert rc == 1
def test_cmd_security_check_exits_with_code(self, tmp_store):
_init_store(tmp_store)
ns = argparse.Namespace(command="security-check", func=cmd_security_check)
with pytest.raises(SystemExit) as exc:
cmd_security_check(ns)
assert exc.value.code == 0
# ------------------------------------------------------------------ #
# audit.log_event #
# ------------------------------------------------------------------ #
class TestAuditLog:
def test_creates_log_file(self, tmp_store):
_init_store(tmp_store)
log_event("test", "alice", "ok")
log_path = tmp_store / "audit.log"
assert log_path.exists()
def test_log_mode_600(self, tmp_store):
_init_store(tmp_store)
log_event("test", "alice", "ok")
log_path = tmp_store / "audit.log"
assert stat.S_IMODE(os.stat(log_path).st_mode) == 0o600
def test_log_contains_all_fields(self, tmp_store):
_init_store(tmp_store)
log_event("mycmd", "alice", "ok")
content = (tmp_store / "audit.log").read_text()
assert "mycmd" in content
assert "alice" in content
assert "ok" in content
def test_log_is_appended(self, tmp_store):
_init_store(tmp_store)
log_event("cmd1", "alice", "ok")
log_event("cmd2", "alice", "ok")
lines = (tmp_store / "audit.log").read_text().splitlines()
assert len(lines) == 2
def test_none_username_written_as_dash(self, tmp_store):
_init_store(tmp_store)
log_event("cmd", None, "ok")
content = (tmp_store / "audit.log").read_text()
assert "\t-\t" in content
def test_silent_when_store_absent(self, tmp_store):
"""log_event must not raise even if the store dir doesn't exist."""
log_event("cmd", "alice", "ok") # should not raise
# ------------------------------------------------------------------ #
# revoke.revoke / revoke.is_revoked #
# ------------------------------------------------------------------ #
class TestRevocationList:
def test_not_revoked_by_default(self, tmp_store):
_init_store(tmp_store)
assert not is_revoked("some-jti-123")
def test_revoke_then_is_revoked(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
assert is_revoked("jti-abc")
def test_other_jtis_not_affected(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
assert not is_revoked("jti-xyz")
def test_revoked_json_mode_600(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
revoked_file = tmp_store / "revoked.json"
assert stat.S_IMODE(os.stat(revoked_file).st_mode) == 0o600
def test_revoke_idempotent(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
revoke("jti-abc") # second call must not raise
data = json.loads((tmp_store / "revoked.json").read_text())
assert data["revoked"].count("jti-abc") == 1
def test_multiple_jtis_all_revoked(self, tmp_store):
_init_store(tmp_store)
revoke("jti-1")
revoke("jti-2")
assert is_revoked("jti-1")
assert is_revoked("jti-2")
def test_not_revoked_when_file_absent(self, tmp_store):
# No store at all — should return False safely
assert not is_revoked("any-jti")
# ------------------------------------------------------------------ #
# cmd_revoke_token (CLI) #
# ------------------------------------------------------------------ #
class TestCmdRevokeToken:
def _ns(self, token):
return argparse.Namespace(
command="revoke-token",
func=cmd_revoke_token,
token=token,
)
def test_revoke_by_jti_string(self, tmp_store):
_init_store(tmp_store)
cmd_revoke_token(self._ns("my-jti-string"))
assert is_revoked("my-jti-string")
def test_revoke_by_full_jwt(self, tmp_store):
_init_store(tmp_store)
private_key = ensure_signing_key()
kid = key_id(private_key)
token = create_token(
private_key=private_key,
kid=kid,
sub="alice",
iss="local-identity",
aud="testclient",
email="alice@example.com",
name="Alice Smith",
preferred_username="alice",
ttl=3600,
)
cmd_revoke_token(self._ns(token))
# Extract JTI from the token and verify it's revoked
import base64 as _b64
payload_b64 = token.split(".")[1]
pad = (4 - len(payload_b64) % 4) % 4
payload = json.loads(_b64.urlsafe_b64decode(payload_b64 + "=" * pad))
assert is_revoked(payload["jti"])
def test_revoke_jwt_without_jti_exits(self, tmp_store):
_init_store(tmp_store)
# Craft a JWT without a jti claim
import base64 as _b64
header = _b64.urlsafe_b64encode(b'{"alg":"RS256"}').rstrip(b"=").decode()
payload = _b64.urlsafe_b64encode(b'{"sub":"alice"}').rstrip(b"=").decode()
fake_jwt = f"{header}.{payload}.fakesig"
with pytest.raises(SystemExit) as exc:
cmd_revoke_token(self._ns(fake_jwt))
assert exc.value.code == 1
def test_revoke_writes_audit_log(self, tmp_store):
_init_store(tmp_store)
cmd_revoke_token(self._ns("test-jti"))
content = (tmp_store / "audit.log").read_text()
assert "revoke-token" in content
assert "test-jti" in content