"""Compliance scorecard — cert-side checks (AccessManagementDirective §5).""" from __future__ import annotations from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from typing import List from warden.ca import CAError, parse_cert_metadata from warden.inventory import PrincipalsInventory from warden.models import ACTOR_PREFIX, MAX_TTL_HOURS @dataclass class CheckResult: name: str passed: bool detail: str = "" def check_actor_name_prefixes(inventory: PrincipalsInventory) -> CheckResult: """All actor names must carry the prefix matching their type.""" violations = [] for name, entry in inventory.actors.items(): expected = ACTOR_PREFIX[entry.actor_type] if not name.startswith(expected): violations.append(f"{name!r} should start with {expected!r}") return CheckResult( name="actor_name_prefixes", passed=len(violations) == 0, detail=( "; ".join(violations) if violations else "all actor names match prefix convention" ), ) def check_all_actors_have_principals(inventory: PrincipalsInventory) -> CheckResult: """Every actor in inventory must have at least one principal.""" missing = [name for name, e in inventory.actors.items() if not e.principals] return CheckResult( name="actors_have_principals", passed=len(missing) == 0, detail=f"missing principals: {missing}" if missing else "all actors have principals", ) def check_no_expired_certs(state_dir: Path) -> CheckResult: """No cert in state_dir should be currently expired.""" if not state_dir.exists(): return CheckResult("no_expired_certs", passed=True, detail="no state dir") now = datetime.now(timezone.utc) expired = [] for cert_path in state_dir.glob("*-cert.pub"): try: meta = parse_cert_metadata(cert_path) except CAError: continue if meta["valid_before"] < now: expired.append(cert_path.stem.replace("-cert", "")) return CheckResult( name="no_expired_certs", passed=len(expired) == 0, detail=f"expired: {expired}" if expired else "no expired certs", ) def check_no_stale_certs(state_dir: Path) -> CheckResult: """Certs expired by more than 5 minutes should have been cleaned up.""" if not state_dir.exists(): return CheckResult("no_stale_certs", passed=True, detail="no state dir") cutoff = datetime.now(timezone.utc) - timedelta(minutes=5) stale = [] for cert_path in state_dir.glob("*-cert.pub"): try: meta = parse_cert_metadata(cert_path) except CAError: continue if meta["valid_before"] < cutoff: stale.append(cert_path.name) return CheckResult( name="no_stale_certs", passed=len(stale) == 0, detail=( f"stale certs present: {stale} — run 'warden cleanup'" if stale else "no stale certs" ), ) def check_ttl_policy(state_dir: Path, inventory: PrincipalsInventory) -> CheckResult: """Certs in state_dir must not exceed the type-max TTL (directive §2).""" if not state_dir.exists(): return CheckResult("ttl_policy", passed=True, detail="no state dir") violations = [] for cert_path in state_dir.glob("*-cert.pub"): actor_name = cert_path.stem.replace("-cert", "") entry = inventory.actors.get(actor_name) if entry is None: continue try: meta = parse_cert_metadata(cert_path) except CAError: continue valid_from = meta.get("valid_from") if valid_from is None: continue ttl_hours = (meta["valid_before"] - valid_from).total_seconds() / 3600 max_hours = MAX_TTL_HOURS[entry.actor_type] if ttl_hours > max_hours + 0.01: violations.append( f"{actor_name!r}: {ttl_hours:.1f}h issued > {max_hours}h max" ) return CheckResult( name="ttl_policy", passed=len(violations) == 0, detail=( "; ".join(violations) if violations else "all certs within TTL policy" ), ) def check_file_permissions(state_dir: Path) -> CheckResult: """Cert files and keys must not be group- or world-readable (mode 600/644).""" if not state_dir.exists(): return CheckResult("file_permissions", passed=True, detail="no state dir") bad = [] for cert_path in state_dir.glob("*-cert.pub"): if cert_path.stat().st_mode & 0o044: bad.append(cert_path.name) keys_dir = state_dir / "keys" if keys_dir.exists(): for key_path in keys_dir.iterdir(): if key_path.stat().st_mode & 0o044: bad.append(f"keys/{key_path.name}") return CheckResult( name="file_permissions", passed=len(bad) == 0, detail=( f"world/group readable files: {bad} — run 'chmod 600'" if bad else "all cert/key files have restricted permissions" ), ) def run_scorecard(state_dir: Path, inventory: PrincipalsInventory) -> List[CheckResult]: """Run all cert-side scorecard checks. Returns list of CheckResult.""" return [ check_actor_name_prefixes(inventory), check_all_actors_have_principals(inventory), check_no_expired_certs(state_dir), check_no_stale_certs(state_dir), check_ttl_policy(state_dir, inventory), check_file_permissions(state_dir), ]