From 0812d7303d1ebe8284f80a0a3ed801217dc63d41 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 27 Jun 2026 18:10:54 +0200 Subject: [PATCH] =?UTF-8?q?feat(WARDEN-WP-0015):=20T2=20=E2=80=94=20machin?= =?UTF-8?q?e-readable=20posture=20descriptors=20+=20warden=20policy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds registry/policy/security-posture.yaml (Axis A env postures, Axis B maturity levels M0-M3, dataclass_floor, lattice rule — no secret material) and src/warden/posture.py: typed loader with validation (unique/contiguous ranks, floor references known levels) and the pure can_deliver() lattice helper (no-write-down: prod posture + workload maturity >= secret required_maturity + dataclass floor). New `warden policy list|show` read-only lookup mirroring `warden route`. tests/test_posture.py covers load, the allow/deny lattice matrix, validation rejections, and CLI. 184 passed, lint clean. Co-Authored-By: Claude Opus 4.8 --- registry/policy/security-posture.yaml | 73 +++++++ src/warden/cli.py | 83 ++++++++ src/warden/posture.py | 189 ++++++++++++++++++ tests/test_posture.py | 144 +++++++++++++ ...WARDEN-WP-0015-secret-lifecycle-tiering.md | 14 +- 5 files changed, 498 insertions(+), 5 deletions(-) create mode 100644 registry/policy/security-posture.yaml create mode 100644 src/warden/posture.py create mode 100644 tests/test_posture.py diff --git a/registry/policy/security-posture.yaml b/registry/policy/security-posture.yaml new file mode 100644 index 0000000..1ea4248 --- /dev/null +++ b/registry/policy/security-posture.yaml @@ -0,0 +1,73 @@ +# NetKingdom Workload Security Posture — machine-readable descriptors +# WARDEN-WP-0015 T2. Authoritative prose: wiki/WorkloadSecurityPosture.md (pending +# promotion to net-kingdom + info-tech-canon canon). +# +# Rules: +# - No secret material in this file, ever (it is git-tracked and agent-visible). +# - DataClassification names are REUSED from the info-tech-canon Data Model. +# - This is a descriptor/data layer; runtime enforcement is flex-auth's. +version: 1 + +# --- Axis A — environment posture (how the secret store is secured) ---------- +env_postures: + - id: dev + rank: 0 + backend: mock-or-contract-double + real_values: forbidden # synthetic only + unseal: n/a + real_user_data: never + audit: optional + - id: test + rank: 1 + backend: openbao-dev-single-unseal + real_values: generated-reuse-allowed + unseal: single-key-or-auto + real_user_data: never + audit: "on" + - id: prod + rank: 2 + backend: openbao-sealed-shamir + real_values: generated-fresh-no-reuse + unseal: shamir-3-of-5-break-glass + real_user_data: allowed + audit: full-tamper-evident + +# --- Axis B — workload maturity (how trusted a workload is) ------------------- +maturity_levels: + - id: M0 + rank: 0 + phase: experimental-poc + max_dataclass: synthetic + promotion_gate: [] + - id: M1 + rank: 1 + phase: alpha-early-access + max_dataclass: internal + promotion_gate: [friendly-customer-scope, basic-slo, data-handling-note] + - id: M2 + rank: 2 + phase: beta-ga + max_dataclass: confidential + promotion_gate: [security-review, slo-history, on-call, incident-runbooks] + - id: M3 + rank: 3 + phase: critical-regulated + max_dataclass: restricted + promotion_gate: [pen-test, shamir-3-of-5-custody, human-in-loop-ops, compliance-audit] + +# --- Data-class floor — minimum maturity to handle each DataClassification ---- +# required_maturity(dataclass). DataClassification names reused from info-tech-canon. +dataclass_floor: + synthetic: M0 + internal: M1 + confidential: M2 + restricted: M3 + +# --- Secret-flow lattice (informational; enforced by T3 checker + flex-auth) -- +# deliver(secret -> workload) permitted iff: +# workload.env_posture == prod +# and rank(workload.maturity) >= rank(secret.required_maturity) +# and rank(workload.maturity) >= rank(dataclass_floor[dataclass(secret)]) +lattice: + requires_env_posture: prod + rule: no-write-down diff --git a/src/warden/cli.py b/src/warden/cli.py index b631644..1eec9ed 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -28,6 +28,11 @@ route_app = typer.Typer( no_args_is_help=True, ) app.add_typer(route_app, name="route") +policy_app = typer.Typer( + help="Look up Workload Security Posture descriptors (read-only; env posture + maturity)", + no_args_is_help=True, +) +app.add_typer(policy_app, name="policy") console = Console() err = Console(stderr=True) @@ -979,3 +984,81 @@ def access( f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — " "warden advises, the owner vends." ) + + +# --------------------------------------------------------------------------- +# warden policy — read-only Workload Security Posture lookup (WP-0015 T2) +# --------------------------------------------------------------------------- + +def _load_posture(): + from warden.posture import PostureError, load_posture + try: + return load_posture() + except PostureError as e: + err.print(f"[red]Posture descriptor error:[/red] {e}") + raise typer.Exit(1) + + +@policy_app.command("list") +def policy_list( + output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, +) -> None: + """List both posture axes: environment postures and workload maturity levels.""" + cat = _load_posture() + if output_json: + print(json.dumps({ + "env_postures": [vars(e) for e in cat.env_postures], + "maturity_levels": [vars(m) for m in cat.maturity_levels], + "dataclass_floor": cat.dataclass_floor, + "requires_env_posture": cat.requires_env_posture, + }, indent=2)) + return + + env_table = Table(title="Axis A — environment posture") + for col in ("ID", "rank", "backend", "real values", "user data", "audit"): + env_table.add_column(col) + for e in sorted(cat.env_postures, key=lambda x: x.rank): + env_table.add_row(e.id, str(e.rank), e.backend, e.real_values, e.real_user_data, e.audit) + console.print(env_table) + + mat_table = Table(title="Axis B — workload maturity") + for col in ("ID", "rank", "phase", "max dataclass", "promotion gate"): + mat_table.add_column(col) + for m in sorted(cat.maturity_levels, key=lambda x: x.rank): + mat_table.add_row(m.id, str(m.rank), m.phase, m.max_dataclass, ", ".join(m.promotion_gate) or "—") + console.print(mat_table) + console.print( + f"\n[dim]lattice: deliver iff env=={cat.requires_env_posture} and " + "workload.maturity >= secret.required_maturity (and the dataclass floor).[/dim]" + ) + + +@policy_app.command("show") +def policy_show( + descriptor_id: Annotated[str, typer.Argument(help="An env posture (dev/test/prod) or maturity level (M0–M3)")], + output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False, +) -> None: + """Show one environment posture or maturity level.""" + cat = _load_posture() + env = cat.env(descriptor_id) + mat = cat.maturity(descriptor_id) + if env is None and mat is None: + err.print( + f"[red]Unknown descriptor {descriptor_id!r}.[/red] " + "Try `warden policy list`." + ) + raise typer.Exit(1) + obj = env or mat + if output_json: + print(json.dumps({"axis": "env_posture" if env else "maturity_level", **vars(obj)}, indent=2)) + return + axis = "environment posture" if env else "workload maturity level" + console.print(f"[bold]{obj.id}[/bold] ([cyan]{axis}[/cyan])") + for k, v in vars(obj).items(): + if k == "id": + continue + console.print(f" {k:14}: {', '.join(v) if isinstance(v, list) else v}") + if mat: + floor = [dc for dc, lvl in cat.dataclass_floor.items() if lvl == mat.id] + if floor: + console.print(f" {'dataclass floor':14}: {', '.join(floor)} require this level") diff --git a/src/warden/posture.py b/src/warden/posture.py new file mode 100644 index 0000000..f755c66 --- /dev/null +++ b/src/warden/posture.py @@ -0,0 +1,189 @@ +"""Load and validate the Workload Security Posture descriptors (WP-0015 T2). + +Two axes — environment posture (`dev/test/prod`) and workload maturity (`M0–M3`) — +plus the data-class floor, loaded from ``registry/policy/security-posture.yaml``. This +module is **pure**: it parses descriptors and evaluates the secret-flow lattice. It +holds no secret material and makes no runtime authorization decision (that is +flex-auth's); it is the data + check substrate the conformance checker (T3) runs on. + +Authoritative prose: ``wiki/WorkloadSecurityPosture.md``. +""" +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Optional + +import yaml + + +class PostureError(Exception): + """Raised when the posture descriptors are missing or invalid.""" + + +@dataclass +class EnvPosture: + id: str + rank: int + backend: str + real_values: str + unseal: str + real_user_data: str + audit: str + + +@dataclass +class MaturityLevel: + id: str + rank: int + phase: str + max_dataclass: str + promotion_gate: List[str] + + +@dataclass +class PostureCatalog: + path: Path + env_postures: List[EnvPosture] + maturity_levels: List[MaturityLevel] + dataclass_floor: Dict[str, str] # dataclass -> maturity id + requires_env_posture: str # lattice: posture a secret fetch requires + + # --- lookups ---------------------------------------------------------- + def env(self, env_id: str) -> Optional[EnvPosture]: + return next((e for e in self.env_postures if e.id == env_id), None) + + def maturity(self, level_id: str) -> Optional[MaturityLevel]: + return next((m for m in self.maturity_levels if m.id == level_id), None) + + def maturity_rank(self, level_id: str) -> int: + m = self.maturity(level_id) + if m is None: + raise PostureError(f"unknown maturity level: {level_id!r}") + return m.rank + + # --- the secret-flow lattice (no-write-down) -------------------------- + def can_deliver( + self, + *, + workload_env: str, + workload_maturity: str, + secret_required_maturity: str, + secret_dataclass: Optional[str] = None, + ) -> tuple[bool, List[str]]: + """Evaluate the lattice. Returns (allowed, reasons-it-was-denied). + + deliver permitted iff workload is in the required env posture AND the workload's + maturity is >= the secret's required maturity AND >= the floor for the secret's + data classification. Pure — no I/O, no secret value involved. + """ + reasons: List[str] = [] + if workload_env != self.requires_env_posture: + reasons.append( + f"env posture {workload_env!r} != required {self.requires_env_posture!r}" + ) + w_rank = self.maturity_rank(workload_maturity) + if w_rank < self.maturity_rank(secret_required_maturity): + reasons.append( + f"workload maturity {workload_maturity} < required {secret_required_maturity}" + ) + if secret_dataclass is not None: + floor = self.dataclass_floor.get(secret_dataclass) + if floor is None: + reasons.append(f"unknown data classification {secret_dataclass!r}") + elif w_rank < self.maturity_rank(floor): + reasons.append( + f"workload maturity {workload_maturity} < floor {floor} " + f"for dataclass {secret_dataclass}" + ) + return (not reasons, reasons) + + +def find_posture_path(start: Optional[Path] = None) -> Path: + """Locate registry/policy/security-posture.yaml (honors WARDEN_POSTURE_CATALOG).""" + override = os.environ.get("WARDEN_POSTURE_CATALOG") + if override: + return Path(os.path.expanduser(override)) + rel = Path("registry") / "policy" / "security-posture.yaml" + here = (start or Path(__file__)).resolve() + for parent in [here, *here.parents]: + candidate = parent / rel + if candidate.exists(): + return candidate + raise PostureError(f"Posture descriptors not found ({rel}).") + + +def _require_unique_contiguous_ranks(items, kind: str) -> None: + ranks = sorted(i.rank for i in items) + if ranks != list(range(len(ranks))): + raise PostureError( + f"{kind} ranks must be unique and contiguous from 0, got {ranks}" + ) + + +def load_posture(path: Optional[Path] = None) -> PostureCatalog: + """Load, parse, and validate the posture descriptors.""" + posture_path = path or find_posture_path() + if not posture_path.exists(): + raise PostureError(f"Posture descriptors not found: {posture_path}") + try: + raw = yaml.safe_load(posture_path.read_text()) + except yaml.YAMLError as e: + raise PostureError(f"Invalid YAML in {posture_path}: {e}") from e + if not isinstance(raw, dict): + raise PostureError("Posture descriptors must be a YAML mapping") + + try: + env_postures = [ + EnvPosture( + id=str(e["id"]), rank=int(e["rank"]), backend=str(e["backend"]), + real_values=str(e["real_values"]), unseal=str(e["unseal"]), + real_user_data=str(e["real_user_data"]), audit=str(e["audit"]), + ) + for e in raw.get("env_postures") or [] + ] + maturity_levels = [ + MaturityLevel( + id=str(m["id"]), rank=int(m["rank"]), phase=str(m["phase"]), + max_dataclass=str(m["max_dataclass"]), + promotion_gate=[str(g) for g in (m.get("promotion_gate") or [])], + ) + for m in raw.get("maturity_levels") or [] + ] + except (KeyError, TypeError, ValueError) as e: + raise PostureError(f"malformed descriptor entry: {e}") from e + + if not env_postures or not maturity_levels: + raise PostureError("posture descriptors need env_postures and maturity_levels") + _require_unique_contiguous_ranks(env_postures, "env_posture") + _require_unique_contiguous_ranks(maturity_levels, "maturity_level") + + maturity_ids = {m.id for m in maturity_levels} + dataclass_floor = {str(k): str(v) for k, v in (raw.get("dataclass_floor") or {}).items()} + if not dataclass_floor: + raise PostureError("posture descriptors need a dataclass_floor mapping") + for dc, lvl in dataclass_floor.items(): + if lvl not in maturity_ids: + raise PostureError( + f"dataclass_floor[{dc!r}] = {lvl!r} is not a known maturity level" + ) + # Every maturity level's max_dataclass must be a known data classification. + for m in maturity_levels: + if m.max_dataclass not in dataclass_floor: + raise PostureError( + f"maturity {m.id} max_dataclass {m.max_dataclass!r} not in dataclass_floor" + ) + + lattice = raw.get("lattice") or {} + requires_env = str(lattice.get("requires_env_posture", "prod")) + if not any(e.id == requires_env for e in env_postures): + raise PostureError(f"lattice requires_env_posture {requires_env!r} is not an env posture") + + return PostureCatalog( + path=posture_path, + env_postures=env_postures, + maturity_levels=maturity_levels, + dataclass_floor=dataclass_floor, + requires_env_posture=requires_env, + ) diff --git a/tests/test_posture.py b/tests/test_posture.py new file mode 100644 index 0000000..da3fad1 --- /dev/null +++ b/tests/test_posture.py @@ -0,0 +1,144 @@ +"""Tests for Workload Security Posture descriptors + lattice (WP-0015 T2).""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +import yaml +from typer.testing import CliRunner + +from warden.cli import app +from warden.posture import PostureError, load_posture + +runner = CliRunner() + + +def _repo_posture() -> Path: + return Path(__file__).resolve().parents[1] / "registry" / "policy" / "security-posture.yaml" + + +# --- real descriptors load + shape ----------------------------------------- + +def test_real_descriptors_load(): + c = load_posture(_repo_posture()) + assert {e.id for e in c.env_postures} == {"dev", "test", "prod"} + assert {m.id for m in c.maturity_levels} == {"M0", "M1", "M2", "M3"} + assert c.requires_env_posture == "prod" + # YAML `on` gotcha must not have become a boolean + assert c.env("test").audit == "on" + + +# --- the secret-flow lattice ----------------------------------------------- + +def test_lattice_allows_matched_prod_workload(): + c = load_posture(_repo_posture()) + ok, why = c.can_deliver( + workload_env="prod", workload_maturity="M3", + secret_required_maturity="M3", secret_dataclass="restricted", + ) + assert ok and why == [] + + +def test_lattice_denies_below_required_maturity(): + c = load_posture(_repo_posture()) + ok, why = c.can_deliver( + workload_env="prod", workload_maturity="M1", + secret_required_maturity="M3", secret_dataclass="restricted", + ) + assert not ok + assert any("maturity M1 < required M3" in r for r in why) + assert any("floor M3" in r for r in why) + + +def test_lattice_denies_non_prod_posture(): + c = load_posture(_repo_posture()) + ok, why = c.can_deliver( + workload_env="test", workload_maturity="M3", + secret_required_maturity="M1", secret_dataclass="internal", + ) + assert not ok and any("env posture" in r for r in why) + + +def test_lattice_unknown_maturity_raises(): + c = load_posture(_repo_posture()) + with pytest.raises(PostureError, match="unknown maturity"): + c.can_deliver( + workload_env="prod", workload_maturity="M9", + secret_required_maturity="M1", + ) + + +# --- validation ------------------------------------------------------------ + +def _write(tmp_path, data) -> Path: + p = tmp_path / "security-posture.yaml" + p.write_text(yaml.dump(data)) + return p + + +def _valid_data() -> dict: + return { + "version": 1, + "env_postures": [ + {"id": "dev", "rank": 0, "backend": "m", "real_values": "f", + "unseal": "n", "real_user_data": "never", "audit": "optional"}, + {"id": "prod", "rank": 1, "backend": "b", "real_values": "g", + "unseal": "s", "real_user_data": "allowed", "audit": "full"}, + ], + "maturity_levels": [ + {"id": "M0", "rank": 0, "phase": "poc", "max_dataclass": "synthetic", "promotion_gate": []}, + {"id": "M1", "rank": 1, "phase": "ga", "max_dataclass": "internal", "promotion_gate": ["x"]}, + ], + "dataclass_floor": {"synthetic": "M0", "internal": "M1"}, + "lattice": {"requires_env_posture": "prod", "rule": "no-write-down"}, + } + + +def test_valid_minimal_loads(tmp_path): + c = load_posture(_write(tmp_path, _valid_data())) + assert c.requires_env_posture == "prod" + + +def test_non_contiguous_ranks_rejected(tmp_path): + data = _valid_data() + data["maturity_levels"][1]["rank"] = 5 + with pytest.raises(PostureError, match="contiguous"): + load_posture(_write(tmp_path, data)) + + +def test_dataclass_floor_unknown_level_rejected(tmp_path): + data = _valid_data() + data["dataclass_floor"]["internal"] = "M9" + with pytest.raises(PostureError, match="not a known maturity level"): + load_posture(_write(tmp_path, data)) + + +def test_lattice_requires_known_env_posture(tmp_path): + data = _valid_data() + data["lattice"]["requires_env_posture"] = "staging" + with pytest.raises(PostureError, match="not an env posture"): + load_posture(_write(tmp_path, data)) + + +# --- CLI ------------------------------------------------------------------- + +def test_cli_policy_list(monkeypatch): + monkeypatch.setenv("WARDEN_POSTURE_CATALOG", str(_repo_posture())) + r = runner.invoke(app, ["policy", "list"]) + assert r.exit_code == 0 + assert "environment posture" in r.stdout and "workload maturity" in r.stdout + + +def test_cli_policy_list_json(monkeypatch): + monkeypatch.setenv("WARDEN_POSTURE_CATALOG", str(_repo_posture())) + r = runner.invoke(app, ["policy", "list", "--json"]) + payload = json.loads(r.stdout) + assert payload["requires_env_posture"] == "prod" + assert len(payload["maturity_levels"]) == 4 + + +def test_cli_policy_show_unknown_exits_1(monkeypatch): + monkeypatch.setenv("WARDEN_POSTURE_CATALOG", str(_repo_posture())) + r = runner.invoke(app, ["policy", "show", "nope"]) + assert r.exit_code == 1 diff --git a/workplans/WARDEN-WP-0015-secret-lifecycle-tiering.md b/workplans/WARDEN-WP-0015-secret-lifecycle-tiering.md index f1fb0ef..9c67d90 100644 --- a/workplans/WARDEN-WP-0015-secret-lifecycle-tiering.md +++ b/workplans/WARDEN-WP-0015-secret-lifecycle-tiering.md @@ -139,16 +139,20 @@ state_hub_task_id: "85aeb676-a593-4056-986a-db14d4c5209f" ```task id: WARDEN-WP-0015-T02 -status: todo +status: done priority: high state_hub_task_id: "011fb0af-154d-40f4-a03e-3172c325321a" ``` -- [ ] `registry/policy/security-posture.yaml` — env-posture tiers (backend, value-policy, +- [x] `registry/policy/security-posture.yaml` — env-posture tiers (backend, value-policy, unseal, data-class, audit) **and** maturity levels (M0–M3, max DataClassification, - promotion-gate criteria), plus per-secret `required_maturity` tagging convention. -- [ ] Loader + validation (mirror `routing/catalog.py` rigor; no secret material). -- [ ] Optional `warden policy show|list` lookup (mirrors `warden route`). + promotion gates), `dataclass_floor` mapping, and the lattice rule. No secret material. +- [x] Loader + validation in `src/warden/posture.py` (mirrors `routing/catalog.py`): + unique/contiguous ranks, dataclass_floor references known levels, lattice env + posture exists. Includes the pure `can_deliver` lattice helper (reused by T3). +- [x] `warden policy list|show` lookup (mirrors `warden route`; `--json`). +- [x] Tests: `tests/test_posture.py` (load, lattice allow/deny matrix, validation + rejections, CLI). 184 pass, lint clean. ### T3 — Conformance checker (incl. secret-flow lattice)