Files
guide-board/src/guide_board/retention.py

295 lines
10 KiB
Python

"""Retention summaries and run history helpers."""
from __future__ import annotations
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from guide_board.io import load_json
def build_retention_summary(
    run_metadata: dict[str, Any],
    plan: dict[str, Any],
    assessment_package: dict[str, Any],
) -> dict[str, Any]:
    """Build the retention summary document for a single run.

    Args:
        run_metadata: Run record. Must contain ``id``, ``target_profile_ref``,
            ``assessment_profile_ref``, ``created_at`` and ``status``.
        plan: Run plan. Must contain ``assessment_profile_snapshot``.
        assessment_package: Assessment output. Every section read here
            (``artifact_manifest``, ``findings``, ``policy_summary``,
            ``mapping_summary``, ``summary``) is optional; a section that is
            missing or ``None`` is treated as empty.

    Returns:
        A dict with the run identity fields, a ``summary`` of counts, the
        fixed ``report_refs`` list and an ``artifact_retention`` section
        holding per-retention-class artifact counts.

    Raises:
        KeyError: If a required ``run_metadata`` key or
            ``plan["assessment_profile_snapshot"]`` is missing.
    """

    def section(container: dict[str, Any], key: str) -> dict[str, Any]:
        # A section that is absent, null, or not an object counts as empty.
        value = container.get(key)
        return value if isinstance(value, dict) else {}

    def retention_class_of(artifact: dict[str, Any]) -> str:
        # Non-string classes (e.g. JSON null) are bucketed as "unknown" so the
        # Counter keys stay hashable and mutually sortable.
        value = artifact.get("retention_class", "unknown")
        return value if isinstance(value, str) else "unknown"

    artifact_manifest = assessment_package.get("artifact_manifest") or []
    findings = assessment_package.get("findings") or []
    policy_summary = section(assessment_package, "policy_summary")
    mapping_targets = section(assessment_package, "mapping_summary").get("targets") or []
    profile_snapshot = plan["assessment_profile_snapshot"]

    retention_class_counts = Counter(
        retention_class_of(artifact)
        for artifact in artifact_manifest
        if isinstance(artifact, dict)
    )
    # Only dict-shaped findings can carry the expected / waiver markers.
    finding_records = [finding for finding in findings if isinstance(finding, dict)]

    return {
        "id": f"retention-summary:{run_metadata['id']}",
        "run_id": run_metadata["id"],
        "target_profile_ref": run_metadata["target_profile_ref"],
        "assessment_profile_ref": run_metadata["assessment_profile_ref"],
        "created_at": run_metadata["created_at"],
        "summary": {
            "status": run_metadata["status"],
            "evidence_results": assessment_package.get("summary", {}),
            "finding_count": len(findings),
            "unexpected_findings": policy_summary.get("unexpected_findings", 0),
            "expected_findings": sum(
                1 for finding in finding_records if finding.get("expected")
            ),
            "waived_findings": sum(
                1 for finding in finding_records if finding.get("waiver_ref")
            ),
            "mapping_target_count": len(mapping_targets),
            "artifact_count": len(artifact_manifest),
        },
        "report_refs": [
            "reports/assessment-package.json",
            "reports/report.md",
        ],
        "artifact_retention": {
            "policy": profile_snapshot.get("retention_policy", {}),
            "output_artifact_retention": section(profile_snapshot, "output_policy").get(
                "artifact_retention"
            ),
            "retention_class_counts": dict(sorted(retention_class_counts.items())),
            "raw_artifact_count": retention_class_counts.get("raw", 0),
        },
    }
def list_retained_runs(runs_dir: Path) -> list[dict[str, Any]]:
    """Return the summaries of all retained runs under ``runs_dir``, newest first.

    Each sub-directory of ``runs_dir`` is treated as one run. Listing is
    best-effort: a run whose files cannot be read or parsed is skipped, and
    so is a directory for which no summary can be produced.

    Args:
        runs_dir: Directory that holds one sub-directory per run.

    Returns:
        Run summaries ordered by descending ``created_at`` string. Runs with
        no string ``created_at`` sort last. The list is empty when
        ``runs_dir`` is missing or is not a directory.
    """
    # is_dir() also covers the "path exists but is a file" case, where
    # iterdir() would raise NotADirectoryError.
    if not runs_dir.is_dir():
        return []

    def created_at_key(item: dict[str, Any]) -> str:
        # created_at may be present but null; comparing None with str raises
        # TypeError, so anything that is not a string sorts as "".
        value = item.get("created_at")
        return value if isinstance(value, str) else ""

    summaries = []
    for run_dir in sorted(path for path in runs_dir.iterdir() if path.is_dir()):
        try:
            summary = _summary_for_run_dir(run_dir)
        except (OSError, ValueError):
            # json.JSONDecodeError subclasses ValueError, so one corrupt file
            # no longer aborts the whole listing.
            # NOTE(review): assumes load_json surfaces parse errors as
            # ValueError - confirm against guide_board.io.
            continue
        if summary is not None:
            summaries.append(summary)
    return sorted(summaries, key=created_at_key, reverse=True)
