# Files
# infospace-bench/src/infospace_bench/history.py
# 2026-05-14 15:35:04 +02:00
#
# 255 lines
# 7.5 KiB
# Python

from __future__ import annotations
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
from .checks import CollectionCheckReport
from .evaluation import EntityEvaluation, EvaluationSnapshot, MetricValue
from .evaluation_io import append_to_history, read_history, write_snapshot
from .lifecycle import load_infospace
from .viability import evaluate_viability
# Output locations used by this module; each one is joined onto an infospace
# root directory before use.
# Merged metrics mapping, rewritten on every recorded check run.
METRICS_PATH = Path("output/metrics/metrics.yaml")
# History file that each new snapshot is appended to.
HISTORY_PATH = Path("output/metrics/history.yaml")
# Most recent viability report.
VIABILITY_PATH = Path("output/metrics/viability.yaml")
# Directory holding one "<snapshot_id>.yaml" file per snapshot.
SNAPSHOT_DIR = Path("output/metrics/snapshots")
@dataclass(frozen=True)
class RecordedCheckResult:
    """Immutable bundle of what a single recorded check run produced.

    Carries the evaluation snapshot, the metrics mapping and, optionally,
    the viability report dictionary.
    """

    snapshot: EvaluationSnapshot
    metrics: dict[str, Any]
    viability: dict[str, Any] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view; the snapshot is converted via its ``to_dict``."""
        payload: dict[str, Any] = {"snapshot": self.snapshot.to_dict()}
        payload["metrics"] = self.metrics
        payload["viability"] = self.viability
        return payload
def snapshot_from_checks(
    check_report: CollectionCheckReport,
    artifact_count: int,
    *,
    schema_name: str = "default",
    metadata: dict[str, Any] | None = None,
    artifact_evaluations: list[EntityEvaluation] | None = None,
) -> EvaluationSnapshot:
    """Build an ``EvaluationSnapshot`` from a collection check report.

    The numeric entries of ``check_report.metrics`` become collection
    metrics tagged with their concern code, followed by the aggregate
    evaluation metrics (tagged ``"evaluation"``) derived from
    *artifact_evaluations*. Both groups are ordered by metric name.

    The snapshot id is the first eight characters of a random UUID4 and the
    creation time is the current UTC time.
    """
    evaluations = artifact_evaluations or []

    numeric = _numeric_metrics(check_report.metrics)
    check_values = [
        MetricValue(name=name, value=value, concern=_concern_for_metric(name))
        for name, value in sorted(numeric.items())
    ]

    aggregates = _evaluation_metrics(evaluations)
    evaluation_values = [
        MetricValue(name=name, value=value, concern="evaluation")
        for name, value in sorted(aggregates.items())
    ]

    short_id = str(uuid.uuid4())[:8]
    return EvaluationSnapshot(
        snapshot_id=short_id,
        created_at=datetime.now(timezone.utc),
        schema_name=schema_name,
        artifact_count=artifact_count,
        artifact_evaluations=evaluations,
        collection_metrics=check_values + evaluation_values,
        metadata=metadata or {},
    )
def write_metrics_file(metrics: dict[str, Any], path: str | Path) -> None:
    """Write *metrics* to *path* as YAML, creating parent directories as needed.

    Each value is passed through ``_normalize_metric_value`` before dumping
    and keys are emitted in sorted order.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    normalized: dict[str, Any] = {}
    for key, value in sorted(metrics.items()):
        normalized[key] = _normalize_metric_value(value)

    document = yaml.safe_dump(normalized, sort_keys=True)
    destination.write_text(document, encoding="utf-8")
def read_metrics_file(path: str | Path) -> dict[str, Any]:
    """Load a metrics mapping from the YAML file at *path*.

    Returns an empty dict when *path* is not an existing regular file or
    when the parsed document is not a mapping.
    """
    metrics_file = Path(path)
    if metrics_file.is_file():
        loaded = yaml.safe_load(metrics_file.read_text(encoding="utf-8"))
        if isinstance(loaded, dict):
            return loaded
    return {}
