diff --git a/README.md b/README.md index d302487..621527e 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ PYTHONPATH=src python3 -m guide_board run \ --target profiles/targets/sample-repository.json \ --assessment profiles/assessments/sample-noop.json PYTHONPATH=src python3 -m guide_board runs list +PYTHONPATH=src python3 -m guide_board runs trend PYTHONPATH=src python3 -m unittest discover -s tests ``` diff --git a/docs/ARCHITECTURE-BLUEPRINT.md b/docs/ARCHITECTURE-BLUEPRINT.md index 0e516f1..03c642d 100644 --- a/docs/ARCHITECTURE-BLUEPRINT.md +++ b/docs/ARCHITECTURE-BLUEPRINT.md @@ -398,7 +398,8 @@ Builds human and machine-readable outputs: ### Retention Index Keeps compact summaries over time while allowing raw artifact retention to be -bounded by policy. +bounded by policy. The first implementation writes `retention-summary.json` for +each run and can build a trend summary grouped by target and assessment profile. ## Extension Archetypes diff --git a/docs/schemas/trend-summary.schema.json b/docs/schemas/trend-summary.schema.json new file mode 100644 index 0000000..33eab6f --- /dev/null +++ b/docs/schemas/trend-summary.schema.json @@ -0,0 +1,20 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "title": "Guide Board Trend Summary", + "type": "object", + "additionalProperties": false, + "required": [ + "id", + "created_at", + "runs_dir", + "run_count", + "groups" + ], + "properties": { + "id": { "type": "string" }, + "created_at": { "type": "string" }, + "runs_dir": { "type": "string" }, + "run_count": { "type": "integer" }, + "groups": { "type": "array", "items": { "type": "object" } } + } +} diff --git a/src/guide_board/cli.py b/src/guide_board/cli.py index 62de7ae..03f96f4 100644 --- a/src/guide_board/cli.py +++ b/src/guide_board/cli.py @@ -17,7 +17,7 @@ from guide_board.planning import ( validate_assessment_profile, validate_target_profile, ) -from guide_board.retention import list_retained_runs +from guide_board.retention import build_trend_summary, list_retained_runs from guide_board.schema import assert_valid @@ -80,6 +80,9 @@ def build_parser() -> argparse.ArgumentParser: list_runs = runs_commands.add_parser("list", help="list retained run summaries") list_runs.add_argument("--runs-dir", type=Path) list_runs.set_defaults(func=cmd_runs_list) + trend_runs = runs_commands.add_parser("trend", help="summarize retained run trends") + trend_runs.add_argument("--runs-dir", type=Path) + trend_runs.set_defaults(func=cmd_runs_trend) schema = subcommands.add_parser("schema", help="schema validation") schema.add_argument("schema_name") @@ -143,6 +146,13 @@ def cmd_runs_list(args: argparse.Namespace) -> dict[str, Any]: } +def cmd_runs_trend(args: argparse.Namespace) -> dict[str, Any]: + runs_dir = args.runs_dir or args.root / "runs" + summary = build_trend_summary(runs_dir) + assert_valid(summary, "trend-summary") + return summary + + def cmd_schema_validate(args: argparse.Namespace) -> dict[str, Any]: document = load_json(args.path) assert_valid(document, args.schema_name) diff --git a/src/guide_board/retention.py b/src/guide_board/retention.py index 729c5f1..6b85493 100644 --- a/src/guide_board/retention.py +++ b/src/guide_board/retention.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections import Counter +from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -72,6 +73,40 @@ def list_retained_runs(runs_dir: Path) -> list[dict[str, Any]]: return sorted(summaries, key=lambda item: item.get("created_at", ""), reverse=True) +def build_trend_summary( + runs_dir: Path, + retained_runs: list[dict[str, Any]] | None = None, +) -> dict[str, Any]: + runs = retained_runs if retained_runs is not None else list_retained_runs(runs_dir) + now = datetime.now(timezone.utc) + groups = [] + for group_key, group_runs in _group_runs(runs).items(): + latest = group_runs[0] + previous = group_runs[1] if len(group_runs) > 1 else None + groups.append( + { + "id": group_key, + "target_profile_ref": latest.get("target_profile_ref"), + "assessment_profile_ref": latest.get("assessment_profile_ref"), + "run_count": len(group_runs), + "status_counts": dict( + sorted(Counter(_status_for(run) for run in group_runs).items()) + ), + "latest_run": _run_projection(latest), + "previous_run": _run_projection(previous) if previous else None, + "trend": _trend_between(previous, latest), + } + ) + + return { + "id": f"trend-summary:{now.strftime('%Y%m%dT%H%M%SZ')}", + "created_at": now.isoformat(), + "runs_dir": str(runs_dir), + "run_count": len(runs), + "groups": sorted(groups, key=lambda item: item["id"]), + } + + def _summary_for_run_dir(run_dir: Path) -> dict[str, Any] | None: summary_path = run_dir / "retention-summary.json" if summary_path.exists(): @@ -97,3 +132,122 @@ def _summary_for_run_dir(run_dir: Path) -> dict[str, Any] | None: "report_refs": [], "artifact_retention": {}, } + + +def _group_runs(runs: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]: + groups: dict[str, list[dict[str, Any]]] = {} + for run in runs: + target = run.get("target_profile_ref") or "unknown-target" + assessment = run.get("assessment_profile_ref") or "unknown-assessment" + groups.setdefault(f"{target}:{assessment}", []).append(run) + + for group_runs in groups.values(): + group_runs.sort(key=lambda item: item.get("created_at", ""), reverse=True) + return groups + + +def _run_projection(run: dict[str, Any]) -> dict[str, Any]: + summary = run.get("summary", {}) + return { + "run_id": run.get("run_id"), + "created_at": run.get("created_at"), + "status": summary.get("status", "unknown"), + "unexpected_findings": _summary_int(summary, "unexpected_findings"), + "finding_count": _summary_int(summary, "finding_count"), + "artifact_count": _summary_int(summary, "artifact_count"), + "run_dir": run.get("run_dir"), + } + + +def _trend_between( + previous: dict[str, Any] | None, + latest: dict[str, Any], +) -> dict[str, Any]: + if previous is None: + return { + "direction": "insufficient-history", + "status_changed": False, + "unexpected_findings_delta": 0, + "finding_count_delta": 0, + "artifact_count_delta": 0, + "evidence_result_deltas": {}, + } + + previous_summary = previous.get("summary", {}) + latest_summary = latest.get("summary", {}) + evidence_deltas = _dict_deltas( + previous_summary.get("evidence_results", {}), + latest_summary.get("evidence_results", {}), + ) + unexpected_delta = _summary_int(latest_summary, "unexpected_findings") - _summary_int( + previous_summary, "unexpected_findings" + ) + finding_delta = _summary_int(latest_summary, "finding_count") - _summary_int( + previous_summary, "finding_count" + ) + artifact_delta = _summary_int(latest_summary, "artifact_count") - _summary_int( + previous_summary, "artifact_count" + ) + previous_status = _status_for(previous) + latest_status = _status_for(latest) + + return { + "direction": _trend_direction(previous_status, latest_status, unexpected_delta), + "status_changed": previous_status != latest_status, + "unexpected_findings_delta": unexpected_delta, + "finding_count_delta": finding_delta, + "artifact_count_delta": artifact_delta, + "evidence_result_deltas": evidence_deltas, + } + + +def _trend_direction( + previous_status: str, + latest_status: str, + unexpected_delta: int, +) -> str: + previous_score = _status_score(previous_status) + latest_score = _status_score(latest_status) + if latest_score < previous_score: + return "improved" + if latest_score > previous_score: + return "regressed" + if unexpected_delta < 0: + return "improved" + if unexpected_delta > 0: + return "regressed" + return "unchanged" + + +def _status_for(run: dict[str, Any]) -> str: + summary = run.get("summary", {}) + status = summary.get("status", "unknown") + return status if isinstance(status, str) else "unknown" + + +def _status_score(status: str) -> int: + return { + "completed": 0, + "blocked": 1, + "infrastructure_error": 2, + "failed": 3, + }.get(status, 2) + + +def _summary_int(summary: dict[str, Any], key: str) -> int: + value = summary.get(key, 0) + return value if isinstance(value, int) and not isinstance(value, bool) else 0 + + +def _dict_deltas(previous: Any, latest: Any) -> dict[str, int]: + previous_dict = previous if isinstance(previous, dict) else {} + latest_dict = latest if isinstance(latest, dict) else {} + keys = set(previous_dict) | set(latest_dict) + return { + key: _int_value(latest_dict.get(key, 0)) - _int_value(previous_dict.get(key, 0)) + for key in sorted(keys) + } + + +def _int_value(value: Any) -> int: + return value if isinstance(value, int) and not isinstance(value, bool) else 0 diff --git a/tests/test_core.py b/tests/test_core.py index 90db5fe..ad8d52c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -14,7 +14,8 @@ from guide_board.planning import ( validate_assessment_profile, validate_target_profile, ) -from guide_board.retention import list_retained_runs +from guide_board.retention import build_trend_summary, list_retained_runs +from guide_board.schema import assert_valid ROOT = Path(__file__).resolve().parents[1] @@ -106,6 +107,44 @@ class CoreArchitectureTests(unittest.TestCase): self.assertEqual(len(mappings), 1) self.assertEqual(mappings[0]["target_id"], "profile-readiness") + def test_builds_retained_run_trends(self) -> None: + with TemporaryDirectory() as temporary_directory: + runs_dir = Path(temporary_directory) + _write_retention_summary( + runs_dir / "run-old", + "run-old", + "2026-05-07T10:00:00+00:00", + "blocked", + {"blocked": 1}, + 1, + 1, + ) + _write_retention_summary( + runs_dir / "run-new", + "run-new", + "2026-05-07T11:00:00+00:00", + "completed", + {"manual": 1, "skipped": 1}, + 0, + 2, + ) + + trend = build_trend_summary(runs_dir) + assert_valid(trend, "trend-summary") + + self.assertEqual(trend["run_count"], 2) + self.assertEqual(len(trend["groups"]), 1) + group = trend["groups"][0] + self.assertEqual(group["latest_run"]["run_id"], "run-new") + self.assertEqual(group["previous_run"]["run_id"], "run-old") + self.assertEqual(group["trend"]["direction"], "improved") + self.assertTrue(group["trend"]["status_changed"]) + self.assertEqual(group["trend"]["unexpected_findings_delta"], -1) + self.assertEqual( + group["trend"]["evidence_result_deltas"], + {"blocked": -1, "manual": 1, "skipped": 1}, + ) + def test_runs_cmis_preflight_against_local_endpoint(self) -> None: server = HTTPServer(("127.0.0.1", 0), _CmisHandler) thread = threading.Thread(target=server.serve_forever) @@ -469,5 +508,49 @@ class _CmisHandler(BaseHTTPRequestHandler): return +def _write_retention_summary( + run_dir: Path, + run_id: str, + created_at: str, + status: str, + evidence_results: dict[str, int], + unexpected_findings: int, + artifact_count: int, +) -> None: + run_dir.mkdir(parents=True, exist_ok=True) + (run_dir / "retention-summary.json").write_text( + json.dumps( + { + "id": f"retention-summary:{run_id}", + "run_id": run_id, + "target_profile_ref": "sample-repository", + "assessment_profile_ref": "sample-noop-assessment", + "created_at": created_at, + "summary": { + "status": status, + "evidence_results": evidence_results, + "finding_count": unexpected_findings, + "unexpected_findings": unexpected_findings, + "expected_findings": 0, + "waived_findings": 0, + "mapping_target_count": 1, + "artifact_count": artifact_count, + }, + "report_refs": [ + "reports/assessment-package.json", + "reports/report.md", + ], + "artifact_retention": { + "policy": {"raw_artifact_days": 0, "summary_days": 365}, + "output_artifact_retention": "summary-only", + "retention_class_counts": {"raw": artifact_count}, + "raw_artifact_count": artifact_count, + }, + } + ), + encoding="utf-8", + ) + + if __name__ == "__main__": unittest.main()