From ed0f270278bfd63ed996636310b6ed9c77f9f462 Mon Sep 17 00:00:00 2001 From: tegwick Date: Thu, 7 May 2026 17:00:10 +0200 Subject: [PATCH] quality gate layer --- README.md | 1 + docs/ARCHITECTURE-BLUEPRINT.md | 4 + docs/schemas/gate-summary.schema.json | 28 +++++ src/guide_board/cli.py | 24 ++++ src/guide_board/gates.py | 162 ++++++++++++++++++++++++++ tests/test_core.py | 48 ++++++++ 6 files changed, 267 insertions(+) create mode 100644 docs/schemas/gate-summary.schema.json create mode 100644 src/guide_board/gates.py diff --git a/README.md b/README.md index 621527e..1db4b3f 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ PYTHONPATH=src python3 -m guide_board run \ --assessment profiles/assessments/sample-noop.json PYTHONPATH=src python3 -m guide_board runs list PYTHONPATH=src python3 -m guide_board runs trend +PYTHONPATH=src python3 -m guide_board runs gate PYTHONPATH=src python3 -m unittest discover -s tests ``` diff --git a/docs/ARCHITECTURE-BLUEPRINT.md b/docs/ARCHITECTURE-BLUEPRINT.md index 03c642d..cb8b209 100644 --- a/docs/ARCHITECTURE-BLUEPRINT.md +++ b/docs/ARCHITECTURE-BLUEPRINT.md @@ -243,6 +243,10 @@ Architecture lesson: Repository quality packs should be normal extensions. A score is not a certification verdict; it is a normalized finding and trend signal. +Quality gates should be core policy decisions over retained posture, not +extension-specific verdicts. The first gate layer checks latest run status, +unexpected finding count, and whether the latest trend regressed. + Sources: - [OpenSSF Scorecard](https://openssf.org/projects/scorecard/) diff --git a/docs/schemas/gate-summary.schema.json b/docs/schemas/gate-summary.schema.json new file mode 100644 index 0000000..a2c4cb0 --- /dev/null +++ b/docs/schemas/gate-summary.schema.json @@ -0,0 +1,28 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Guide Board Gate Summary", + "type": "object", + "additionalProperties": false, + "required": [ + "id", + "created_at", + "trend_summary_ref", + "status", + "policy", + "group_count", + "passed_groups", + "failed_groups", + "groups" + ], + "properties": { + "id": { "type": "string" }, + "created_at": { "type": "string" }, + "trend_summary_ref": { "type": "string" }, + "status": { "type": "string" }, + "policy": { "type": "object" }, + "group_count": { "type": "integer" }, + "passed_groups": { "type": "integer" }, + "failed_groups": { "type": "integer" }, + "groups": { "type": "array", "items": { "type": "object" } } + } +} diff --git a/src/guide_board/cli.py b/src/guide_board/cli.py index 03f96f4..755ae4b 100644 --- a/src/guide_board/cli.py +++ b/src/guide_board/cli.py @@ -11,6 +11,7 @@ from typing import Any from guide_board.discovery import discover_extensions from guide_board.errors import GuideBoardError from guide_board.execution import run_assessment +from guide_board.gates import evaluate_trend_gates from guide_board.io import load_json, write_json from guide_board.planning import ( build_run_plan, @@ -83,6 +84,14 @@ def build_parser() -> argparse.ArgumentParser: trend_runs = runs_commands.add_parser("trend", help="summarize retained run trends") trend_runs.add_argument("--runs-dir", type=Path) trend_runs.set_defaults(func=cmd_runs_trend) + gate_runs = runs_commands.add_parser("gate", help="evaluate retained run quality gates") + gate_runs.add_argument("--runs-dir", type=Path) + gate_runs.add_argument("--target") + gate_runs.add_argument("--assessment") + gate_runs.add_argument("--allowed-status", action="append") + gate_runs.add_argument("--max-unexpected-findings", type=int, default=0) + gate_runs.add_argument("--allow-regression", action="store_true") + gate_runs.set_defaults(func=cmd_runs_gate) schema = subcommands.add_parser("schema", help="schema validation") schema.add_argument("schema_name") @@ -153,6 +162,21 @@ def cmd_runs_trend(args: argparse.Namespace) -> dict[str, Any]: return summary +def cmd_runs_gate(args: argparse.Namespace) -> dict[str, Any]: + runs_dir = args.runs_dir or args.root / "runs" + trend_summary = build_trend_summary(runs_dir) + gate_summary = evaluate_trend_gates( + trend_summary, + allowed_statuses=args.allowed_status, + max_unexpected_findings=args.max_unexpected_findings, + fail_on_regression=not args.allow_regression, + target_profile_ref=args.target, + assessment_profile_ref=args.assessment, + ) + assert_valid(gate_summary, "gate-summary") + return gate_summary + + def cmd_schema_validate(args: argparse.Namespace) -> dict[str, Any]: document = load_json(args.path) assert_valid(document, args.schema_name) diff --git a/src/guide_board/gates.py b/src/guide_board/gates.py new file mode 100644 index 0000000..2557edf --- /dev/null +++ b/src/guide_board/gates.py @@ -0,0 +1,162 @@ +"""Quality gate evaluation for retained run trends.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + + +def evaluate_trend_gates( + trend_summary: dict[str, Any], + *, + allowed_statuses: list[str] | None = None, + max_unexpected_findings: int = 0, + fail_on_regression: bool = True, + target_profile_ref: str | None = None, + assessment_profile_ref: str | None = None, +) -> dict[str, Any]: + allowed = allowed_statuses or ["completed"] + selected_groups = [ + group + for group in trend_summary.get("groups", []) + if _matches_group(group, target_profile_ref, assessment_profile_ref) + ] + + group_results = [ + _evaluate_group(group, allowed, max_unexpected_findings, fail_on_regression) + for group in selected_groups + ] + if not group_results: + group_results.append( + { + "id": "no-matching-history", + "target_profile_ref": target_profile_ref, + "assessment_profile_ref": assessment_profile_ref, + "status": "failed", + "latest_run_ref": None, + "checks": [ + { + "id": "history-present", + "status": "failed", + "observed": 0, + "expected": "at least one retained run", + "message": "No retained run history matched the gate selection.", + } + ], + } + ) + + failed_groups = sum(1 for group in group_results if group["status"] == "failed") + passed_groups = len(group_results) - failed_groups + now = datetime.now(timezone.utc) + + return { + "id": f"gate-summary:{now.strftime('%Y%m%dT%H%M%SZ')}", + "created_at": now.isoformat(), + "trend_summary_ref": trend_summary["id"], + "status": "failed" if failed_groups else "passed", + "policy": { + "allowed_statuses": allowed, + "max_unexpected_findings": max_unexpected_findings, + "fail_on_regression": fail_on_regression, + "target_profile_ref": target_profile_ref, + "assessment_profile_ref": assessment_profile_ref, + }, + "group_count": len(group_results), + "passed_groups": passed_groups, + "failed_groups": failed_groups, + "groups": group_results, + } + + +def _matches_group( + group: dict[str, Any], + target_profile_ref: str | None, + assessment_profile_ref: str | None, +) -> bool: + if target_profile_ref and group.get("target_profile_ref") != target_profile_ref: + return False + if ( + assessment_profile_ref + and group.get("assessment_profile_ref") != assessment_profile_ref + ): + return False + return True + + +def _evaluate_group( + group: dict[str, Any], + allowed_statuses: list[str], + max_unexpected_findings: int, + fail_on_regression: bool, +) -> dict[str, Any]: + latest = group.get("latest_run", {}) + trend = group.get("trend", {}) + checks = [ + _latest_status_check(latest, allowed_statuses), + _unexpected_findings_check(latest, max_unexpected_findings), + ] + if fail_on_regression: + checks.append(_regression_check(trend)) + + failed = any(check["status"] == "failed" for check in checks) + return { + "id": group.get("id"), + "target_profile_ref": group.get("target_profile_ref"), + "assessment_profile_ref": group.get("assessment_profile_ref"), + "status": "failed" if failed else "passed", + "latest_run_ref": latest.get("run_id"), + "checks": checks, + } + + +def _latest_status_check( + latest: dict[str, Any], + allowed_statuses: list[str], +) -> dict[str, Any]: + observed = latest.get("status", "unknown") + passed = observed in allowed_statuses + return { + "id": "latest-status", + "status": "passed" if passed else "failed", + "observed": observed, + "expected": allowed_statuses, + "message": "Latest retained run status is acceptable." + if passed + else "Latest retained run status is outside the gate policy.", + } + + +def _unexpected_findings_check( + latest: dict[str, Any], + max_unexpected_findings: int, +) -> dict[str, Any]: + observed = _int_value(latest.get("unexpected_findings", 0)) + passed = observed <= max_unexpected_findings + return { + "id": "unexpected-findings", + "status": "passed" if passed else "failed", + "observed": observed, + "expected": f"<= {max_unexpected_findings}", + "message": "Unexpected finding count is within policy." + if passed + else "Unexpected finding count exceeds policy.", + } + + +def _regression_check(trend: dict[str, Any]) -> dict[str, Any]: + observed = trend.get("direction", "insufficient-history") + passed = observed != "regressed" + return { + "id": "trend-regression", + "status": "passed" if passed else "failed", + "observed": observed, + "expected": "not regressed", + "message": "Latest trend has not regressed." + if passed + else "Latest trend regressed compared with the previous retained run.", + } + + +def _int_value(value: Any) -> int: + return value if isinstance(value, int) and not isinstance(value, bool) else 0 diff --git a/tests/test_core.py b/tests/test_core.py index ad8d52c..f9ae70e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -9,6 +9,7 @@ from pathlib import Path from guide_board.discovery import discover_extensions from guide_board.execution import run_assessment +from guide_board.gates import evaluate_trend_gates from guide_board.planning import ( build_run_plan, validate_assessment_profile, @@ -145,6 +146,53 @@ class CoreArchitectureTests(unittest.TestCase): {"blocked": -1, "manual": 1, "skipped": 1}, ) + gate = evaluate_trend_gates( + trend, + target_profile_ref="sample-repository", + assessment_profile_ref="sample-noop-assessment", + ) + assert_valid(gate, "gate-summary") + self.assertEqual(gate["status"], "passed") + self.assertEqual(gate["passed_groups"], 1) + + missing_gate = evaluate_trend_gates( + trend, + target_profile_ref="missing-target", + ) + self.assertEqual(missing_gate["status"], "failed") + self.assertEqual(missing_gate["groups"][0]["checks"][0]["id"], "history-present") + + def test_fails_gate_for_regressed_run_history(self) -> None: + with TemporaryDirectory() as temporary_directory: + runs_dir = Path(temporary_directory) + _write_retention_summary( + runs_dir / "run-old", + "run-old", + "2026-05-07T10:00:00+00:00", + "completed", + {"manual": 1}, + 0, + 1, + ) + _write_retention_summary( + runs_dir / "run-new", + "run-new", + "2026-05-07T11:00:00+00:00", + "blocked", + {"blocked": 1}, + 2, + 1, + ) + + gate = evaluate_trend_gates(build_trend_summary(runs_dir)) + assert_valid(gate, "gate-summary") + + self.assertEqual(gate["status"], "failed") + checks = {check["id"]: check for check in gate["groups"][0]["checks"]} + self.assertEqual(checks["latest-status"]["status"], "failed") + self.assertEqual(checks["unexpected-findings"]["status"], "failed") + self.assertEqual(checks["trend-regression"]["status"], "failed") + def test_runs_cmis_preflight_against_local_endpoint(self) -> None: server = HTTPServer(("127.0.0.1", 0), _CmisHandler) thread = threading.Thread(target=server.serve_forever)