Add data models (ScoreEntry, EntityEvaluation, EvaluationSnapshot, SnapshotDiff) and I/O utilities for YAML frontmatter evaluation files, snapshot persistence, history append, and snapshot diffing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
214 lines
7.0 KiB
Python
214 lines
7.0 KiB
Python
"""
|
|
Read/write utilities for evaluation output files.
|
|
|
|
Per-entity evaluations are stored as markdown with YAML frontmatter.
|
|
Snapshots and history are stored as pure YAML files.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
import yaml
|
|
|
|
from .evaluation import (
|
|
EntityEvaluation,
|
|
EvaluationSnapshot,
|
|
MetricChange,
|
|
MetricValue,
|
|
ScoreChange,
|
|
SnapshotDiff,
|
|
)
|
|
|
|
_FRONTMATTER_SEP = "---"
|
|
|
|
|
|
def write_entity_evaluation(evaluation: EntityEvaluation, path: Path) -> None:
    """Serialise one entity evaluation to a markdown file with YAML frontmatter.

    The frontmatter carries the slug, evaluator, ISO-formatted timestamp,
    the overall score rounded to four decimals, each score's ``to_dict()``
    and, when non-empty, the notes. The markdown body holds a title derived
    from the slug followed by one ``##`` section per score containing its
    rationale (if any).

    Args:
        evaluation: The evaluation to persist.
        path: Destination file; missing parent directories are created.
    """
    header = {
        "entity_slug": evaluation.entity_slug,
        "evaluator": evaluation.evaluator,
        "evaluated_at": evaluation.evaluated_at.isoformat(),
        "overall_score": round(evaluation.overall_score, 4),
        "scores": [entry.to_dict() for entry in evaluation.scores],
    }
    # Notes are optional: the key is omitted entirely when there are none.
    if evaluation.notes:
        header["notes"] = evaluation.notes

    # sort_keys=False keeps the key order declared above in the output.
    header_yaml = yaml.safe_dump(header, default_flow_style=False, sort_keys=False).rstrip()
    heading = evaluation.entity_slug.replace("_", " ").replace("-", " ").title()

    chunks: List[str] = [
        _FRONTMATTER_SEP,
        header_yaml,
        _FRONTMATTER_SEP,
        "",
        f"# Evaluation: {heading}",
        "",
    ]

    # One "## name — value / max" section per score, rationale underneath.
    for entry in evaluation.scores:
        chunks += [f"## {entry.name} — {entry.value} / {entry.max_value}", ""]
        if entry.rationale:
            chunks += [entry.rationale, ""]

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(chunks), encoding="utf-8")
|
|
|
|
|
|
def read_entity_evaluation(path: Path) -> EntityEvaluation:
    """Read a per-entity evaluation from a YAML frontmatter markdown file.

    The text is split on the first two ``---`` separator lines. The part
    between them is parsed as YAML frontmatter; the remainder is scanned for
    ``## `` sections, and each section's text replaces the rationale of the
    score with the same name.

    Args:
        path: Markdown file to load.

    Returns:
        The reconstructed ``EntityEvaluation``.

    Raises:
        ValueError: If the separators are missing, or the frontmatter does
            not parse to a YAML mapping.
        KeyError: If ``scores``, ``entity_slug``, ``evaluator`` or
            ``evaluated_at`` is absent from the frontmatter.
    """
    from datetime import date, datetime

    from .evaluation import ScoreEntry

    text = path.read_text(encoding="utf-8")
    parts = text.split(f"{_FRONTMATTER_SEP}\n", maxsplit=2)
    # parts: ["", frontmatter_text, body]
    if len(parts) < 3:
        raise ValueError(f"Invalid frontmatter in {path}")
    fm_text, body = parts[1], parts[2]

    fm = yaml.safe_load(fm_text)
    # Empty or scalar frontmatter would otherwise fail later with an
    # unhelpful TypeError on subscripting.
    if not isinstance(fm, dict):
        raise ValueError(f"Invalid frontmatter in {path}")

    # Rationales live in the markdown body, keyed by score name.
    rationales = _parse_rationales(body)

    scores = []
    # A bare "scores:" key loads as None; treat it as an empty list.
    for s_data in fm["scores"] or []:
        se = ScoreEntry.from_dict(s_data)
        if se.name in rationales:
            se.rationale = rationales[se.name]
        scores.append(se)

    # YAML resolves an unquoted ISO timestamp to a datetime (or date) object
    # rather than a string, so accept all three representations.
    raw_ts = fm["evaluated_at"]
    if isinstance(raw_ts, datetime):
        evaluated_at = raw_ts
    elif isinstance(raw_ts, date):
        evaluated_at = datetime.fromisoformat(raw_ts.isoformat())
    else:
        evaluated_at = datetime.fromisoformat(raw_ts)

    return EntityEvaluation(
        entity_slug=fm["entity_slug"],
        evaluator=fm["evaluator"],
        scores=scores,
        evaluated_at=evaluated_at,
        notes=fm.get("notes", []),
    )
|
|
|
|
|
|
def _parse_rationales(body: str) -> dict:
|
|
"""Extract rationale text per dimension from the markdown body."""
|
|
rationales: dict = {}
|
|
current_name = None
|
|
current_lines: List[str] = []
|
|
|
|
for line in body.splitlines():
|
|
if line.startswith("## "):
|
|
# Save previous
|
|
if current_name is not None:
|
|
rationales[current_name] = "\n".join(current_lines).strip()
|
|
# Parse "## dimension_name — 4.5 / 5.0"
|
|
heading = line[3:].strip()
|
|
name = heading.split("—")[0].strip() if "—" in heading else heading
|
|
current_name = name
|
|
current_lines = []
|
|
elif current_name is not None:
|
|
current_lines.append(line)
|
|
|
|
if current_name is not None:
|
|
rationales[current_name] = "\n".join(current_lines).strip()
|
|
|
|
return rationales
|
|
|
|
|
|
def write_snapshot(snapshot: EvaluationSnapshot, path: Path) -> None:
    """Dump ``snapshot.to_dict()`` to *path* as block-style YAML.

    Missing parent directories are created. Keys are written in the order
    ``to_dict()`` provides them.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = yaml.safe_dump(snapshot.to_dict(), default_flow_style=False, sort_keys=False)
    path.write_text(payload, encoding="utf-8")
