Files
guide-board/src/guide_board/policy.py

125 lines
4.1 KiB
Python

"""Expectation and waiver policy application."""
from __future__ import annotations
from datetime import date
from pathlib import Path
from typing import Any
from guide_board.io import load_json
from guide_board.schema import assert_valid
def apply_policy(
root: Path,
plan: dict[str, Any],
findings: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], dict[str, Any], list[dict[str, Any]]]:
expectations = _load_optional_set(root, plan, "expectations_ref", "expectation-set")
waiver_set = _load_optional_set(root, plan, "waivers_ref", "waiver-set")
waivers = waiver_set.get("waivers", []) if waiver_set else []
applied_expectations = 0
applied_waivers: list[dict[str, Any]] = []
for finding in findings:
for expectation in expectations.get("expectations", []) if expectations else []:
if _matches_rule(finding, expectation):
finding["expected"] = expectation["expected"]
finding["policy_ref"] = expectation["id"]
applied_expectations += 1
break
for waiver in waivers:
if not _waiver_active(waiver):
continue
if _matches_rule(finding, waiver):
finding["waiver_ref"] = waiver["id"]
finding["expected"] = True
finding["policy_ref"] = waiver["id"]
finding["remediation"] = f"Waived: {waiver['reason']}"
applied_waivers.append(waiver)
break
policy_summary = {
"expectations_ref": plan["assessment_profile_snapshot"].get("expectations_ref"),
"waivers_ref": plan["assessment_profile_snapshot"].get("waivers_ref"),
"applied_expectations": applied_expectations,
"applied_waivers": len(applied_waivers),
"unexpected_findings": sum(
1 for finding in findings if not finding.get("expected") and not finding.get("waiver_ref")
),
}
return findings, policy_summary, applied_waivers
def _load_optional_set(
root: Path,
plan: dict[str, Any],
ref_name: str,
schema_name: str,
) -> dict[str, Any] | None:
ref = plan["assessment_profile_snapshot"].get(ref_name)
if not ref:
return None
path = _resolve_policy_ref(root, plan, ref)
document = load_json(path)
assert_valid(document, schema_name)
target_ref = plan["target_profile_snapshot"]["id"]
if document["target_profile_ref"] != target_ref:
raise ValueError(
f"{path}: target_profile_ref {document['target_profile_ref']!r} "
f"does not match target profile {target_ref!r}"
)
return document
def _resolve_policy_ref(root: Path, plan: dict[str, Any], ref: str) -> Path:
ref_path = Path(ref)
if ref_path.is_absolute():
return ref_path
root_relative = root / ref_path
if root_relative.exists():
return root_relative
assessment_dir = plan.get("profile_paths", {}).get("assessment_profile_dir")
if isinstance(assessment_dir, str):
return Path(assessment_dir) / ref_path
return root_relative
def _matches_rule(finding: dict[str, Any], rule: dict[str, Any]) -> bool:
return (
_matches_any(finding.get("requirement_refs", []), rule.get("requirement_refs", []))
and _matches_any([finding.get("check_id", "")], rule.get("check_refs", []))
and _matches_scalar(finding.get("status"), rule.get("result_refs", []))
and _matches_scalar(finding.get("classification"), rule.get("classification_refs", []))
)
def _matches_any(values: list[str], patterns: list[str]) -> bool:
if not patterns:
return True
return any(value in patterns for value in values)
def _matches_scalar(value: Any, patterns: list[str]) -> bool:
if not patterns:
return True
return isinstance(value, str) and value in patterns
def _waiver_active(waiver: dict[str, Any]) -> bool:
if waiver.get("review_status") != "approved":
return False
expires_at = waiver.get("expires_at")
if not expires_at:
return True
try:
expiry = date.fromisoformat(expires_at)
except ValueError:
return False
return expiry >= date.today()