individual run evidence, retained summaries, and trend comparison

This commit is contained in:
2026-05-07 16:36:46 +02:00
parent e87f7fdd5d
commit 4c44db802d
6 changed files with 272 additions and 3 deletions

View File

@@ -17,7 +17,7 @@ from guide_board.planning import (
validate_assessment_profile,
validate_target_profile,
)
from guide_board.retention import list_retained_runs
from guide_board.retention import build_trend_summary, list_retained_runs
from guide_board.schema import assert_valid
@@ -80,6 +80,9 @@ def build_parser() -> argparse.ArgumentParser:
list_runs = runs_commands.add_parser("list", help="list retained run summaries")
list_runs.add_argument("--runs-dir", type=Path)
list_runs.set_defaults(func=cmd_runs_list)
trend_runs = runs_commands.add_parser("trend", help="summarize retained run trends")
trend_runs.add_argument("--runs-dir", type=Path)
trend_runs.set_defaults(func=cmd_runs_trend)
schema = subcommands.add_parser("schema", help="schema validation")
schema.add_argument("schema_name")
@@ -143,6 +146,13 @@ def cmd_runs_list(args: argparse.Namespace) -> dict[str, Any]:
}
def cmd_runs_trend(args: argparse.Namespace) -> dict[str, Any]:
runs_dir = args.runs_dir or args.root / "runs"
summary = build_trend_summary(runs_dir)
assert_valid(summary, "trend-summary")
return summary
def cmd_schema_validate(args: argparse.Namespace) -> dict[str, Any]:
document = load_json(args.path)
assert_valid(document, args.schema_name)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -72,6 +73,40 @@ def list_retained_runs(runs_dir: Path) -> list[dict[str, Any]]:
return sorted(summaries, key=lambda item: item.get("created_at", ""), reverse=True)
def build_trend_summary(
runs_dir: Path,
retained_runs: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
runs = retained_runs if retained_runs is not None else list_retained_runs(runs_dir)
now = datetime.now(timezone.utc)
groups = []
for group_key, group_runs in _group_runs(runs).items():
latest = group_runs[0]
previous = group_runs[1] if len(group_runs) > 1 else None
groups.append(
{
"id": group_key,
"target_profile_ref": latest.get("target_profile_ref"),
"assessment_profile_ref": latest.get("assessment_profile_ref"),
"run_count": len(group_runs),
"status_counts": dict(
sorted(Counter(_status_for(run) for run in group_runs).items())
),
"latest_run": _run_projection(latest),
"previous_run": _run_projection(previous) if previous else None,
"trend": _trend_between(previous, latest),
}
)
return {
"id": f"trend-summary:{now.strftime('%Y%m%dT%H%M%SZ')}",
"created_at": now.isoformat(),
"runs_dir": str(runs_dir),
"run_count": len(runs),
"groups": sorted(groups, key=lambda item: item["id"]),
}
def _summary_for_run_dir(run_dir: Path) -> dict[str, Any] | None:
summary_path = run_dir / "retention-summary.json"
if summary_path.exists():
@@ -97,3 +132,122 @@ def _summary_for_run_dir(run_dir: Path) -> dict[str, Any] | None:
"report_refs": [],
"artifact_retention": {},
}
def _group_runs(runs: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
groups: dict[str, list[dict[str, Any]]] = {}
for run in runs:
target = run.get("target_profile_ref") or "unknown-target"
assessment = run.get("assessment_profile_ref") or "unknown-assessment"
groups.setdefault(f"{target}:{assessment}", []).append(run)
for group_runs in groups.values():
group_runs.sort(key=lambda item: item.get("created_at", ""), reverse=True)
return groups
def _run_projection(run: dict[str, Any]) -> dict[str, Any]:
summary = run.get("summary", {})
return {
"run_id": run.get("run_id"),
"created_at": run.get("created_at"),
"status": summary.get("status", "unknown"),
"unexpected_findings": _summary_int(summary, "unexpected_findings"),
"finding_count": _summary_int(summary, "finding_count"),
"artifact_count": _summary_int(summary, "artifact_count"),
"run_dir": run.get("run_dir"),
}
def _trend_between(
previous: dict[str, Any] | None,
latest: dict[str, Any],
) -> dict[str, Any]:
if previous is None:
return {
"direction": "insufficient-history",
"status_changed": False,
"unexpected_findings_delta": 0,
"finding_count_delta": 0,
"artifact_count_delta": 0,
"evidence_result_deltas": {},
}
previous_summary = previous.get("summary", {})
latest_summary = latest.get("summary", {})
evidence_deltas = _dict_deltas(
previous_summary.get("evidence_results", {}),
latest_summary.get("evidence_results", {}),
)
unexpected_delta = _summary_int(latest_summary, "unexpected_findings") - _summary_int(
previous_summary, "unexpected_findings"
)
finding_delta = _summary_int(latest_summary, "finding_count") - _summary_int(
previous_summary, "finding_count"
)
artifact_delta = _summary_int(latest_summary, "artifact_count") - _summary_int(
previous_summary, "artifact_count"
)
previous_status = _status_for(previous)
latest_status = _status_for(latest)
return {
"direction": _trend_direction(previous_status, latest_status, unexpected_delta),
"status_changed": previous_status != latest_status,
"unexpected_findings_delta": unexpected_delta,
"finding_count_delta": finding_delta,
"artifact_count_delta": artifact_delta,
"evidence_result_deltas": evidence_deltas,
}
def _trend_direction(
previous_status: str,
latest_status: str,
unexpected_delta: int,
) -> str:
previous_score = _status_score(previous_status)
latest_score = _status_score(latest_status)
if latest_score < previous_score:
return "improved"
if latest_score > previous_score:
return "regressed"
if unexpected_delta < 0:
return "improved"
if unexpected_delta > 0:
return "regressed"
return "unchanged"
def _status_for(run: dict[str, Any]) -> str:
summary = run.get("summary", {})
status = summary.get("status", "unknown")
return status if isinstance(status, str) else "unknown"
def _status_score(status: str) -> int:
return {
"completed": 0,
"blocked": 1,
"infrastructure_error": 2,
"failed": 3,
}.get(status, 2)
def _summary_int(summary: dict[str, Any], key: str) -> int:
value = summary.get(key, 0)
return value if isinstance(value, int) and not isinstance(value, bool) else 0
def _dict_deltas(previous: Any, latest: Any) -> dict[str, int]:
previous_dict = previous if isinstance(previous, dict) else {}
latest_dict = latest if isinstance(latest, dict) else {}
keys = set(previous_dict) | set(latest_dict)
return {
key: _int_value(latest_dict.get(key, 0)) - _int_value(previous_dict.get(key, 0))
for key in sorted(keys)
}
def _int_value(value: Any) -> int:
return value if isinstance(value, int) and not isinstance(value, bool) else 0