|
|
|
|
|
|
def read_snapshot(path: Path) -> EvaluationSnapshot:
    """Load the YAML document at *path* and build an ``EvaluationSnapshot`` from it."""
    raw_text = path.read_text(encoding="utf-8")
    payload = yaml.safe_load(raw_text)
    return EvaluationSnapshot.from_dict(payload)
|
|
|
|
|
|
def append_to_history(snapshot: EvaluationSnapshot, history_path: Path) -> None:
    """Add *snapshot* to the end of the YAML list stored at *history_path*.

    Parent directories are created if needed. A missing or empty file is
    treated as an empty history. The whole file is rewritten with the new
    entry appended.
    """
    history_path.parent.mkdir(parents=True, exist_ok=True)

    stored = (
        yaml.safe_load(history_path.read_text(encoding="utf-8"))
        if history_path.exists()
        else None
    )
    # An empty file loads as None; start a fresh list in that case.
    entries: List[dict] = [] if stored is None else stored
    entries.append(snapshot.to_dict())

    serialised = yaml.safe_dump(entries, default_flow_style=False, sort_keys=False)
    history_path.write_text(serialised, encoding="utf-8")
|
|
|
|
|
|
def read_history(history_path: Path) -> List[EvaluationSnapshot]:
    """Return every snapshot stored in the YAML history file, in file order.

    An empty file (one that loads as ``None``) yields an empty list.
    """
    stored = yaml.safe_load(history_path.read_text(encoding="utf-8"))
    return [] if stored is None else [EvaluationSnapshot.from_dict(item) for item in stored]
|
|
|
|
|
|
def diff_snapshots(before: EvaluationSnapshot, after: EvaluationSnapshot) -> SnapshotDiff:
    """Describe what changed between two evaluation snapshots.

    The result lists entity slugs present only in *after* (added) or only in
    *before* (removed), per-dimension score changes for entities present in
    both, and changes in collection metrics. A value missing on one side is
    reported as ``0.0``. All lists are ordered by sorted slug / name.
    """

    def _changed(old: dict, new: dict):
        """Yield ``(key, old_value, new_value)`` for keys whose values differ."""
        for key in sorted(set(old) | set(new)):
            old_val, new_val = old.get(key), new.get(key)
            if old_val != new_val:
                # A key absent on one side is reported with 0.0 on that side.
                yield (
                    key,
                    0.0 if old_val is None else old_val,
                    0.0 if new_val is None else new_val,
                )

    # Score lookup per snapshot: {slug: {dimension: value}}
    scores_before = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in before.entity_evaluations
    }
    scores_after = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in after.entity_evaluations
    }
    slugs_before, slugs_after = set(scores_before), set(scores_after)

    # Only entities present in both snapshots can have score changes.
    score_changes: List[ScoreChange] = [
        ScoreChange(entity_slug=slug, dimension=dim, before=old_val, after=new_val)
        for slug in sorted(slugs_before & slugs_after)
        for dim, old_val, new_val in _changed(scores_before[slug], scores_after[slug])
    ]

    metric_changes: List[MetricChange] = [
        MetricChange(name=metric, before=old_val, after=new_val)
        for metric, old_val, new_val in _changed(
            {m.name: m.value for m in before.collection_metrics},
            {m.name: m.value for m in after.collection_metrics},
        )
    ]

    return SnapshotDiff(
        before_id=before.snapshot_id,
        after_id=after.snapshot_id,
        added_entities=sorted(slugs_after - slugs_before),
        removed_entities=sorted(slugs_before - slugs_after),
        score_changes=score_changes,
        metric_changes=metric_changes,
    )
|