quality gate layer

This commit is contained in:
2026-05-07 17:00:10 +02:00
parent 4c44db802d
commit ed0f270278
6 changed files with 267 additions and 0 deletions

View File

@@ -24,6 +24,7 @@ PYTHONPATH=src python3 -m guide_board run \
--assessment profiles/assessments/sample-noop.json
PYTHONPATH=src python3 -m guide_board runs list
PYTHONPATH=src python3 -m guide_board runs trend
PYTHONPATH=src python3 -m guide_board runs gate
PYTHONPATH=src python3 -m unittest discover -s tests
```

View File

@@ -243,6 +243,10 @@ Architecture lesson:
Repository quality packs should be normal extensions. A score is not a
certification verdict; it is a normalized finding and trend signal.
Quality gates should be core policy decisions over retained posture, not
extension-specific verdicts. The first gate layer checks the latest run's
status, its unexpected finding count, and whether the latest trend regressed.
Sources:
- [OpenSSF Scorecard](https://openssf.org/projects/scorecard/)

View File

@@ -0,0 +1,28 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Guide Board Gate Summary",
"type": "object",
"additionalProperties": false,
"required": [
"id",
"created_at",
"trend_summary_ref",
"status",
"policy",
"group_count",
"passed_groups",
"failed_groups",
"groups"
],
"properties": {
"id": { "type": "string" },
"created_at": { "type": "string" },
"trend_summary_ref": { "type": "string" },
"status": { "type": "string" },
"policy": { "type": "object" },
"group_count": { "type": "integer" },
"passed_groups": { "type": "integer" },
"failed_groups": { "type": "integer" },
"groups": { "type": "array", "items": { "type": "object" } }
}
}

View File

@@ -11,6 +11,7 @@ from typing import Any
from guide_board.discovery import discover_extensions
from guide_board.errors import GuideBoardError
from guide_board.execution import run_assessment
from guide_board.gates import evaluate_trend_gates
from guide_board.io import load_json, write_json
from guide_board.planning import (
build_run_plan,
@@ -83,6 +84,14 @@ def build_parser() -> argparse.ArgumentParser:
trend_runs = runs_commands.add_parser("trend", help="summarize retained run trends")
trend_runs.add_argument("--runs-dir", type=Path)
trend_runs.set_defaults(func=cmd_runs_trend)
gate_runs = runs_commands.add_parser("gate", help="evaluate retained run quality gates")
gate_runs.add_argument("--runs-dir", type=Path)
gate_runs.add_argument("--target")
gate_runs.add_argument("--assessment")
gate_runs.add_argument("--allowed-status", action="append")
gate_runs.add_argument("--max-unexpected-findings", type=int, default=0)
gate_runs.add_argument("--allow-regression", action="store_true")
gate_runs.set_defaults(func=cmd_runs_gate)
schema = subcommands.add_parser("schema", help="schema validation")
schema.add_argument("schema_name")
@@ -153,6 +162,21 @@ def cmd_runs_trend(args: argparse.Namespace) -> dict[str, Any]:
return summary
def cmd_runs_gate(args: argparse.Namespace) -> dict[str, Any]:
    """Run the ``runs gate`` subcommand and return the gate summary.

    The retained runs are read from ``args.runs_dir`` when it is set, otherwise
    from ``<args.root>/runs``. A trend summary is built from that directory and
    evaluated against the policy given on the command line; the result is
    passed through ``assert_valid`` under the ``"gate-summary"`` schema name
    before being returned.

    NOTE(review): a failed gate is only reported through the returned summary
    here; confirm the caller maps ``status == "failed"`` to a non-zero exit.
    """
    if args.runs_dir:
        history_dir = args.runs_dir
    else:
        history_dir = args.root / "runs"

    trend = build_trend_summary(history_dir)
    policy = {
        "allowed_statuses": args.allowed_status,
        "max_unexpected_findings": args.max_unexpected_findings,
        # The CLI flag is an opt-out, the evaluator option is an opt-in.
        "fail_on_regression": not args.allow_regression,
        "target_profile_ref": args.target,
        "assessment_profile_ref": args.assessment,
    }
    result = evaluate_trend_gates(trend, **policy)
    assert_valid(result, "gate-summary")
    return result
def cmd_schema_validate(args: argparse.Namespace) -> dict[str, Any]:
document = load_json(args.path)
assert_valid(document, args.schema_name)

162
src/guide_board/gates.py Normal file
View File

@@ -0,0 +1,162 @@
"""Quality gate evaluation for retained run trends."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
def evaluate_trend_gates(
    trend_summary: dict[str, Any],
    *,
    allowed_statuses: list[str] | None = None,
    max_unexpected_findings: int = 0,
    fail_on_regression: bool = True,
    target_profile_ref: str | None = None,
    assessment_profile_ref: str | None = None,
) -> dict[str, Any]:
    """Evaluate quality gates for the selected groups of a trend summary.

    Args:
        trend_summary: Trend summary document. Its ``"id"`` is recorded as
            ``trend_summary_ref``; its ``"groups"`` entry supplies the groups
            to evaluate (a missing or null entry is treated as no groups).
        allowed_statuses: Latest-run statuses that pass the gate. ``None`` or
            an empty list falls back to ``["completed"]``.
        max_unexpected_findings: Highest unexpected finding count that still
            passes.
        fail_on_regression: When true, each group also gets the
            trend-regression check.
        target_profile_ref: When given, only groups with this
            ``target_profile_ref`` are evaluated.
        assessment_profile_ref: When given, only groups with this
            ``assessment_profile_ref`` are evaluated.

    Returns:
        A gate summary dict with the keys ``id``, ``created_at``,
        ``trend_summary_ref``, ``status``, ``policy``, ``group_count``,
        ``passed_groups``, ``failed_groups`` and ``groups``. ``status`` is
        ``"failed"`` as soon as one group failed. When no group matches the
        selection, ``groups`` holds a single failed placeholder group so that
        an empty history never passes the gate.

    Raises:
        KeyError: If ``trend_summary`` has no ``"id"`` key.
    """
    # Copy so the returned payload never aliases (and cannot be changed by
    # later mutation of) the caller's list.
    allowed = list(allowed_statuses) if allowed_statuses else ["completed"]

    # ``or []`` also covers a "groups" key that is present but null.
    group_results = [
        _evaluate_group(group, allowed, max_unexpected_findings, fail_on_regression)
        for group in trend_summary.get("groups") or []
        if _matches_group(group, target_profile_ref, assessment_profile_ref)
    ]
    if not group_results:
        group_results.append(
            _no_matching_history_group(target_profile_ref, assessment_profile_ref)
        )

    failed_groups = sum(1 for group in group_results if group["status"] == "failed")
    passed_groups = len(group_results) - failed_groups
    now = datetime.now(timezone.utc)
    return {
        "id": f"gate-summary:{now.strftime('%Y%m%dT%H%M%SZ')}",
        "created_at": now.isoformat(),
        "trend_summary_ref": trend_summary["id"],
        "status": "failed" if failed_groups else "passed",
        "policy": {
            "allowed_statuses": allowed,
            "max_unexpected_findings": max_unexpected_findings,
            "fail_on_regression": fail_on_regression,
            "target_profile_ref": target_profile_ref,
            "assessment_profile_ref": assessment_profile_ref,
        },
        "group_count": len(group_results),
        "passed_groups": passed_groups,
        "failed_groups": failed_groups,
        "groups": group_results,
    }


def _no_matching_history_group(
    target_profile_ref: str | None,
    assessment_profile_ref: str | None,
) -> dict[str, Any]:
    """Build the failed placeholder group used when no history was selected."""
    return {
        "id": "no-matching-history",
        "target_profile_ref": target_profile_ref,
        "assessment_profile_ref": assessment_profile_ref,
        "status": "failed",
        "latest_run_ref": None,
        "checks": [
            {
                "id": "history-present",
                "status": "failed",
                "observed": 0,
                "expected": "at least one retained run",
                "message": "No retained run history matched the gate selection.",
            }
        ],
    }
def _matches_group(
    group: dict[str, Any],
    target_profile_ref: str | None,
    assessment_profile_ref: str | None,
) -> bool:
    """Tell whether ``group`` satisfies the optional profile selection.

    A selector that is ``None`` or empty places no constraint; a non-empty
    selector must equal the group's value stored under the same key.
    """
    selectors = (
        ("target_profile_ref", target_profile_ref),
        ("assessment_profile_ref", assessment_profile_ref),
    )
    return all(not wanted or group.get(key) == wanted for key, wanted in selectors)
def _evaluate_group(
    group: dict[str, Any],
    allowed_statuses: list[str],
    max_unexpected_findings: int,
    fail_on_regression: bool,
) -> dict[str, Any]:
    """Run the gate checks for one trend group and summarize the outcome.

    Args:
        group: One entry of the trend summary's ``"groups"`` list. Its
            ``"latest_run"`` and ``"trend"`` objects feed the checks.
        allowed_statuses: Latest-run statuses accepted by the gate.
        max_unexpected_findings: Highest unexpected finding count accepted.
        fail_on_regression: When true, the trend-regression check is added.

    Returns:
        A dict with the group's ``id`` and profile refs, its ``status``
        (``"failed"`` if any check failed, else ``"passed"``), the latest
        run's ``run_id`` as ``latest_run_ref``, and the list of ``checks``.
    """
    # ``dict.get(key, {})`` only defaults when the key is absent; a null or
    # otherwise non-object value would crash the checks below, so treat it
    # like missing data. The checks then report "unknown" /
    # "insufficient-history" instead of raising.
    latest = group.get("latest_run")
    if not isinstance(latest, dict):
        latest = {}
    trend = group.get("trend")
    if not isinstance(trend, dict):
        trend = {}

    checks = [
        _latest_status_check(latest, allowed_statuses),
        _unexpected_findings_check(latest, max_unexpected_findings),
    ]
    if fail_on_regression:
        checks.append(_regression_check(trend))

    failed = any(check["status"] == "failed" for check in checks)
    return {
        "id": group.get("id"),
        "target_profile_ref": group.get("target_profile_ref"),
        "assessment_profile_ref": group.get("assessment_profile_ref"),
        "status": "failed" if failed else "passed",
        "latest_run_ref": latest.get("run_id"),
        "checks": checks,
    }
def _latest_status_check(
    latest: dict[str, Any],
    allowed_statuses: list[str],
) -> dict[str, Any]:
    """Check that the latest run's status is one of the allowed statuses.

    A run without a ``"status"`` key is reported as ``"unknown"``.
    """
    run_status = latest.get("status", "unknown")
    if run_status in allowed_statuses:
        outcome = "passed"
        message = "Latest retained run status is acceptable."
    else:
        outcome = "failed"
        message = "Latest retained run status is outside the gate policy."
    return {
        "id": "latest-status",
        "status": outcome,
        "observed": run_status,
        "expected": allowed_statuses,
        "message": message,
    }
def _unexpected_findings_check(
    latest: dict[str, Any],
    max_unexpected_findings: int,
) -> dict[str, Any]:
    """Check the latest run's unexpected finding count against the policy limit.

    The count is read from ``latest["unexpected_findings"]`` and normalized by
    ``_int_value``; a missing key counts as 0.
    """
    finding_count = _int_value(latest.get("unexpected_findings", 0))
    if finding_count <= max_unexpected_findings:
        outcome = "passed"
        message = "Unexpected finding count is within policy."
    else:
        outcome = "failed"
        message = "Unexpected finding count exceeds policy."
    return {
        "id": "unexpected-findings",
        "status": outcome,
        "observed": finding_count,
        "expected": f"<= {max_unexpected_findings}",
        "message": message,
    }
def _regression_check(trend: dict[str, Any]) -> dict[str, Any]:
    """Check that the group's trend direction is not ``"regressed"``.

    A trend without a ``"direction"`` key is reported as
    ``"insufficient-history"``, which passes.
    """
    direction = trend.get("direction", "insufficient-history")
    regressed = direction == "regressed"
    return {
        "id": "trend-regression",
        "status": "failed" if regressed else "passed",
        "observed": direction,
        "expected": "not regressed",
        "message": (
            "Latest trend regressed compared with the previous retained run."
            if regressed
            else "Latest trend has not regressed."
        ),
    }
def _int_value(value: Any) -> int:
    """Return ``value`` when it is a real integer, otherwise 0.

    Booleans are rejected explicitly because ``bool`` is a subclass of ``int``.
    """
    if isinstance(value, bool) or not isinstance(value, int):
        return 0
    return value

View File

@@ -9,6 +9,7 @@ from pathlib import Path
from guide_board.discovery import discover_extensions
from guide_board.execution import run_assessment
from guide_board.gates import evaluate_trend_gates
from guide_board.planning import (
build_run_plan,
validate_assessment_profile,
@@ -145,6 +146,53 @@ class CoreArchitectureTests(unittest.TestCase):
{"blocked": -1, "manual": 1, "skipped": 1},
)
gate = evaluate_trend_gates(
trend,
target_profile_ref="sample-repository",
assessment_profile_ref="sample-noop-assessment",
)
assert_valid(gate, "gate-summary")
self.assertEqual(gate["status"], "passed")
self.assertEqual(gate["passed_groups"], 1)
missing_gate = evaluate_trend_gates(
trend,
target_profile_ref="missing-target",
)
self.assertEqual(missing_gate["status"], "failed")
self.assertEqual(missing_gate["groups"][0]["checks"][0]["id"], "history-present")
def test_fails_gate_for_regressed_run_history(self) -> None:
    """The default gate fails when the newest retained run is blocked.

    Two retention summaries are written: an older ``completed`` run and a
    newer ``blocked`` one. With the default policy, the latest-status,
    unexpected-findings and trend-regression checks are all expected to fail.
    """
    # Each entry is the run id followed by the remaining positional arguments
    # of _write_retention_summary, in order.
    retained_runs = (
        ("run-old", "2026-05-07T10:00:00+00:00", "completed", {"manual": 1}, 0, 1),
        ("run-new", "2026-05-07T11:00:00+00:00", "blocked", {"blocked": 1}, 2, 1),
    )
    with TemporaryDirectory() as temporary_directory:
        runs_dir = Path(temporary_directory)
        for run_id, *summary_fields in retained_runs:
            _write_retention_summary(runs_dir / run_id, run_id, *summary_fields)

        gate = evaluate_trend_gates(build_trend_summary(runs_dir))

        assert_valid(gate, "gate-summary")
        self.assertEqual(gate["status"], "failed")
        check_status = {
            check["id"]: check["status"] for check in gate["groups"][0]["checks"]
        }
        self.assertEqual(check_status["latest-status"], "failed")
        self.assertEqual(check_status["unexpected-findings"], "failed")
        self.assertEqual(check_status["trend-regression"], "failed")
def test_runs_cmis_preflight_against_local_endpoint(self) -> None:
server = HTTPServer(("127.0.0.1", 0), _CmisHandler)
thread = threading.Thread(target=server.serve_forever)