Files
net-kingdom/local-identity/src/local_identity/security.py
tegwick 52d44daec2 refactor(local-identity): post-Stage4 cleanups and micro-fixes
- audit: chmod only on file creation, not every append (TOCTOU fix)
- jwt_utils: add extract_unverified_payload() helper
- cli: use extract_unverified_payload + JWTError instead of inline decode
- keys: extract _public_key_bytes() helper, import _b64url from jwt_utils
- security: FileNotFoundError try/except instead of path.exists() (TOCTOU fix)
- serve: cache JWK response at server init instead of per-request recompute

138 tests passing.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 08:25:21 +01:00

150 lines
4.2 KiB
Python

"""
Filesystem permission checking for local-identity.

    check_store()           Audit all sensitive paths; return a list of CheckResult.
    enforce_permissions()   Call on CLI startup — exits 1 if any check fails.
    print_security_check()  Implement the 'security-check' subcommand.
"""
import os
import stat
import sys
from dataclasses import dataclass
from pathlib import Path
from .store import _config_file, _store_dir, _users_dir
@dataclass
class CheckResult:
    """Outcome of auditing the permissions of a single filesystem path."""

    # The inspected path, rendered as a string.
    path: str
    # One of "pass", "warn" or "fail".
    status: str
    # Human-readable explanation (for example the observed mode).
    detail: str
def _check_mode(path: Path, expected: int) -> CheckResult:
    """Compare the permission bits of *path* against *expected*.

    The result is "warn" when the path does not exist, "fail" when its
    permission bits differ from *expected*, and "pass" when they match.
    """
    location = str(path)
    try:
        info = os.stat(path)
    except FileNotFoundError:
        return CheckResult(location, "warn", "does not exist (skipped)")

    # Keep only the permission bits; the file-type bits are irrelevant here.
    mode = stat.S_IMODE(info.st_mode)
    if mode == expected:
        return CheckResult(location, "pass", f"mode {oct(mode)}")
    return CheckResult(
        location, "fail",
        f"mode {oct(mode)}, expected {oct(expected)}",
    )
def check_store() -> list[CheckResult]:
    """
    Inspect the permissions of every sensitive path in the store.

    The returned list holds one CheckResult per path that was examined.
    When the store directory is absent the list is empty; that is not an
    error, because 'init' may simply not have run yet.
    """
    root = _store_dir()
    if not root.exists():
        return []

    # Store root: expected mode 0o700.
    report: list[CheckResult] = [_check_mode(root, 0o700)]

    # Config file: expected mode 0o600 when present.
    config_path = _config_file()
    if config_path.exists():
        report.append(_check_mode(config_path, 0o600))

    # Users directory (0o700) followed by each *.yaml inside it (0o600),
    # visited in sorted order.
    users_root = _users_dir()
    if users_root.exists():
        report.append(_check_mode(users_root, 0o700))
        report.extend(
            _check_mode(user_file, 0o600)
            for user_file in sorted(users_root.glob("*.yaml"))
        )

    # Remaining optional files, all expected at 0o600: signing key,
    # TLS private key, audit log and revocation list (in that order).
    optional_files = (
        root / "keys" / "signing_private.pem",
        root / "tls" / "server.key",
        root / "audit.log",
        root / "revoked.json",
    )
    for candidate in optional_files:
        if candidate.exists():
            report.append(_check_mode(candidate, 0o600))

    return report
def enforce_permissions() -> None:
    """
    Abort the process when the store has incorrect permissions.

    Runs check_store() and, if any result has status "fail", lists the
    failing paths on stderr and exits with status 1. Returns normally when
    nothing failed; that includes the case where check_store() returns an
    empty list because the store does not exist yet, so the calling
    command can report the missing store itself.

    Raises:
        SystemExit: with code 1 when at least one check failed.
    """
    failures = [r for r in check_store() if r.status == "fail"]
    if not failures:
        return

    print(
        "Error: local-identity store has incorrect permissions:", file=sys.stderr
    )
    for failure in failures:
        # Keep path and detail apart; run together they read as
        # ".../config.yamlmode 0o644, ..." which is unreadable.
        print(f" FAIL {failure.path}: {failure.detail}", file=sys.stderr)
    print(
        "Run 'local-identity security-check' for a full report.",
        file=sys.stderr,
    )
    sys.exit(1)
def print_security_check() -> int:
    """
    Print a human-readable permission audit to stdout.

    Returns:
        0 when every check passed or only warnings were reported;
        1 when at least one check failed or the store is not initialised.
    """
    not_initialised = "Store not initialised. Run 'local-identity init' first."

    if not _store_dir().exists():
        print(not_initialised)
        return 1

    results = check_store()
    if not results:
        # check_store() returns an empty list when it finds no store
        # directory, e.g. if the directory vanished after the test above.
        print(not_initialised)
        return 1

    label = {"pass": "PASS", "warn": "WARN", "fail": "FAIL"}
    for r in results:
        # Separate path and detail so the two do not run together.
        print(f" {label[r.status]} {r.path}: {r.detail}")

    fail_count = sum(r.status == "fail" for r in results)
    warn_count = sum(r.status == "warn" for r in results)

    print()
    if fail_count:
        print(f"Result: {fail_count} failure(s), {warn_count} warning(s) — fix permissions before use")
        return 1
    if warn_count:
        print(f"Result: {warn_count} warning(s) — review recommended")
        return 0
    print("Result: all checks passed")
    return 0