Files
net-kingdom/local-identity/tests/test_security.py
tegwick e7bafd69fc feat(local-identity): Stage 4 — security hardening (NK-WP-0002-T04)
Permission enforcement on startup: enforce_permissions() checks store dir
(700), user files (600), signing key, TLS key, audit.log, revoked.json.
CLI and run_server() call it before any sensitive operation.

New modules:
  security.py  check_store(), enforce_permissions(), print_security_check()
  audit.py     log_event() — append-only TSV audit log (mode 600)
  revoke.py    revoke(jti), is_revoked(jti) — revocation list (mode 600)

New CLI commands:
  security-check          Print per-check pass/warn/fail report; exit 1 on failure
  revoke-token <jti|jwt>  Add JTI to revocation list; accepts raw JTI or full JWT

Serve integration:
  Audit log written for auth request, token issuance, and userinfo calls
  Revocation checked at /userinfo; revoked tokens return 401

Docs: security model section in LocalIdentity.md — threat model,
assumptions, non-guarantees, SELinux/AppArmor guidance, revocation usage.

138 tests passing (34 new for Stage 4).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 08:06:56 +01:00

321 lines
11 KiB
Python

"""
Tests for security.py (permission checks), audit.py (audit log),
revoke.py (token revocation), and related CLI commands.
"""
import argparse
import json
import os
import stat
import time
import pytest
from local_identity import audit, store
from local_identity.audit import log_event
from local_identity.cli import cmd_revoke_token, cmd_security_check
from local_identity.keys import ensure_signing_key, key_id
from local_identity.jwt_utils import create_token
from local_identity.revoke import is_revoked, revoke
from local_identity.security import (
CheckResult,
check_store,
enforce_permissions,
print_security_check,
)
from local_identity.user import UserRecord
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def _init_store(tmp_store):
"""Initialise a minimal store with one user."""
store.init_dirs()
store.write_user(
UserRecord(username="alice", fullname="Alice Smith", email="alice@example.com")
)
# ------------------------------------------------------------------ #
# check_store — passing case #
# ------------------------------------------------------------------ #
class TestCheckStorePassing:
def test_all_pass_on_fresh_store(self, tmp_store):
_init_store(tmp_store)
results = check_store()
assert results, "expected at least one result"
failures = [r for r in results if r.status == "fail"]
assert failures == [], f"unexpected failures: {failures}"
def test_returns_empty_when_store_absent(self, tmp_store):
# Store not yet initialised
results = check_store()
assert results == []
def test_checks_store_dir(self, tmp_store):
_init_store(tmp_store)
results = check_store()
paths = [r.path for r in results]
assert str(tmp_store) in paths
def test_checks_user_files(self, tmp_store):
_init_store(tmp_store)
results = check_store()
paths = [r.path for r in results]
assert any("alice.yaml" in p for p in paths)
# ------------------------------------------------------------------ #
# check_store — failure case #
# ------------------------------------------------------------------ #
class TestCheckStoreFailures:
def test_fails_on_world_readable_store_dir(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any(str(tmp_store) in r.path for r in failures)
finally:
os.chmod(tmp_store, 0o700)
def test_fails_on_world_readable_user_file(self, tmp_store):
_init_store(tmp_store)
user_file = tmp_store / "users" / "alice.yaml"
os.chmod(user_file, 0o644)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any("alice.yaml" in r.path for r in failures)
finally:
os.chmod(user_file, 0o600)
def test_checks_signing_key_when_present(self, tmp_store):
_init_store(tmp_store)
ensure_signing_key() # generate key
key_file = tmp_store / "keys" / "signing_private.pem"
assert key_file.exists()
os.chmod(key_file, 0o644)
try:
results = check_store()
failures = [r for r in results if r.status == "fail"]
assert any("signing_private.pem" in r.path for r in failures)
finally:
os.chmod(key_file, 0o600)
# ------------------------------------------------------------------ #
# enforce_permissions #
# ------------------------------------------------------------------ #
class TestEnforcePermissions:
def test_passes_silently_on_correct_store(self, tmp_store):
_init_store(tmp_store)
enforce_permissions() # must not raise or exit
def test_passes_silently_when_store_absent(self, tmp_store):
enforce_permissions() # store not yet created — silent pass
def test_exits_on_bad_dir_mode(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
with pytest.raises(SystemExit) as exc:
enforce_permissions()
assert exc.value.code == 1
finally:
os.chmod(tmp_store, 0o700)
def test_exits_on_bad_file_mode(self, tmp_store):
_init_store(tmp_store)
user_file = tmp_store / "users" / "alice.yaml"
os.chmod(user_file, 0o644)
try:
with pytest.raises(SystemExit) as exc:
enforce_permissions()
assert exc.value.code == 1
finally:
os.chmod(user_file, 0o600)
# ------------------------------------------------------------------ #
# print_security_check / cmd_security_check #
# ------------------------------------------------------------------ #
class TestSecurityCheckCommand:
def test_returns_0_on_clean_store(self, tmp_store):
_init_store(tmp_store)
rc = print_security_check()
assert rc == 0
def test_returns_1_on_bad_permissions(self, tmp_store):
_init_store(tmp_store)
os.chmod(tmp_store, 0o755)
try:
rc = print_security_check()
assert rc == 1
finally:
os.chmod(tmp_store, 0o700)
def test_returns_1_when_store_absent(self, tmp_store):
rc = print_security_check()
assert rc == 1
def test_cmd_security_check_exits_with_code(self, tmp_store):
_init_store(tmp_store)
ns = argparse.Namespace(command="security-check", func=cmd_security_check)
with pytest.raises(SystemExit) as exc:
cmd_security_check(ns)
assert exc.value.code == 0
# ------------------------------------------------------------------ #
# audit.log_event #
# ------------------------------------------------------------------ #
class TestAuditLog:
def test_creates_log_file(self, tmp_store):
_init_store(tmp_store)
log_event("test", "alice", "ok")
log_path = tmp_store / "audit.log"
assert log_path.exists()
def test_log_mode_600(self, tmp_store):
_init_store(tmp_store)
log_event("test", "alice", "ok")
log_path = tmp_store / "audit.log"
assert stat.S_IMODE(os.stat(log_path).st_mode) == 0o600
def test_log_contains_all_fields(self, tmp_store):
_init_store(tmp_store)
log_event("mycmd", "alice", "ok")
content = (tmp_store / "audit.log").read_text()
assert "mycmd" in content
assert "alice" in content
assert "ok" in content
def test_log_is_appended(self, tmp_store):
_init_store(tmp_store)
log_event("cmd1", "alice", "ok")
log_event("cmd2", "alice", "ok")
lines = (tmp_store / "audit.log").read_text().splitlines()
assert len(lines) == 2
def test_none_username_written_as_dash(self, tmp_store):
_init_store(tmp_store)
log_event("cmd", None, "ok")
content = (tmp_store / "audit.log").read_text()
assert "\t-\t" in content
def test_silent_when_store_absent(self, tmp_store):
"""log_event must not raise even if the store dir doesn't exist."""
log_event("cmd", "alice", "ok") # should not raise
# ------------------------------------------------------------------ #
# revoke.revoke / revoke.is_revoked #
# ------------------------------------------------------------------ #
class TestRevocationList:
def test_not_revoked_by_default(self, tmp_store):
_init_store(tmp_store)
assert not is_revoked("some-jti-123")
def test_revoke_then_is_revoked(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
assert is_revoked("jti-abc")
def test_other_jtis_not_affected(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
assert not is_revoked("jti-xyz")
def test_revoked_json_mode_600(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
revoked_file = tmp_store / "revoked.json"
assert stat.S_IMODE(os.stat(revoked_file).st_mode) == 0o600
def test_revoke_idempotent(self, tmp_store):
_init_store(tmp_store)
revoke("jti-abc")
revoke("jti-abc") # second call must not raise
data = json.loads((tmp_store / "revoked.json").read_text())
assert data["revoked"].count("jti-abc") == 1
def test_multiple_jtis_all_revoked(self, tmp_store):
_init_store(tmp_store)
revoke("jti-1")
revoke("jti-2")
assert is_revoked("jti-1")
assert is_revoked("jti-2")
def test_not_revoked_when_file_absent(self, tmp_store):
# No store at all — should return False safely
assert not is_revoked("any-jti")
# ------------------------------------------------------------------ #
# cmd_revoke_token (CLI) #
# ------------------------------------------------------------------ #
class TestCmdRevokeToken:
def _ns(self, token):
return argparse.Namespace(
command="revoke-token",
func=cmd_revoke_token,
token=token,
)
def test_revoke_by_jti_string(self, tmp_store):
_init_store(tmp_store)
cmd_revoke_token(self._ns("my-jti-string"))
assert is_revoked("my-jti-string")
def test_revoke_by_full_jwt(self, tmp_store):
_init_store(tmp_store)
private_key = ensure_signing_key()
kid = key_id(private_key)
token = create_token(
private_key=private_key,
kid=kid,
sub="alice",
iss="local-identity",
aud="testclient",
email="alice@example.com",
name="Alice Smith",
preferred_username="alice",
ttl=3600,
)
cmd_revoke_token(self._ns(token))
# Extract JTI from the token and verify it's revoked
import base64 as _b64
payload_b64 = token.split(".")[1]
pad = (4 - len(payload_b64) % 4) % 4
payload = json.loads(_b64.urlsafe_b64decode(payload_b64 + "=" * pad))
assert is_revoked(payload["jti"])
def test_revoke_jwt_without_jti_exits(self, tmp_store):
_init_store(tmp_store)
# Craft a JWT without a jti claim
import base64 as _b64
header = _b64.urlsafe_b64encode(b'{"alg":"RS256"}').rstrip(b"=").decode()
payload = _b64.urlsafe_b64encode(b'{"sub":"alice"}').rstrip(b"=").decode()
fake_jwt = f"{header}.{payload}.fakesig"
with pytest.raises(SystemExit) as exc:
cmd_revoke_token(self._ns(fake_jwt))
assert exc.value.code == 1
def test_revoke_writes_audit_log(self, tmp_store):
_init_store(tmp_store)
cmd_revoke_token(self._ns("test-jti"))
content = (tmp_store / "audit.log").read_text()
assert "revoke-token" in content
assert "test-jti" in content