feat(WARDEN-WP-0015): T2 — machine-readable posture descriptors + warden policy

Adds registry/policy/security-posture.yaml (Axis A env postures, Axis B
maturity levels M0-M3, dataclass_floor, lattice rule — no secret
material) and src/warden/posture.py: typed loader with validation
(unique/contiguous ranks, floor references known levels) and the pure
can_deliver() lattice helper (no-write-down: prod posture + workload
maturity >= secret required_maturity + dataclass floor). New `warden
policy list|show` read-only lookup mirroring `warden route`.
tests/test_posture.py covers load, the allow/deny lattice matrix,
validation rejections, and CLI. 184 passed, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-27 18:10:54 +02:00
parent a54403b9d7
commit 0812d7303d
5 changed files with 498 additions and 5 deletions

View File

@@ -28,6 +28,11 @@ route_app = typer.Typer(
no_args_is_help=True,
)
app.add_typer(route_app, name="route")
policy_app = typer.Typer(
help="Look up Workload Security Posture descriptors (read-only; env posture + maturity)",
no_args_is_help=True,
)
app.add_typer(policy_app, name="policy")
console = Console()
err = Console(stderr=True)
@@ -979,3 +984,81 @@ def access(
f"Obtain it from [bold]{entry.owner_repo}[/bold] as shown — "
"warden advises, the owner vends."
)
# ---------------------------------------------------------------------------
# warden policy — read-only Workload Security Posture lookup (WP-0015 T2)
# ---------------------------------------------------------------------------
def _load_posture():
from warden.posture import PostureError, load_posture
try:
return load_posture()
except PostureError as e:
err.print(f"[red]Posture descriptor error:[/red] {e}")
raise typer.Exit(1)
@policy_app.command("list")
def policy_list(
output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
"""List both posture axes: environment postures and workload maturity levels."""
cat = _load_posture()
if output_json:
print(json.dumps({
"env_postures": [vars(e) for e in cat.env_postures],
"maturity_levels": [vars(m) for m in cat.maturity_levels],
"dataclass_floor": cat.dataclass_floor,
"requires_env_posture": cat.requires_env_posture,
}, indent=2))
return
env_table = Table(title="Axis A — environment posture")
for col in ("ID", "rank", "backend", "real values", "user data", "audit"):
env_table.add_column(col)
for e in sorted(cat.env_postures, key=lambda x: x.rank):
env_table.add_row(e.id, str(e.rank), e.backend, e.real_values, e.real_user_data, e.audit)
console.print(env_table)
mat_table = Table(title="Axis B — workload maturity")
for col in ("ID", "rank", "phase", "max dataclass", "promotion gate"):
mat_table.add_column(col)
for m in sorted(cat.maturity_levels, key=lambda x: x.rank):
mat_table.add_row(m.id, str(m.rank), m.phase, m.max_dataclass, ", ".join(m.promotion_gate) or "")
console.print(mat_table)
console.print(
f"\n[dim]lattice: deliver iff env=={cat.requires_env_posture} and "
"workload.maturity >= secret.required_maturity (and the dataclass floor).[/dim]"
)
@policy_app.command("show")
def policy_show(
descriptor_id: Annotated[str, typer.Argument(help="An env posture (dev/test/prod) or maturity level (M0M3)")],
output_json: Annotated[bool, typer.Option("--json", help="Output JSON")] = False,
) -> None:
"""Show one environment posture or maturity level."""
cat = _load_posture()
env = cat.env(descriptor_id)
mat = cat.maturity(descriptor_id)
if env is None and mat is None:
err.print(
f"[red]Unknown descriptor {descriptor_id!r}.[/red] "
"Try `warden policy list`."
)
raise typer.Exit(1)
obj = env or mat
if output_json:
print(json.dumps({"axis": "env_posture" if env else "maturity_level", **vars(obj)}, indent=2))
return
axis = "environment posture" if env else "workload maturity level"
console.print(f"[bold]{obj.id}[/bold] ([cyan]{axis}[/cyan])")
for k, v in vars(obj).items():
if k == "id":
continue
console.print(f" {k:14}: {', '.join(v) if isinstance(v, list) else v}")
if mat:
floor = [dc for dc, lvl in cat.dataclass_floor.items() if lvl == mat.id]
if floor:
console.print(f" {'dataclass floor':14}: {', '.join(floor)} require this level")

189
src/warden/posture.py Normal file
View File

@@ -0,0 +1,189 @@
"""Load and validate the Workload Security Posture descriptors (WP-0015 T2).
Two axes — environment posture (`dev/test/prod`) and workload maturity (`M0M3`) —
plus the data-class floor, loaded from ``registry/policy/security-posture.yaml``. This
module is **pure**: it parses descriptors and evaluates the secret-flow lattice. It
holds no secret material and makes no runtime authorization decision (that is
flex-auth's); it is the data + check substrate the conformance checker (T3) runs on.
Authoritative prose: ``wiki/WorkloadSecurityPosture.md``.
"""
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
import yaml
class PostureError(Exception):
"""Raised when the posture descriptors are missing or invalid."""
@dataclass
class EnvPosture:
id: str
rank: int
backend: str
real_values: str
unseal: str
real_user_data: str
audit: str
@dataclass
class MaturityLevel:
id: str
rank: int
phase: str
max_dataclass: str
promotion_gate: List[str]
@dataclass
class PostureCatalog:
path: Path
env_postures: List[EnvPosture]
maturity_levels: List[MaturityLevel]
dataclass_floor: Dict[str, str] # dataclass -> maturity id
requires_env_posture: str # lattice: posture a secret fetch requires
# --- lookups ----------------------------------------------------------
def env(self, env_id: str) -> Optional[EnvPosture]:
return next((e for e in self.env_postures if e.id == env_id), None)
def maturity(self, level_id: str) -> Optional[MaturityLevel]:
return next((m for m in self.maturity_levels if m.id == level_id), None)
def maturity_rank(self, level_id: str) -> int:
m = self.maturity(level_id)
if m is None:
raise PostureError(f"unknown maturity level: {level_id!r}")
return m.rank
# --- the secret-flow lattice (no-write-down) --------------------------
def can_deliver(
self,
*,
workload_env: str,
workload_maturity: str,
secret_required_maturity: str,
secret_dataclass: Optional[str] = None,
) -> tuple[bool, List[str]]:
"""Evaluate the lattice. Returns (allowed, reasons-it-was-denied).
deliver permitted iff workload is in the required env posture AND the workload's
maturity is >= the secret's required maturity AND >= the floor for the secret's
data classification. Pure — no I/O, no secret value involved.
"""
reasons: List[str] = []
if workload_env != self.requires_env_posture:
reasons.append(
f"env posture {workload_env!r} != required {self.requires_env_posture!r}"
)
w_rank = self.maturity_rank(workload_maturity)
if w_rank < self.maturity_rank(secret_required_maturity):
reasons.append(
f"workload maturity {workload_maturity} < required {secret_required_maturity}"
)
if secret_dataclass is not None:
floor = self.dataclass_floor.get(secret_dataclass)
if floor is None:
reasons.append(f"unknown data classification {secret_dataclass!r}")
elif w_rank < self.maturity_rank(floor):
reasons.append(
f"workload maturity {workload_maturity} < floor {floor} "
f"for dataclass {secret_dataclass}"
)
return (not reasons, reasons)
def find_posture_path(start: Optional[Path] = None) -> Path:
"""Locate registry/policy/security-posture.yaml (honors WARDEN_POSTURE_CATALOG)."""
override = os.environ.get("WARDEN_POSTURE_CATALOG")
if override:
return Path(os.path.expanduser(override))
rel = Path("registry") / "policy" / "security-posture.yaml"
here = (start or Path(__file__)).resolve()
for parent in [here, *here.parents]:
candidate = parent / rel
if candidate.exists():
return candidate
raise PostureError(f"Posture descriptors not found ({rel}).")
def _require_unique_contiguous_ranks(items, kind: str) -> None:
ranks = sorted(i.rank for i in items)
if ranks != list(range(len(ranks))):
raise PostureError(
f"{kind} ranks must be unique and contiguous from 0, got {ranks}"
)
def load_posture(path: Optional[Path] = None) -> PostureCatalog:
"""Load, parse, and validate the posture descriptors."""
posture_path = path or find_posture_path()
if not posture_path.exists():
raise PostureError(f"Posture descriptors not found: {posture_path}")
try:
raw = yaml.safe_load(posture_path.read_text())
except yaml.YAMLError as e:
raise PostureError(f"Invalid YAML in {posture_path}: {e}") from e
if not isinstance(raw, dict):
raise PostureError("Posture descriptors must be a YAML mapping")
try:
env_postures = [
EnvPosture(
id=str(e["id"]), rank=int(e["rank"]), backend=str(e["backend"]),
real_values=str(e["real_values"]), unseal=str(e["unseal"]),
real_user_data=str(e["real_user_data"]), audit=str(e["audit"]),
)
for e in raw.get("env_postures") or []
]
maturity_levels = [
MaturityLevel(
id=str(m["id"]), rank=int(m["rank"]), phase=str(m["phase"]),
max_dataclass=str(m["max_dataclass"]),
promotion_gate=[str(g) for g in (m.get("promotion_gate") or [])],
)
for m in raw.get("maturity_levels") or []
]
except (KeyError, TypeError, ValueError) as e:
raise PostureError(f"malformed descriptor entry: {e}") from e
if not env_postures or not maturity_levels:
raise PostureError("posture descriptors need env_postures and maturity_levels")
_require_unique_contiguous_ranks(env_postures, "env_posture")
_require_unique_contiguous_ranks(maturity_levels, "maturity_level")
maturity_ids = {m.id for m in maturity_levels}
dataclass_floor = {str(k): str(v) for k, v in (raw.get("dataclass_floor") or {}).items()}
if not dataclass_floor:
raise PostureError("posture descriptors need a dataclass_floor mapping")
for dc, lvl in dataclass_floor.items():
if lvl not in maturity_ids:
raise PostureError(
f"dataclass_floor[{dc!r}] = {lvl!r} is not a known maturity level"
)
# Every maturity level's max_dataclass must be a known data classification.
for m in maturity_levels:
if m.max_dataclass not in dataclass_floor:
raise PostureError(
f"maturity {m.id} max_dataclass {m.max_dataclass!r} not in dataclass_floor"
)
lattice = raw.get("lattice") or {}
requires_env = str(lattice.get("requires_env_posture", "prod"))
if not any(e.id == requires_env for e in env_postures):
raise PostureError(f"lattice requires_env_posture {requires_env!r} is not an env posture")
return PostureCatalog(
path=posture_path,
env_postures=env_postures,
maturity_levels=maturity_levels,
dataclass_floor=dataclass_floor,
requires_env_posture=requires_env,
)