#!/usr/bin/env python3
"""Read-only conformance checker for the Workload Security Posture (WP-0015 T3).

Given a *metadata-only* target manifest (see
``examples/posture-conformance.example.yaml``), assert two things against
``registry/policy/security-posture.yaml``:

1. **Environment posture conformance** — each environment's observed
   secret-store posture (backend / unseal / real_values) matches the standard
   descriptor for that tier. Catches "prod" stores that are not sealed-Shamir,
   or a "dev" store that admits real values.
2. **Secret-flow lattice** — every requested secret flow is permitted by the
   no-write-down lattice for its target workload
   (``warden.posture.can_deliver``): prod posture, and workload maturity >= the
   secret's ``required_maturity`` and the data-class floor.

Exit 0 when fully conformant; exit 1 on any violation; exit 2 on bad input.
This script reads descriptors and target metadata only — it never reads,
fetches, or prints a secret value. Drift-report shaped, mirroring
``scripts/check_principals_drift.py``.

Usage:
    python scripts/check_secret_posture_conformance.py \\
        --manifest examples/posture-conformance.example.yaml
"""

from __future__ import annotations

import argparse
import sys
from collections.abc import Mapping
from pathlib import Path
from typing import Any, Dict, List, Optional

# Allow running as a plain script (no install) by adding src/ to the path.
_SRC = Path(__file__).resolve().parent.parent / "src"
if _SRC.is_dir() and str(_SRC) not in sys.path:
    sys.path.insert(0, str(_SRC))

import yaml  # noqa: E402

from warden.posture import PostureCatalog, PostureError, load_posture  # noqa: E402

# Fields of an env posture that a target environment is expected to match.
_ENV_CONFORMANCE_FIELDS = ("backend", "unseal", "real_values")

# Manifest sections and the container type each must have when present.
# Checked in ``main`` so a malformed manifest exits 2 (bad input) instead of
# surfacing as a traceback, whose exit status 1 would look like a violation.
_SECTION_SHAPES = (
    ("environments", Mapping, "mapping"),
    ("workloads", list, "list"),
    ("secret_requests", list, "list"),
)


def _section_shape_errors(manifest: Dict[str, Any]) -> List[str]:
    """Return one message per manifest section whose container type is wrong."""
    errors: List[str] = []
    for key, expected, label in _SECTION_SHAPES:
        value = manifest.get(key)
        if value is not None and not isinstance(value, expected):
            errors.append(
                f"manifest section {key!r} must be a {label}, "
                f"got {type(value).__name__}"
            )
    return errors


def check_environments(
    cat: PostureCatalog, environments: Dict[str, Any]
) -> List[str]:
    """Return a list of env-posture conformance violations (empty == conformant).

    Args:
        cat: Catalog whose ``env(env_id)`` returns the standard descriptor for
            an environment, or ``None`` when the id is unknown.
        environments: Mapping of env id -> observed posture mapping. A falsy
            value means "nothing to check".

    Only the fields in ``_ENV_CONFORMANCE_FIELDS`` are compared, and only when
    the manifest asserts them. Malformed input is reported as a violation
    rather than raised.
    """
    if not environments:
        return []
    if not isinstance(environments, Mapping):
        return [
            "environments: expected a mapping of env id -> observed posture, "
            f"got {type(environments).__name__}"
        ]

    violations: List[str] = []
    for env_id, observed in environments.items():
        standard = cat.env(env_id)
        if standard is None:
            violations.append(f"environment {env_id!r}: not a known env posture")
            continue
        if not observed:
            continue  # nothing asserted for this environment
        if not isinstance(observed, Mapping):
            violations.append(
                f"environment {env_id!r}: observed posture must be a mapping, "
                f"got {type(observed).__name__}"
            )
            continue
        for field in _ENV_CONFORMANCE_FIELDS:
            if field not in observed:
                continue  # field not asserted by the manifest — skip, don't fail
            # Normalise BOTH sides to text: YAML may yield bools/ints, and a
            # str-vs-non-str comparison would always report a mismatch.
            want = str(getattr(standard, field))
            got = str(observed[field])
            if got != want:
                violations.append(
                    f"environment {env_id!r}: {field} is {got!r}, "
                    f"standard requires {want!r}"
                )
    return violations


