Add challenge and exclusion review handling

This commit is contained in:
2026-05-16 02:58:18 +02:00
parent c8ac42154c
commit b1dff0440d
16 changed files with 644 additions and 21 deletions

View File

@@ -35,7 +35,15 @@ def run_assessment(
assert_valid(item, "evidence-item")
findings = _findings_for_evidence(run_id, evidence)
findings, policy_summary, applied_waivers = apply_policy(root, plan, findings)
(
findings,
policy_summary,
applied_waivers,
applied_challenges,
applied_exclusions,
) = apply_policy(root, plan, evidence, findings)
for item in evidence:
assert_valid(item, "evidence-item")
for finding in findings:
assert_valid(finding, "finding")
@@ -52,6 +60,8 @@ def run_assessment(
mapping_summary,
policy_summary,
applied_waivers,
applied_challenges,
applied_exclusions,
created_at,
)
assert_valid(assessment_package, "assessment-package")
@@ -308,6 +318,7 @@ def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[
for item in evidence:
if item["result"] not in {"blocked", "fail", "infrastructure_error"}:
continue
expected = _expected_for_item(item)
findings.append(
{
"id": f"finding:{item['check_id']}",
@@ -318,9 +329,12 @@ def _findings_for_evidence(run_id: str, evidence: list[dict[str, Any]]) -> list[
"classification": _classification_for_item(item),
"requirement_refs": item["requirement_refs"],
"evidence_refs": [item["id"]],
"expected": _expected_for_item(item),
"expected": expected,
"waiver_ref": None,
"challenge_ref": None,
"exclusion_ref": None,
"policy_ref": None,
"review_status": "expected" if expected else "unresolved_defect",
"remediation": _remediation_for_item(item),
}
)
@@ -382,6 +396,8 @@ def _assessment_package(
mapping_summary: dict[str, Any],
policy_summary: dict[str, Any],
applied_waivers: list[dict[str, Any]],
applied_challenges: list[dict[str, Any]],
applied_exclusions: list[dict[str, Any]],
created_at: str,
) -> dict[str, Any]:
summary = dict(Counter(item["result"] for item in evidence))
@@ -401,6 +417,8 @@ def _assessment_package(
"evidence_refs": [item["id"] for item in evidence],
"artifact_manifest": artifact_manifest,
"waivers": applied_waivers,
"challenges": applied_challenges,
"exclusions": applied_exclusions,
"certification_boundary": "Guide Board produces preparation evidence only and does not issue certifications or audit assurance.",
"created_at": created_at,
}
@@ -452,6 +470,7 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
summary_lines = "- no evidence produced"
mapping_lines = _mapping_summary_lines(package)
policy_lines = _policy_summary_lines(package)
review_lines = _review_summary_lines(package)
return "\n".join(
[
@@ -473,6 +492,10 @@ def _markdown_report(run_metadata: dict[str, Any], package: dict[str, Any]) -> s
"",
policy_lines,
"",
"## Review",
"",
review_lines,
"",
"## Boundary",
"",
package["certification_boundary"],
@@ -502,10 +525,27 @@ def _policy_summary_lines(package: dict[str, Any]) -> str:
f"- applied expectations: {summary.get('applied_expectations', 0)}",
f"- applied waivers: {summary.get('applied_waivers', 0)}",
f"- unexpected findings: {summary.get('unexpected_findings', 0)}",
f"- challenged findings: {summary.get('challenged_findings', 0)}",
f"- authority exclusions: {summary.get('authority_exclusions', 0)}",
f"- unresolved defects: {summary.get('unresolved_defects', 0)}",
]
)
def _review_summary_lines(package: dict[str, Any]) -> str:
findings = package.get("findings", [])
if not findings:
return "- no findings requiring review"
counts = Counter(
finding.get("review_status", "unreviewed")
for finding in findings
if isinstance(finding, dict)
)
return "\n".join(
f"- {status}: {count}" for status, count in sorted(counts.items())
)
def _run_status(evidence: list[dict[str, Any]]) -> str:
if any(item["result"] == "fail" for item in evidence):
return "failed"

View File

@@ -262,6 +262,18 @@ def _build_source_lock(
assessment.get("waivers_ref"),
"waiver-set",
),
"challenges": _optional_policy_source_record(
root,
assessment_path,
assessment.get("challenges_ref"),
"challenge-set",
),
"exclusions": _optional_policy_source_record(
root,
assessment_path,
assessment.get("exclusions_ref"),
"exclusion-set",
),
},
"authorities": _authority_source_records(extensions),
"metadata_hooks": {

View File

@@ -13,20 +13,36 @@ from guide_board.schema import assert_valid
def apply_policy(
root: Path,
plan: dict[str, Any],
evidence: list[dict[str, Any]],
findings: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], dict[str, Any], list[dict[str, Any]]]:
) -> tuple[
list[dict[str, Any]],
dict[str, Any],
list[dict[str, Any]],
list[dict[str, Any]],
list[dict[str, Any]],
]:
expectations = _load_optional_set(root, plan, "expectations_ref", "expectation-set")
waiver_set = _load_optional_set(root, plan, "waivers_ref", "waiver-set")
challenge_set = _load_optional_set(root, plan, "challenges_ref", "challenge-set")
exclusion_set = _load_optional_set(root, plan, "exclusions_ref", "exclusion-set")
waivers = waiver_set.get("waivers", []) if waiver_set else []
challenges = challenge_set.get("challenges", []) if challenge_set else []
exclusions = exclusion_set.get("exclusions", []) if exclusion_set else []
applied_expectations = 0
applied_waivers: list[dict[str, Any]] = []
applied_challenges: list[dict[str, Any]] = []
applied_exclusions: list[dict[str, Any]] = []
evidence_by_id = {item["id"]: item for item in evidence}
for finding in findings:
for expectation in expectations.get("expectations", []) if expectations else []:
if _matches_rule(finding, expectation):
finding["expected"] = expectation["expected"]
finding["policy_ref"] = expectation["id"]
finding["review_status"] = "expected" if expectation["expected"] else "unresolved_defect"
_annotate_evidence(evidence_by_id, finding, "expectation_refs", expectation["id"])
applied_expectations += 1
break
@@ -37,20 +53,60 @@ def apply_policy(
finding["waiver_ref"] = waiver["id"]
finding["expected"] = True
finding["policy_ref"] = waiver["id"]
finding["review_status"] = "waived"
finding["remediation"] = f"Waived: {waiver['reason']}"
applied_waivers.append(waiver)
_annotate_evidence(evidence_by_id, finding, "waiver_refs", waiver["id"])
break
for exclusion in exclusions:
if not _review_record_active(exclusion):
continue
if _matches_rule(finding, exclusion):
finding["exclusion_ref"] = exclusion["id"]
if finding.get("review_status") == "unresolved_defect":
finding["review_status"] = "authority_excluded"
applied_exclusions.append(exclusion)
_annotate_evidence(evidence_by_id, finding, "exclusion_refs", exclusion["id"])
break
for challenge in challenges:
if not _review_record_active(challenge):
continue
if _matches_rule(finding, challenge):
finding["challenge_ref"] = challenge["id"]
if finding.get("review_status") == "unresolved_defect":
finding["review_status"] = "challenged"
applied_challenges.append(challenge)
_annotate_evidence(evidence_by_id, finding, "challenge_refs", challenge["id"])
break
policy_summary = {
"expectations_ref": plan["assessment_profile_snapshot"].get("expectations_ref"),
"waivers_ref": plan["assessment_profile_snapshot"].get("waivers_ref"),
"challenges_ref": plan["assessment_profile_snapshot"].get("challenges_ref"),
"exclusions_ref": plan["assessment_profile_snapshot"].get("exclusions_ref"),
"applied_expectations": applied_expectations,
"applied_waivers": len(applied_waivers),
"challenged_findings": _unique_applied_count(findings, "challenge_ref"),
"authority_exclusions": _unique_applied_count(findings, "exclusion_ref"),
"unexpected_findings": sum(
1 for finding in findings if not finding.get("expected") and not finding.get("waiver_ref")
),
"unresolved_defects": sum(
1 for finding in findings if finding.get("review_status") == "unresolved_defect"
),
"unresolved_review_items": sum(
1 for finding in findings if finding.get("review_status") in {"challenged", "authority_excluded"}
),
}
return findings, policy_summary, applied_waivers
return (
findings,
policy_summary,
_dedupe_records(applied_waivers),
_dedupe_records(applied_challenges),
_dedupe_records(applied_exclusions),
)
def _load_optional_set(
@@ -94,6 +150,7 @@ def _matches_rule(finding: dict[str, Any], rule: dict[str, Any]) -> bool:
return (
_matches_any(finding.get("requirement_refs", []), rule.get("requirement_refs", []))
and _matches_any([finding.get("check_id", "")], rule.get("check_refs", []))
and _matches_any(finding.get("evidence_refs", []), rule.get("evidence_refs", []))
and _matches_scalar(finding.get("status"), rule.get("result_refs", []))
and _matches_scalar(finding.get("classification"), rule.get("classification_refs", []))
)
@@ -122,3 +179,57 @@ def _waiver_active(waiver: dict[str, Any]) -> bool:
except ValueError:
return False
return expiry >= date.today()
def _review_record_active(record: dict[str, Any]) -> bool:
status = record.get("review_status")
if status in {"rejected", "withdrawn", "closed", "expired"}:
return False
expires_at = record.get("expires_at")
if not expires_at:
return True
try:
expiry = date.fromisoformat(expires_at)
except ValueError:
return False
return expiry >= date.today()
def _annotate_evidence(
evidence_by_id: dict[str, dict[str, Any]],
finding: dict[str, Any],
ref_key: str,
ref_value: str,
) -> None:
for evidence_ref in finding.get("evidence_refs", []):
item = evidence_by_id.get(evidence_ref)
if item is None:
continue
review = item.setdefault(
"review",
{
"expectation_refs": [],
"waiver_refs": [],
"challenge_refs": [],
"exclusion_refs": [],
},
)
refs = review.setdefault(ref_key, [])
if ref_value not in refs:
refs.append(ref_value)
def _unique_applied_count(findings: list[dict[str, Any]], ref_name: str) -> int:
return sum(1 for finding in findings if finding.get(ref_name))
def _dedupe_records(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
seen = set()
deduped = []
for record in records:
record_id = record.get("id")
if not isinstance(record_id, str) or record_id in seen:
continue
seen.add(record_id)
deduped.append(record)
return deduped

View File

@@ -37,6 +37,10 @@ def build_retention_summary(
"unexpected_findings": policy_summary.get("unexpected_findings", 0),
"expected_findings": sum(1 for finding in findings if finding.get("expected")),
"waived_findings": sum(1 for finding in findings if finding.get("waiver_ref")),
"challenged_findings": policy_summary.get("challenged_findings", 0),
"authority_exclusions": policy_summary.get("authority_exclusions", 0),
"unresolved_defects": policy_summary.get("unresolved_defects", 0),
"unresolved_review_items": policy_summary.get("unresolved_review_items", 0),
"mapping_target_count": len(
assessment_package.get("mapping_summary", {}).get("targets", [])
),
@@ -197,6 +201,10 @@ def _run_projection(run: dict[str, Any]) -> dict[str, Any]:
"unexpected_findings": _summary_int(summary, "unexpected_findings"),
"finding_count": _summary_int(summary, "finding_count"),
"artifact_count": _summary_int(summary, "artifact_count"),
"challenged_findings": _summary_int(summary, "challenged_findings"),
"authority_exclusions": _summary_int(summary, "authority_exclusions"),
"unresolved_defects": _summary_int(summary, "unresolved_defects"),
"unresolved_review_items": _summary_int(summary, "unresolved_review_items"),
"run_dir": run.get("run_dir"),
}
@@ -211,9 +219,10 @@ def _trend_between(
"status_changed": False,
"unexpected_findings_delta": 0,
"finding_count_delta": 0,
"artifact_count_delta": 0,
"evidence_result_deltas": {},
}
"artifact_count_delta": 0,
"unresolved_review_items_delta": 0,
"evidence_result_deltas": {},
}
previous_summary = previous.get("summary", {})
latest_summary = latest.get("summary", {})
@@ -230,6 +239,9 @@ def _trend_between(
artifact_delta = _summary_int(latest_summary, "artifact_count") - _summary_int(
previous_summary, "artifact_count"
)
review_delta = _summary_int(latest_summary, "unresolved_review_items") - _summary_int(
previous_summary, "unresolved_review_items"
)
previous_status = _status_for(previous)
latest_status = _status_for(latest)
@@ -239,6 +251,7 @@ def _trend_between(
"unexpected_findings_delta": unexpected_delta,
"finding_count_delta": finding_delta,
"artifact_count_delta": artifact_delta,
"unresolved_review_items_delta": review_delta,
"evidence_result_deltas": evidence_deltas,
}