def record_check_results(
    root: str | Path,
    check_report: CollectionCheckReport,
    *,
    artifact_evaluations: list[EntityEvaluation] | None = None,
    schema_name: str = "default",
    metadata: dict[str, Any] | None = None,
) -> RecordedCheckResult:
    """Persist the outcome of a collection check run under *root*.

    In order, this:

    1. builds a snapshot from *check_report* (metadata always carries
       ``source: collection-checks``, which caller metadata may override);
    2. merges the report's metrics and the aggregate evaluation metrics
       over the existing metrics file and rewrites that file;
    3. appends the snapshot to the history file and writes it to its own
       file in the snapshot directory;
    4. builds and writes the viability report from the merged metrics.

    Returns the snapshot, merged metrics and viability report together.
    """
    infospace = load_infospace(root)
    base = infospace.root

    # Prefer the count reported by the checks; otherwise count loaded artifacts.
    reported_count = check_report.details.get(
        "artifact_count", len(infospace.artifacts)
    )
    snapshot = snapshot_from_checks(
        check_report,
        int(reported_count),
        schema_name=schema_name,
        metadata={"source": "collection-checks", **(metadata or {})},
        artifact_evaluations=artifact_evaluations,
    )

    # Later sources win: existing file < check metrics < evaluation aggregates.
    metrics_file = base / METRICS_PATH
    merged = dict(read_metrics_file(metrics_file))
    merged.update(check_report.metrics)
    merged.update(_evaluation_metrics(artifact_evaluations or []))
    write_metrics_file(merged, metrics_file)

    append_to_history(snapshot, base / HISTORY_PATH)
    snapshot_file = base / SNAPSHOT_DIR / f"{snapshot.snapshot_id}.yaml"
    write_snapshot(snapshot, snapshot_file)

    viability = build_viability_report(base, merged)
    write_viability_report(base, viability)

    return RecordedCheckResult(snapshot=snapshot, metrics=merged, viability=viability)
def get_history(root: str | Path) -> list[EvaluationSnapshot]:
    """Read all snapshots from the history file located under *root*."""
    history_file = Path(root) / HISTORY_PATH
    return read_history(history_file)
def get_latest_snapshot(root: str | Path) -> EvaluationSnapshot | None:
    """Return the last snapshot in the history under *root*, or ``None`` if empty."""
    snapshots = get_history(root)
    if not snapshots:
        return None
    return snapshots[-1]
def find_snapshot(
    history: list[EvaluationSnapshot],
    ref: str,
) -> EvaluationSnapshot | None:
    """Resolve *ref* to a snapshot in *history*.

    The first snapshot whose ``snapshot_id`` equals *ref* wins; when no id
    matches, *ref* is handed to ``find_snapshot_by_date`` instead.
    """
    exact = next(
        (candidate for candidate in history if candidate.snapshot_id == ref),
        None,
    )
    if exact is not None:
        return exact
    return find_snapshot_by_date(history, ref)
def find_snapshot_by_date(
    history: list[EvaluationSnapshot],
    date_ref: str,
) -> EvaluationSnapshot | None:
    """Return the snapshot whose creation time is closest to *date_ref*.

    Args:
        history: Snapshots to search.
        date_ref: Any string accepted by ``datetime.fromisoformat``: a bare
            date (taken as midnight) or a date-time using either ``T`` or a
            space as the separator.

    Returns:
        The snapshot with the smallest absolute time difference to
        *date_ref*, or ``None`` when *history* is empty or *date_ref*
        cannot be parsed. Naive timestamps on either side of the
        comparison are treated as UTC.
    """
    if not history:
        return None

    try:
        # fromisoformat already maps a bare date to midnight. Appending a
        # "T00:00:00" suffix whenever no "T" was present made space-separated
        # date-times ("2026-05-14 15:35:04") unparsable, so parse directly.
        target = datetime.fromisoformat(date_ref)
    except ValueError:
        return None

    def _as_utc(moment: datetime) -> datetime:
        # Naive values are assumed to be UTC so aware/naive can be compared.
        if moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    target = _as_utc(target)

    def _distance(snapshot: EvaluationSnapshot) -> float:
        return abs((_as_utc(snapshot.created_at) - target).total_seconds())

    return min(history, key=_distance)
