generated from coulomb/repo-seed
Separated open-cmis-tck and guide-board repositories
This commit is contained in:
124
src/guide_board/policy.py
Normal file
124
src/guide_board/policy.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Expectation and waiver policy application."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from guide_board.io import load_json
|
||||
from guide_board.schema import assert_valid
|
||||
|
||||
|
||||
def apply_policy(
|
||||
root: Path,
|
||||
plan: dict[str, Any],
|
||||
findings: list[dict[str, Any]],
|
||||
) -> tuple[list[dict[str, Any]], dict[str, Any], list[dict[str, Any]]]:
|
||||
expectations = _load_optional_set(root, plan, "expectations_ref", "expectation-set")
|
||||
waiver_set = _load_optional_set(root, plan, "waivers_ref", "waiver-set")
|
||||
waivers = waiver_set.get("waivers", []) if waiver_set else []
|
||||
|
||||
applied_expectations = 0
|
||||
applied_waivers: list[dict[str, Any]] = []
|
||||
|
||||
for finding in findings:
|
||||
for expectation in expectations.get("expectations", []) if expectations else []:
|
||||
if _matches_rule(finding, expectation):
|
||||
finding["expected"] = expectation["expected"]
|
||||
finding["policy_ref"] = expectation["id"]
|
||||
applied_expectations += 1
|
||||
break
|
||||
|
||||
for waiver in waivers:
|
||||
if not _waiver_active(waiver):
|
||||
continue
|
||||
if _matches_rule(finding, waiver):
|
||||
finding["waiver_ref"] = waiver["id"]
|
||||
finding["expected"] = True
|
||||
finding["policy_ref"] = waiver["id"]
|
||||
finding["remediation"] = f"Waived: {waiver['reason']}"
|
||||
applied_waivers.append(waiver)
|
||||
break
|
||||
|
||||
policy_summary = {
|
||||
"expectations_ref": plan["assessment_profile_snapshot"].get("expectations_ref"),
|
||||
"waivers_ref": plan["assessment_profile_snapshot"].get("waivers_ref"),
|
||||
"applied_expectations": applied_expectations,
|
||||
"applied_waivers": len(applied_waivers),
|
||||
"unexpected_findings": sum(
|
||||
1 for finding in findings if not finding.get("expected") and not finding.get("waiver_ref")
|
||||
),
|
||||
}
|
||||
return findings, policy_summary, applied_waivers
|
||||
|
||||
|
||||
def _load_optional_set(
|
||||
root: Path,
|
||||
plan: dict[str, Any],
|
||||
ref_name: str,
|
||||
schema_name: str,
|
||||
) -> dict[str, Any] | None:
|
||||
ref = plan["assessment_profile_snapshot"].get(ref_name)
|
||||
if not ref:
|
||||
return None
|
||||
path = _resolve_policy_ref(root, plan, ref)
|
||||
document = load_json(path)
|
||||
assert_valid(document, schema_name)
|
||||
target_ref = plan["target_profile_snapshot"]["id"]
|
||||
if document["target_profile_ref"] != target_ref:
|
||||
raise ValueError(
|
||||
f"{path}: target_profile_ref {document['target_profile_ref']!r} "
|
||||
f"does not match target profile {target_ref!r}"
|
||||
)
|
||||
return document
|
||||
|
||||
|
||||
def _resolve_policy_ref(root: Path, plan: dict[str, Any], ref: str) -> Path:
|
||||
ref_path = Path(ref)
|
||||
if ref_path.is_absolute():
|
||||
return ref_path
|
||||
|
||||
root_relative = root / ref_path
|
||||
if root_relative.exists():
|
||||
return root_relative
|
||||
|
||||
assessment_dir = plan.get("profile_paths", {}).get("assessment_profile_dir")
|
||||
if isinstance(assessment_dir, str):
|
||||
return Path(assessment_dir) / ref_path
|
||||
|
||||
return root_relative
|
||||
|
||||
|
||||
def _matches_rule(finding: dict[str, Any], rule: dict[str, Any]) -> bool:
|
||||
return (
|
||||
_matches_any(finding.get("requirement_refs", []), rule.get("requirement_refs", []))
|
||||
and _matches_any([finding.get("check_id", "")], rule.get("check_refs", []))
|
||||
and _matches_scalar(finding.get("status"), rule.get("result_refs", []))
|
||||
and _matches_scalar(finding.get("classification"), rule.get("classification_refs", []))
|
||||
)
|
||||
|
||||
|
||||
def _matches_any(values: list[str], patterns: list[str]) -> bool:
|
||||
if not patterns:
|
||||
return True
|
||||
return any(value in patterns for value in values)
|
||||
|
||||
|
||||
def _matches_scalar(value: Any, patterns: list[str]) -> bool:
|
||||
if not patterns:
|
||||
return True
|
||||
return isinstance(value, str) and value in patterns
|
||||
|
||||
|
||||
def _waiver_active(waiver: dict[str, Any]) -> bool:
|
||||
if waiver.get("review_status") != "approved":
|
||||
return False
|
||||
expires_at = waiver.get("expires_at")
|
||||
if not expires_at:
|
||||
return True
|
||||
try:
|
||||
expiry = date.fromisoformat(expires_at)
|
||||
except ValueError:
|
||||
return False
|
||||
return expiry >= date.today()
|
||||
Reference in New Issue
Block a user