"""Load and validate the Workload Security Posture descriptors (WP-0015 T2). Two axes — environment posture (`dev/test/prod`) and workload maturity (`M0–M3`) — plus the data-class floor, loaded from ``registry/policy/security-posture.yaml``. This module is **pure**: it parses descriptors and evaluates the secret-flow lattice. It holds no secret material and makes no runtime authorization decision (that is flex-auth's); it is the data + check substrate the conformance checker (T3) runs on. Authoritative prose: ``wiki/WorkloadSecurityPosture.md``. """ from __future__ import annotations import os from dataclasses import dataclass from pathlib import Path from typing import Dict, List, Optional import yaml class PostureError(Exception): """Raised when the posture descriptors are missing or invalid.""" @dataclass class EnvPosture: id: str rank: int backend: str real_values: str unseal: str real_user_data: str audit: str @dataclass class MaturityLevel: id: str rank: int phase: str max_dataclass: str promotion_gate: List[str] @dataclass class PostureCatalog: path: Path env_postures: List[EnvPosture] maturity_levels: List[MaturityLevel] dataclass_floor: Dict[str, str] # dataclass -> maturity id requires_env_posture: str # lattice: posture a secret fetch requires # --- lookups ---------------------------------------------------------- def env(self, env_id: str) -> Optional[EnvPosture]: return next((e for e in self.env_postures if e.id == env_id), None) def maturity(self, level_id: str) -> Optional[MaturityLevel]: return next((m for m in self.maturity_levels if m.id == level_id), None) def maturity_rank(self, level_id: str) -> int: m = self.maturity(level_id) if m is None: raise PostureError(f"unknown maturity level: {level_id!r}") return m.rank # --- the secret-flow lattice (no-write-down) -------------------------- def can_deliver( self, *, workload_env: str, workload_maturity: str, secret_required_maturity: str, secret_dataclass: Optional[str] = None, ) -> tuple[bool, List[str]]: """Evaluate the lattice. Returns (allowed, reasons-it-was-denied). deliver permitted iff workload is in the required env posture AND the workload's maturity is >= the secret's required maturity AND >= the floor for the secret's data classification. Pure — no I/O, no secret value involved. """ reasons: List[str] = [] if workload_env != self.requires_env_posture: reasons.append( f"env posture {workload_env!r} != required {self.requires_env_posture!r}" ) w_rank = self.maturity_rank(workload_maturity) if w_rank < self.maturity_rank(secret_required_maturity): reasons.append( f"workload maturity {workload_maturity} < required {secret_required_maturity}" ) if secret_dataclass is not None: floor = self.dataclass_floor.get(secret_dataclass) if floor is None: reasons.append(f"unknown data classification {secret_dataclass!r}") elif w_rank < self.maturity_rank(floor): reasons.append( f"workload maturity {workload_maturity} < floor {floor} " f"for dataclass {secret_dataclass}" ) return (not reasons, reasons) def find_posture_path(start: Optional[Path] = None) -> Path: """Locate registry/policy/security-posture.yaml (honors WARDEN_POSTURE_CATALOG).""" override = os.environ.get("WARDEN_POSTURE_CATALOG") if override: return Path(os.path.expanduser(override)) rel = Path("registry") / "policy" / "security-posture.yaml" here = (start or Path(__file__)).resolve() for parent in [here, *here.parents]: candidate = parent / rel if candidate.exists(): return candidate # Fallback: registry bundled into the installed wheel (warden/_registry/...). bundled = Path(__file__).resolve().parent / "_registry" / "policy" / "security-posture.yaml" if bundled.exists(): return bundled raise PostureError(f"Posture descriptors not found ({rel}).") def _require_unique_contiguous_ranks(items, kind: str) -> None: ranks = sorted(i.rank for i in items) if ranks != list(range(len(ranks))): raise PostureError( f"{kind} ranks must be unique and contiguous from 0, got {ranks}" ) def load_posture(path: Optional[Path] = None) -> PostureCatalog: """Load, parse, and validate the posture descriptors.""" posture_path = path or find_posture_path() if not posture_path.exists(): raise PostureError(f"Posture descriptors not found: {posture_path}") try: raw = yaml.safe_load(posture_path.read_text()) except yaml.YAMLError as e: raise PostureError(f"Invalid YAML in {posture_path}: {e}") from e if not isinstance(raw, dict): raise PostureError("Posture descriptors must be a YAML mapping") try: env_postures = [ EnvPosture( id=str(e["id"]), rank=int(e["rank"]), backend=str(e["backend"]), real_values=str(e["real_values"]), unseal=str(e["unseal"]), real_user_data=str(e["real_user_data"]), audit=str(e["audit"]), ) for e in raw.get("env_postures") or [] ] maturity_levels = [ MaturityLevel( id=str(m["id"]), rank=int(m["rank"]), phase=str(m["phase"]), max_dataclass=str(m["max_dataclass"]), promotion_gate=[str(g) for g in (m.get("promotion_gate") or [])], ) for m in raw.get("maturity_levels") or [] ] except (KeyError, TypeError, ValueError) as e: raise PostureError(f"malformed descriptor entry: {e}") from e if not env_postures or not maturity_levels: raise PostureError("posture descriptors need env_postures and maturity_levels") _require_unique_contiguous_ranks(env_postures, "env_posture") _require_unique_contiguous_ranks(maturity_levels, "maturity_level") maturity_ids = {m.id for m in maturity_levels} dataclass_floor = {str(k): str(v) for k, v in (raw.get("dataclass_floor") or {}).items()} if not dataclass_floor: raise PostureError("posture descriptors need a dataclass_floor mapping") for dc, lvl in dataclass_floor.items(): if lvl not in maturity_ids: raise PostureError( f"dataclass_floor[{dc!r}] = {lvl!r} is not a known maturity level" ) # Every maturity level's max_dataclass must be a known data classification. for m in maturity_levels: if m.max_dataclass not in dataclass_floor: raise PostureError( f"maturity {m.id} max_dataclass {m.max_dataclass!r} not in dataclass_floor" ) lattice = raw.get("lattice") or {} requires_env = str(lattice.get("requires_env_posture", "prod")) if not any(e.id == requires_env for e in env_postures): raise PostureError(f"lattice requires_env_posture {requires_env!r} is not an env posture") return PostureCatalog( path=posture_path, env_postures=env_postures, maturity_levels=maturity_levels, dataclass_floor=dataclass_floor, requires_env_posture=requires_env, )