def check_secret_flows(
    cat: PostureCatalog,
    workloads: List[Dict[str, Any]],
    secret_requests: List[Dict[str, Any]],
) -> List[str]:
    """Return a list of lattice violations for the requested secret flows.

    Args:
        cat: Catalog whose ``can_deliver(...)`` returns ``(allowed, reasons)``.
        workloads: Workload mappings; each needs an ``id`` and is expected to
            carry ``env_posture`` and ``maturity``.
        secret_requests: Request mappings with ``secret``, ``to_workload``,
            ``required_maturity`` and optionally ``dataclass``.

    Malformed entries and requests that cannot be evaluated are reported as
    violations; they never raise.
    """
    violations: List[str] = []

    # Index workloads by id, flagging entries that cannot be indexed.
    by_id: Dict[str, Any] = {}
    for index, workload in enumerate(workloads or []):
        if not isinstance(workload, Mapping) or "id" not in workload:
            violations.append(f"workload #{index}: must be a mapping with an 'id'")
            continue
        by_id[str(workload["id"])] = workload

    for index, req in enumerate(secret_requests or []):
        if not isinstance(req, Mapping):
            violations.append(f"secret request #{index}: must be a mapping")
            continue
        secret = str(req.get("secret", ""))
        target = str(req.get("to_workload", ""))
        workload = by_id.get(target)
        if workload is None:
            violations.append(
                f"secret {secret!r}: target workload {target!r} not in manifest"
            )
            continue
        try:
            allowed, reasons = cat.can_deliver(
                workload_env=str(workload["env_posture"]),
                workload_maturity=str(workload["maturity"]),
                secret_required_maturity=str(req["required_maturity"]),
                secret_dataclass=(
                    str(req["dataclass"]) if req.get("dataclass") is not None else None
                ),
            )
        except (PostureError, KeyError) as e:
            # A missing key or a catalog-level rejection means the flow cannot
            # be judged; report it instead of aborting the whole run.
            violations.append(f"secret {secret!r} -> {target!r}: cannot evaluate ({e})")
            continue
        if not allowed:
            violations.append(
                f"secret {secret!r} -> workload {target!r}: DENIED — "
                + "; ".join(reasons)
            )
    return violations


def run(manifest: Dict[str, Any], cat: Optional[PostureCatalog] = None) -> List[str]:
    """Evaluate a manifest, returning all violations (empty == conformant).

    Args:
        manifest: Parsed manifest mapping; the ``environments``, ``workloads``
            and ``secret_requests`` sections are each optional.
        cat: Catalog to check against. ``load_posture()`` is called only when
            this is ``None``, so a supplied catalog is always honoured.
    """
    if cat is None:
        cat = load_posture()
    return check_environments(cat, manifest.get("environments") or {}) + check_secret_flows(
        cat,
        manifest.get("workloads") or [],
        manifest.get("secret_requests") or [],
    )


def main() -> int:
    """CLI entry point.

    Returns:
        0 when conformant, 1 when violations were found, 2 on bad input
        (missing/unreadable manifest, invalid YAML, wrong manifest shape, or
        posture descriptors that cannot be loaded).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's numbered list and Usage block intact.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--manifest",
        type=Path,
        required=True,
        help="Target manifest (metadata only; see examples/posture-conformance.example.yaml)",
    )
    args = parser.parse_args()

    if not args.manifest.exists():
        print(f"manifest not found: {args.manifest}", file=sys.stderr)
        return 2
    try:
        # Explicit UTF-8 so parsing does not depend on the platform locale.
        manifest = yaml.safe_load(args.manifest.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError) as e:
        # e.g. the path is a directory, is unreadable, or is not UTF-8 text.
        print(f"cannot read manifest {args.manifest}: {e}", file=sys.stderr)
        return 2
    except yaml.YAMLError as e:
        print(f"invalid YAML in manifest: {e}", file=sys.stderr)
        return 2
    if manifest is None:
        manifest = {}  # empty document == empty manifest
    if not isinstance(manifest, dict):
        print("manifest must be a YAML mapping", file=sys.stderr)
        return 2
    shape_errors = _section_shape_errors(manifest)
    if shape_errors:
        for message in shape_errors:
            print(message, file=sys.stderr)
        return 2

    try:
        cat = load_posture()
    except PostureError as e:
        print(f"cannot load posture descriptors: {e}", file=sys.stderr)
        return 2

    violations = run(manifest, cat)

    n_env = len(manifest.get("environments") or {})
    n_workloads = len(manifest.get("workloads") or [])
    n_flows = len(manifest.get("secret_requests") or [])
    print(
        f"checked {n_env} environment(s), {n_workloads} workload(s), "
        f"{n_flows} secret flow(s) against {cat.path}"
    )

    if not violations:
        print("\nOK — conformant with the Workload Security Posture standard")
        return 0

    print(f"\n{len(violations)} CONFORMANCE VIOLATION(S):")
    for v in violations:
        print(f"  - {v}")
    print("\nStandard: wiki/WorkloadSecurityPosture.md")
    return 1


if __name__ == "__main__":
    raise SystemExit(main())