def select_retained_run(
    runs_dir: Path,
    run_id: str | None = None,
    target_profile_ref: str | None = None,
    assessment_profile_ref: str | None = None,
) -> dict[str, Any] | None:
    """Pick the first retained run that satisfies every supplied filter.

    Runs are examined in the order produced by ``list_retained_runs``.
    A filter that is ``None`` or empty is ignored. ``None`` is returned
    when no run matches.
    """
    requested = {
        "run_id": run_id,
        "target_profile_ref": target_profile_ref,
        "assessment_profile_ref": assessment_profile_ref,
    }
    # Only truthy filters take part in matching.
    active_filters = {field: wanted for field, wanted in requested.items() if wanted}
    for candidate in list_retained_runs(runs_dir):
        if all(candidate.get(field) == wanted for field, wanted in active_filters.items()):
            return candidate
    return None
def retained_run_report_paths(run: dict[str, Any]) -> dict[str, str]:
    """Map report names to file paths for one retained run summary.

    Each non-empty string in ``run["report_refs"]`` is keyed by its file
    stem with ``-`` replaced by ``_`` and resolved against ``run["run_dir"]``.
    The ``assessment_package``, ``report`` and ``retention_summary`` entries
    are filled in with default locations when not already present.

    Returns:
        The mapping with its keys in sorted order.

    Raises:
        ValueError: If ``run_dir`` is missing, empty, or not a string.
    """
    base_value = run.get("run_dir")
    if not (isinstance(base_value, str) and base_value):
        raise ValueError("retained run summary is missing run_dir")
    base = Path(base_value)

    declared = run.get("report_refs", [])
    resolved: dict[str, str] = {}
    # A report_refs value that is not a list contributes nothing.
    for entry in declared if isinstance(declared, list) else ():
        if isinstance(entry, str) and entry:
            relative = Path(entry)
            resolved[relative.stem.replace("-", "_")] = str(base / relative)

    fallbacks = (
        ("assessment_package", base / "reports" / "assessment-package.json"),
        ("report", base / "reports" / "report.md"),
        ("retention_summary", base / "retention-summary.json"),
    )
    for name, location in fallbacks:
        resolved.setdefault(name, str(location))
    return {name: resolved[name] for name in sorted(resolved)}
