Files
guide-board/src/guide_board/gates.py

163 lines
5.2 KiB
Python

"""Quality gate evaluation for retained run trends."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
def evaluate_trend_gates(
trend_summary: dict[str, Any],
*,
allowed_statuses: list[str] | None = None,
max_unexpected_findings: int = 0,
fail_on_regression: bool = True,
target_profile_ref: str | None = None,
assessment_profile_ref: str | None = None,
) -> dict[str, Any]:
allowed = allowed_statuses or ["completed"]
selected_groups = [
group
for group in trend_summary.get("groups", [])
if _matches_group(group, target_profile_ref, assessment_profile_ref)
]
group_results = [
_evaluate_group(group, allowed, max_unexpected_findings, fail_on_regression)
for group in selected_groups
]
if not group_results:
group_results.append(
{
"id": "no-matching-history",
"target_profile_ref": target_profile_ref,
"assessment_profile_ref": assessment_profile_ref,
"status": "failed",
"latest_run_ref": None,
"checks": [
{
"id": "history-present",
"status": "failed",
"observed": 0,
"expected": "at least one retained run",
"message": "No retained run history matched the gate selection.",
}
],
}
)
failed_groups = sum(1 for group in group_results if group["status"] == "failed")
passed_groups = len(group_results) - failed_groups
now = datetime.now(timezone.utc)
return {
"id": f"gate-summary:{now.strftime('%Y%m%dT%H%M%SZ')}",
"created_at": now.isoformat(),
"trend_summary_ref": trend_summary["id"],
"status": "failed" if failed_groups else "passed",
"policy": {
"allowed_statuses": allowed,
"max_unexpected_findings": max_unexpected_findings,
"fail_on_regression": fail_on_regression,
"target_profile_ref": target_profile_ref,
"assessment_profile_ref": assessment_profile_ref,
},
"group_count": len(group_results),
"passed_groups": passed_groups,
"failed_groups": failed_groups,
"groups": group_results,
}
def _matches_group(
group: dict[str, Any],
target_profile_ref: str | None,
assessment_profile_ref: str | None,
) -> bool:
if target_profile_ref and group.get("target_profile_ref") != target_profile_ref:
return False
if (
assessment_profile_ref
and group.get("assessment_profile_ref") != assessment_profile_ref
):
return False
return True
def _evaluate_group(
group: dict[str, Any],
allowed_statuses: list[str],
max_unexpected_findings: int,
fail_on_regression: bool,
) -> dict[str, Any]:
latest = group.get("latest_run", {})
trend = group.get("trend", {})
checks = [
_latest_status_check(latest, allowed_statuses),
_unexpected_findings_check(latest, max_unexpected_findings),
]
if fail_on_regression:
checks.append(_regression_check(trend))
failed = any(check["status"] == "failed" for check in checks)
return {
"id": group.get("id"),
"target_profile_ref": group.get("target_profile_ref"),
"assessment_profile_ref": group.get("assessment_profile_ref"),
"status": "failed" if failed else "passed",
"latest_run_ref": latest.get("run_id"),
"checks": checks,
}
def _latest_status_check(
latest: dict[str, Any],
allowed_statuses: list[str],
) -> dict[str, Any]:
observed = latest.get("status", "unknown")
passed = observed in allowed_statuses
return {
"id": "latest-status",
"status": "passed" if passed else "failed",
"observed": observed,
"expected": allowed_statuses,
"message": "Latest retained run status is acceptable."
if passed
else "Latest retained run status is outside the gate policy.",
}
def _unexpected_findings_check(
latest: dict[str, Any],
max_unexpected_findings: int,
) -> dict[str, Any]:
observed = _int_value(latest.get("unexpected_findings", 0))
passed = observed <= max_unexpected_findings
return {
"id": "unexpected-findings",
"status": "passed" if passed else "failed",
"observed": observed,
"expected": f"<= {max_unexpected_findings}",
"message": "Unexpected finding count is within policy."
if passed
else "Unexpected finding count exceeds policy.",
}
def _regression_check(trend: dict[str, Any]) -> dict[str, Any]:
observed = trend.get("direction", "insufficient-history")
passed = observed != "regressed"
return {
"id": "trend-regression",
"status": "passed" if passed else "failed",
"observed": observed,
"expected": "not regressed",
"message": "Latest trend has not regressed."
if passed
else "Latest trend regressed compared with the previous retained run.",
}
def _int_value(value: Any) -> int:
return value if isinstance(value, int) and not isinstance(value, bool) else 0