generated from coulomb/repo-seed
feat(WARDEN-WP-0015): T3 conformance checker + T4 dev-tier contract doubles
Finish the Workload Security Posture workplan (all five tasks done). T3 — scripts/check_secret_posture_conformance.py: read-only checker that asserts env-posture conformance (backend/unseal/real_values per tier) and evaluates the secret-flow lattice via posture.can_deliver. Metadata-only manifest, no secret values, exit 0/1/2. examples/posture-conformance.example.yaml as the reference. T4 — src/warden/doubles.py: generalizes "fake bao" into materialize_doubles() — hermetic, synthetic-only (synthetic- prefix) stand-ins for bao/key-cape honoring each argv/stdout/exit contract, for fully offline dev/test access flows. Documented as the sanctioned dev backend in WorkloadSecurityPosture.md R1. T5 — INTENT/SCOPE/wiki aligned; canon landing in net-kingdom/info-tech-canon left owner-driven (tracked via coordination messages). 16 new tests, 200 passing, ruff clean. Archived WP-0012/0014/0015 to workplans/archived/ with 260627- prefix. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
165
scripts/check_secret_posture_conformance.py
Normal file
165
scripts/check_secret_posture_conformance.py
Normal file
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Read-only conformance checker for the Workload Security Posture (WP-0015 T3).
|
||||
|
||||
Given a *metadata-only* target manifest (see ``examples/posture-conformance.example.yaml``),
|
||||
assert two things against ``registry/policy/security-posture.yaml``:
|
||||
|
||||
1. **Environment posture conformance** — each environment's observed secret-store
|
||||
posture (backend / unseal / real_values) matches the standard descriptor for that
|
||||
tier. Catches "prod" stores that are not sealed-Shamir, or a "dev" store that admits
|
||||
real values.
|
||||
2. **Secret-flow lattice** — every requested secret flow is permitted by the
|
||||
no-write-down lattice for its target workload (``warden.posture.can_deliver``):
|
||||
prod posture, and workload maturity >= the secret's ``required_maturity`` and the
|
||||
data-class floor.
|
||||
|
||||
Exit 0 when fully conformant; exit 1 on any violation; exit 2 on bad input. This script
|
||||
reads descriptors and target metadata only — it never reads, fetches, or prints a secret
|
||||
value. Drift-report shaped, mirroring ``scripts/check_principals_drift.py``.
|
||||
|
||||
Usage:
|
||||
python scripts/check_secret_posture_conformance.py \\
|
||||
--manifest examples/posture-conformance.example.yaml
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# Allow running as a plain script (no install) by adding src/ to the path.
|
||||
_SRC = Path(__file__).resolve().parent.parent / "src"
|
||||
if _SRC.is_dir() and str(_SRC) not in sys.path:
|
||||
sys.path.insert(0, str(_SRC))
|
||||
|
||||
import yaml # noqa: E402
|
||||
|
||||
from warden.posture import PostureCatalog, PostureError, load_posture # noqa: E402
|
||||
|
||||
# Fields of an env posture that a target environment is expected to match.
|
||||
_ENV_CONFORMANCE_FIELDS = ("backend", "unseal", "real_values")
|
||||
|
||||
|
||||
def check_environments(
|
||||
cat: PostureCatalog, environments: Dict[str, Any]
|
||||
) -> List[str]:
|
||||
"""Return a list of env-posture conformance violations (empty == conformant)."""
|
||||
violations: List[str] = []
|
||||
for env_id, observed in (environments or {}).items():
|
||||
standard = cat.env(env_id)
|
||||
if standard is None:
|
||||
violations.append(f"environment {env_id!r}: not a known env posture")
|
||||
continue
|
||||
observed = observed or {}
|
||||
for field in _ENV_CONFORMANCE_FIELDS:
|
||||
if field not in observed:
|
||||
continue # field not asserted by the manifest — skip, don't fail
|
||||
want = getattr(standard, field)
|
||||
got = str(observed[field])
|
||||
if got != want:
|
||||
violations.append(
|
||||
f"environment {env_id!r}: {field} is {got!r}, "
|
||||
f"standard requires {want!r}"
|
||||
)
|
||||
return violations
|
||||
|
||||
|
||||
def check_secret_flows(
|
||||
cat: PostureCatalog,
|
||||
workloads: List[Dict[str, Any]],
|
||||
secret_requests: List[Dict[str, Any]],
|
||||
) -> List[str]:
|
||||
"""Return a list of lattice violations for the requested secret flows."""
|
||||
by_id = {str(w["id"]): w for w in (workloads or [])}
|
||||
violations: List[str] = []
|
||||
for req in secret_requests or []:
|
||||
secret = str(req.get("secret", "<unnamed>"))
|
||||
target = str(req.get("to_workload", ""))
|
||||
workload = by_id.get(target)
|
||||
if workload is None:
|
||||
violations.append(
|
||||
f"secret {secret!r}: target workload {target!r} not in manifest"
|
||||
)
|
||||
continue
|
||||
try:
|
||||
allowed, reasons = cat.can_deliver(
|
||||
workload_env=str(workload["env_posture"]),
|
||||
workload_maturity=str(workload["maturity"]),
|
||||
secret_required_maturity=str(req["required_maturity"]),
|
||||
secret_dataclass=(
|
||||
str(req["dataclass"]) if req.get("dataclass") is not None else None
|
||||
),
|
||||
)
|
||||
except (PostureError, KeyError) as e:
|
||||
violations.append(f"secret {secret!r} -> {target!r}: cannot evaluate ({e})")
|
||||
continue
|
||||
if not allowed:
|
||||
violations.append(
|
||||
f"secret {secret!r} -> workload {target!r}: DENIED — "
|
||||
+ "; ".join(reasons)
|
||||
)
|
||||
return violations
|
||||
|
||||
|
||||
def run(manifest: Dict[str, Any], cat: Optional[PostureCatalog] = None) -> List[str]:
|
||||
"""Evaluate a manifest, returning all violations (empty == conformant)."""
|
||||
cat = cat or load_posture()
|
||||
return check_environments(cat, manifest.get("environments") or {}) + check_secret_flows(
|
||||
cat,
|
||||
manifest.get("workloads") or [],
|
||||
manifest.get("secret_requests") or [],
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument(
|
||||
"--manifest",
|
||||
type=Path,
|
||||
required=True,
|
||||
help="Target manifest (metadata only; see examples/posture-conformance.example.yaml)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.manifest.exists():
|
||||
print(f"manifest not found: {args.manifest}", file=sys.stderr)
|
||||
return 2
|
||||
try:
|
||||
manifest = yaml.safe_load(args.manifest.read_text()) or {}
|
||||
except yaml.YAMLError as e:
|
||||
print(f"invalid YAML in manifest: {e}", file=sys.stderr)
|
||||
return 2
|
||||
if not isinstance(manifest, dict):
|
||||
print("manifest must be a YAML mapping", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
try:
|
||||
cat = load_posture()
|
||||
except PostureError as e:
|
||||
print(f"cannot load posture descriptors: {e}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
violations = run(manifest, cat)
|
||||
|
||||
n_env = len(manifest.get("environments") or {})
|
||||
n_workloads = len(manifest.get("workloads") or [])
|
||||
n_flows = len(manifest.get("secret_requests") or [])
|
||||
print(
|
||||
f"checked {n_env} environment(s), {n_workloads} workload(s), "
|
||||
f"{n_flows} secret flow(s) against {cat.path}"
|
||||
)
|
||||
|
||||
if not violations:
|
||||
print("\nOK — conformant with the Workload Security Posture standard")
|
||||
return 0
|
||||
|
||||
print(f"\n{len(violations)} CONFORMANCE VIOLATION(S):")
|
||||
for v in violations:
|
||||
print(f" - {v}")
|
||||
print("\nStandard: wiki/WorkloadSecurityPosture.md")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user