def metric_trend(
    history: list[EvaluationSnapshot],
    metric_name: str,
) -> list[dict[str, Any]]:
    """Collect the values of *metric_name* across *history*.

    For every snapshot that has a collection metric with that name, the
    result holds one ``{"date": <ISO creation time>, "value": <value>}``
    entry, in history order. Only the first matching metric of a snapshot
    is used; snapshots without the metric are skipped.
    """
    points: list[dict[str, Any]] = []
    for snapshot in history:
        hit = next(
            (
                entry
                for entry in snapshot.collection_metrics
                if entry.name == metric_name
            ),
            None,
        )
        if hit is None:
            continue
        points.append({"date": snapshot.created_at.isoformat(), "value": hit.value})
    return points
def build_viability_report(
    root: str | Path,
    metrics: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Evaluate viability for the infospace at *root* and return it as a dict.

    When *metrics* is ``None`` the metrics file under the infospace root is
    read instead. Only numeric metrics are passed to ``evaluate_viability``,
    together with the infospace's configured viability settings.

    The result has a top-level ``"passed"`` flag and a ``"results"`` mapping
    with the metric, value, threshold and pass flag of every result.
    """
    infospace = load_infospace(root)

    if metrics is None:
        source_metrics = read_metrics_file(infospace.root / METRICS_PATH)
    else:
        source_metrics = metrics

    report = evaluate_viability(
        _numeric_metrics(source_metrics), infospace.config.viability
    )

    per_result: dict[str, Any] = {}
    summary: dict[str, Any] = {"passed": report.passed, "results": per_result}
    for name, result in report.results.items():
        per_result[name] = {
            "metric": result.metric,
            "value": result.value,
            "threshold": result.threshold.to_dict(),
            "passed": result.passed,
        }
    return summary
def write_viability_report(root: str | Path, report: dict[str, Any]) -> None:
    """Dump *report* as YAML to the viability file under *root*.

    Parent directories are created when missing; key order of *report* is
    preserved in the output.
    """
    destination = Path(root) / VIABILITY_PATH
    destination.parent.mkdir(parents=True, exist_ok=True)
    rendered = yaml.safe_dump(report, sort_keys=False)
    destination.write_text(rendered, encoding="utf-8")
def _evaluation_metrics(evaluations: list[EntityEvaluation]) -> dict[str, float | int]:
    """Summarize per-artifact evaluations into aggregate metrics.

    Returns an empty dict for no evaluations; otherwise the number of
    evaluations and the arithmetic mean of their ``overall_score``.
    """
    if not evaluations:
        return {}
    count = len(evaluations)
    score_total = sum(entry.overall_score for entry in evaluations)
    return {
        "evaluated_artifact_count": count,
        "per_artifact_mean": score_total / count,
    }
def _numeric_metrics(metrics: dict[str, Any]) -> dict[str, float]:
    """Keep only the int/float entries of *metrics*, as ``str`` -> ``float``.

    Booleans are excluded even though ``bool`` is a subclass of ``int``.
    """
    numeric: dict[str, float] = {}
    for name, value in metrics.items():
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        numeric[str(name)] = float(value)
    return numeric
def _normalize_metric_value(value: Any) -> Any:
    """Round floats to six decimal places; return every other value unchanged."""
    # bool is not a float subclass, so booleans fall through untouched.
    return round(value, 6) if isinstance(value, float) else value
def _concern_for_metric(name: str) -> str:
    """Return the concern code (``"C1"``..``"C5"``) for a known metric name.

    Metric names without an assigned concern yield an empty string.
    """
    concern_codes = dict(
        redundancy_ratio="C1",
        coverage_ratio="C2",
        coherence_components="C3",
        consistency_cycles="C4",
        granularity_entropy="C5",
    )
    try:
        return concern_codes[name]
    except KeyError:
        return ""