def build_trend_summary(
    runs_dir: Path,
    retained_runs: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
    """Summarise run history per target/assessment profile pair.

    Args:
        runs_dir: Directory holding the retained runs. It is scanned only
            when ``retained_runs`` is not supplied.
        retained_runs: Pre-loaded run summaries to use instead of scanning.

    Returns:
        A dict with a timestamp-based ``id``, ``created_at`` (current UTC
        time in ISO format), ``runs_dir``, the total ``run_count`` and one
        entry per group in ``groups``, sorted by group id. Each group
        compares its first run against its second, if there is one.
    """
    if retained_runs is None:
        runs = list_retained_runs(runs_dir)
    else:
        runs = retained_runs
    generated_at = datetime.now(timezone.utc)

    def describe_group(group_id: str, members: list[dict[str, Any]]) -> dict[str, Any]:
        # members[0] is compared against members[1] when a second run exists.
        newest = members[0]
        prior = members[1] if len(members) > 1 else None
        tally = Counter(_status_for(member) for member in members)
        return {
            "id": group_id,
            "target_profile_ref": newest.get("target_profile_ref"),
            "assessment_profile_ref": newest.get("assessment_profile_ref"),
            "run_count": len(members),
            "status_counts": {status: tally[status] for status in sorted(tally)},
            "latest_run": _run_projection(newest),
            "previous_run": _run_projection(prior) if prior else None,
            "trend": _trend_between(prior, newest),
        }

    group_entries = [
        describe_group(group_id, members)
        for group_id, members in _group_runs(runs).items()
    ]
    group_entries.sort(key=lambda entry: entry["id"])
    return {
        "id": f"trend-summary:{generated_at.strftime('%Y%m%dT%H%M%SZ')}",
        "created_at": generated_at.isoformat(),
        "runs_dir": str(runs_dir),
        "run_count": len(runs),
        "groups": group_entries,
    }
def _summary_for_run_dir(run_dir: Path) -> dict[str, Any] | None:
summary_path = run_dir / "retention-summary.json"
if summary_path.exists():
summary = load_json(summary_path)
summary["run_dir"] = str(run_dir)
return summary
metadata_path = run_dir / "run.json"
if not metadata_path.exists():
return None
metadata = load_json(metadata_path)
return {
"id": f"retention-summary:{metadata.get('id', run_dir.name)}",
"run_id": metadata.get("id", run_dir.name),
"run_dir": str(run_dir),
"target_profile_ref": metadata.get("target_profile_ref"),
"assessment_profile_ref": metadata.get("assessment_profile_ref"),
"created_at": metadata.get("created_at"),
"summary": {
"status": metadata.get("status", "unknown"),
},
"report_refs": [],
"artifact_retention": {},
}
def _group_runs(runs: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
groups: dict[str, list[dict[str, Any]]] = {}
for run in runs:
target = run.get("target_profile_ref") or "unknown-target"
assessment = run.get("assessment_profile_ref") or "unknown-assessment"
groups.setdefault(f"{target}:{assessment}", []).append(run)
for group_runs in groups.values():
group_runs.sort(key=lambda item: item.get("created_at", ""), reverse=True)
return groups
def _run_projection(run: dict[str, Any]) -> dict[str, Any]:
    """Reduce a run summary to the fields reported in a trend group.

    The result carries the run's id, creation time, status, three integer
    counters read from its ``summary`` section, and its directory.
    """
    details = run.get("summary", {})
    counters = {
        field: _summary_int(details, field)
        for field in ("unexpected_findings", "finding_count", "artifact_count")
    }
    return {
        "run_id": run.get("run_id"),
        "created_at": run.get("created_at"),
        "status": details.get("status", "unknown"),
        **counters,
        "run_dir": run.get("run_dir"),
    }
def _trend_between(
previous: dict[str, Any] | None,
latest: dict[str, Any],
) -> dict[str, Any]:
if previous is None:
return {
"direction": "insufficient-history",
"status_changed": False,
"unexpected_findings_delta": 0,
"finding_count_delta": 0,
"artifact_count_delta": 0,
"evidence_result_deltas": {},
}
previous_summary = previous.get("summary", {})
latest_summary = latest.get("summary", {})
evidence_deltas = _dict_deltas(
previous_summary.get("evidence_results", {}),
latest_summary.get("evidence_results", {}),
)
unexpected_delta = _summary_int(latest_summary, "unexpected_findings") - _summary_int(
previous_summary, "unexpected_findings"
)
finding_delta = _summary_int(latest_summary, "finding_count") - _summary_int(
previous_summary, "finding_count"
)
artifact_delta = _summary_int(latest_summary, "artifact_count") - _summary_int(
previous_summary, "artifact_count"
)
previous_status = _status_for(previous)
latest_status = _status_for(latest)
return {
"direction": _trend_direction(previous_status, latest_status, unexpected_delta),
"status_changed": previous_status != latest_status,
"unexpected_findings_delta": unexpected_delta,
"finding_count_delta": finding_delta,
"artifact_count_delta": artifact_delta,
"evidence_result_deltas": evidence_deltas,
}
def _trend_direction(
    previous_status: str,
    latest_status: str,
    unexpected_delta: int,
) -> str:
    """Classify a change as ``"improved"``, ``"regressed"`` or ``"unchanged"``.

    The status score is compared first, and a lower score counts as an
    improvement. When the scores are equal, the sign of
    ``unexpected_delta`` decides.
    """
    score_change = _status_score(latest_status) - _status_score(previous_status)
    # The first non-zero signal wins: status score, then unexpected findings.
    for change in (score_change, unexpected_delta):
        if change < 0:
            return "improved"
        if change > 0:
            return "regressed"
    return "unchanged"
def _status_for(run: dict[str, Any]) -> str:
summary = run.get("summary", {})
status = summary.get("status", "unknown")
return status if isinstance(status, str) else "unknown"
def _status_score(status: str) -> int:
return {
"completed": 0,
"blocked": 1,
"infrastructure_error": 2,
"failed": 3,
}.get(status, 2)
def _summary_int(summary: dict[str, Any], key: str) -> int:
value = summary.get(key, 0)
return value if isinstance(value, int) and not isinstance(value, bool) else 0
def _dict_deltas(previous: Any, latest: Any) -> dict[str, int]:
    """Return per-key differences (latest minus previous) over the union of keys.

    An argument that is not a dict is treated as empty. Keys appear in
    sorted order in the result.
    """
    before = previous if isinstance(previous, dict) else {}
    after = latest if isinstance(latest, dict) else {}
    deltas: dict[str, int] = {}
    for name in sorted(before.keys() | after.keys()):
        deltas[name] = _int_value(after.get(name, 0)) - _int_value(before.get(name, 0))
    return deltas
def _int_value(value: Any) -> int:
return value if isinstance(value, int) and not isinstance(value, bool) else 0