Files
markitect-main/markitect/infospace/evaluation_io.py
tegwick f8c9ab33f0 feat(infospace): add structured evaluation output with history and diffing (S1.5)
Add data models (ScoreEntry, EntityEvaluation, EvaluationSnapshot,
SnapshotDiff) and I/O utilities for YAML frontmatter evaluation files,
snapshot persistence, history append, and snapshot diffing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 01:35:22 +01:00

214 lines
7.0 KiB
Python

"""
Read/write utilities for evaluation output files.
Per-entity evaluations are stored as markdown with YAML frontmatter.
Snapshots and history are stored as pure YAML files.
"""
from pathlib import Path
from typing import List
import yaml
from .evaluation import (
EntityEvaluation,
EvaluationSnapshot,
MetricChange,
MetricValue,
ScoreChange,
SnapshotDiff,
)
# Delimiter line that opens and closes the YAML frontmatter block in
# per-entity evaluation markdown files.
_FRONTMATTER_SEP = "---"
def write_entity_evaluation(evaluation: EntityEvaluation, path: Path) -> None:
    """Write a per-entity evaluation as YAML frontmatter + markdown body.

    The frontmatter carries the structured data (slug, evaluator, timestamp,
    overall score rounded to 4 decimals, the serialized scores and, when
    present, notes). The markdown body has a title followed by one
    ``## <name> — <value> / <max_value>`` section per score, with the score's
    rationale (if any) as the section text.

    Args:
        evaluation: The evaluation to serialize.
        path: Destination file; missing parent directories are created.
    """
    frontmatter = {
        "entity_slug": evaluation.entity_slug,
        "evaluator": evaluation.evaluator,
        "evaluated_at": evaluation.evaluated_at.isoformat(),
        "overall_score": round(evaluation.overall_score, 4),
        "scores": [s.to_dict() for s in evaluation.scores],
    }
    if evaluation.notes:
        frontmatter["notes"] = evaluation.notes

    lines: List[str] = [
        _FRONTMATTER_SEP,
        yaml.safe_dump(frontmatter, default_flow_style=False, sort_keys=False).rstrip(),
        _FRONTMATTER_SEP,
        "",
    ]

    # Title: turn the slug into a human-readable heading.
    title = evaluation.entity_slug.replace("_", " ").replace("-", " ").title()
    lines.append(f"# Evaluation: {title}")
    lines.append("")

    # One section per score with rationale. The name and the score are
    # separated by an em dash so the name can be recovered when the heading
    # is parsed back ("## dimension_name — 4.5 / 5.0").
    for score in evaluation.scores:
        lines.append(f"## {score.name} \u2014 {score.value} / {score.max_value}")
        lines.append("")
        if score.rationale:
            lines.append(score.rationale)
            lines.append("")

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines), encoding="utf-8")
def read_entity_evaluation(path: Path) -> EntityEvaluation:
    """Read a per-entity evaluation from a YAML frontmatter markdown file.

    The first non-blank line of the file must be a ``---`` delimiter line;
    the YAML frontmatter runs up to the next ``---`` delimiter line and
    everything after that is the markdown body. Rationales found in the body
    override the rationale of the score with the same name.

    Args:
        path: Location of the evaluation file.

    Returns:
        The reconstructed ``EntityEvaluation``.

    Raises:
        ValueError: If the frontmatter delimiters are missing or the
            frontmatter does not parse to a YAML mapping.
        KeyError: If a required frontmatter key (``entity_slug``,
            ``evaluator``, ``evaluated_at``, ``scores``) is absent.
    """
    from datetime import datetime

    from .evaluation import ScoreEntry

    file_lines = path.read_text(encoding="utf-8").lstrip("\ufeff").splitlines()

    # Match delimiters as whole lines so that a "---" occurring at the end of
    # a YAML value (or indented inside a multi-line scalar) is not mistaken
    # for the end of the frontmatter.
    start = next((i for i, line in enumerate(file_lines) if line.strip()), None)
    if start is None or file_lines[start].rstrip() != _FRONTMATTER_SEP:
        raise ValueError(f"Invalid frontmatter in {path}")
    end = next(
        (
            i
            for i in range(start + 1, len(file_lines))
            if file_lines[i].rstrip() == _FRONTMATTER_SEP
        ),
        None,
    )
    if end is None:
        raise ValueError(f"Invalid frontmatter in {path}")

    fm = yaml.safe_load("\n".join(file_lines[start + 1:end]))
    if not isinstance(fm, dict):
        # Empty or scalar/list frontmatter cannot describe an evaluation.
        raise ValueError(f"Invalid frontmatter in {path}")

    # Parse rationales from body
    body = "\n".join(file_lines[end + 1:])
    rationales = _parse_rationales(body)

    scores = []
    for s_data in fm["scores"]:
        se = ScoreEntry.from_dict(s_data)
        if se.name in rationales:
            se.rationale = rationales[se.name]
        scores.append(se)

    # An unquoted ISO timestamp is already turned into a datetime by the YAML
    # loader; only parse when the value still needs parsing.
    raw_evaluated_at = fm["evaluated_at"]
    if isinstance(raw_evaluated_at, datetime):
        evaluated_at = raw_evaluated_at
    else:
        evaluated_at = datetime.fromisoformat(str(raw_evaluated_at))

    return EntityEvaluation(
        entity_slug=fm["entity_slug"],
        evaluator=fm["evaluator"],
        scores=scores,
        evaluated_at=evaluated_at,
        notes=fm.get("notes", []),
    )
def _parse_rationales(body: str) -> dict:
"""Extract rationale text per dimension from the markdown body."""
rationales: dict = {}
current_name = None
current_lines: List[str] = []
for line in body.splitlines():
if line.startswith("## "):
# Save previous
if current_name is not None:
rationales[current_name] = "\n".join(current_lines).strip()
# Parse "## dimension_name — 4.5 / 5.0"
heading = line[3:].strip()
name = heading.split("")[0].strip() if "" in heading else heading
current_name = name
current_lines = []
elif current_name is not None:
current_lines.append(line)
if current_name is not None:
rationales[current_name] = "\n".join(current_lines).strip()
return rationales
def write_snapshot(snapshot: EvaluationSnapshot, path: Path) -> None:
    """Serialize *snapshot* to YAML and store it at *path*.

    Missing parent directories of *path* are created first. The YAML is
    emitted in block style with keys kept in the order ``to_dict()`` yields.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = yaml.safe_dump(
        snapshot.to_dict(),
        default_flow_style=False,
        sort_keys=False,
    )
    path.write_text(serialized, encoding="utf-8")
def read_snapshot(path: Path) -> EvaluationSnapshot:
    """Load the YAML file at *path* and build an ``EvaluationSnapshot`` from it."""
    raw_text = path.read_text(encoding="utf-8")
    payload = yaml.safe_load(raw_text)
    return EvaluationSnapshot.from_dict(payload)
def append_to_history(snapshot: EvaluationSnapshot, history_path: Path) -> None:
    """Add *snapshot* to the end of the YAML list stored at *history_path*.

    The file (and any missing parent directories) is created when absent.
    An existing file is loaded, extended with the new snapshot and rewritten
    in full.
    """
    history_path.parent.mkdir(parents=True, exist_ok=True)

    previous = None
    if history_path.exists():
        previous = yaml.safe_load(history_path.read_text(encoding="utf-8"))

    # An absent or empty file loads as None; start a fresh list in that case.
    entries: List[dict] = [] if previous is None else previous
    entries.append(snapshot.to_dict())

    document = yaml.safe_dump(entries, default_flow_style=False, sort_keys=False)
    history_path.write_text(document, encoding="utf-8")
def read_history(history_path: Path) -> List[EvaluationSnapshot]:
    """Return every snapshot stored in the YAML history file, in file order.

    An empty file (one that loads as ``None``) yields an empty list.
    """
    loaded = yaml.safe_load(history_path.read_text(encoding="utf-8"))
    records = [] if loaded is None else loaded
    return [EvaluationSnapshot.from_dict(record) for record in records]
def diff_snapshots(before: EvaluationSnapshot, after: EvaluationSnapshot) -> SnapshotDiff:
    """Describe what changed between the *before* and *after* snapshots.

    The result lists entities only present in *after* (added), entities only
    present in *before* (removed), per-dimension score changes for entities
    present in both, and collection-metric changes. A score or metric that
    exists on only one side is reported with ``0.0`` for the missing side.
    All lists are ordered by sorted slug / dimension / metric name.
    """
    # Per-entity score tables: {slug: {dimension: value}}.
    old_scores = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in before.entity_evaluations
    }
    new_scores = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in after.entity_evaluations
    }
    old_slugs = set(old_scores)
    new_slugs = set(new_scores)

    score_changes: List[ScoreChange] = []
    for slug in sorted(old_slugs & new_slugs):
        old_dims = old_scores[slug]
        new_dims = new_scores[slug]
        for dim in sorted(old_dims.keys() | new_dims.keys()):
            old_val = old_dims.get(dim)
            new_val = new_dims.get(dim)
            if old_val != new_val:
                score_changes.append(
                    ScoreChange(
                        entity_slug=slug,
                        dimension=dim,
                        before=0.0 if old_val is None else old_val,
                        after=0.0 if new_val is None else new_val,
                    )
                )

    # Collection-level metrics, compared by name.
    old_metrics = {m.name: m.value for m in before.collection_metrics}
    new_metrics = {m.name: m.value for m in after.collection_metrics}
    metric_changes: List[MetricChange] = []
    for metric_name in sorted(old_metrics.keys() | new_metrics.keys()):
        old_val = old_metrics.get(metric_name)
        new_val = new_metrics.get(metric_name)
        if old_val != new_val:
            metric_changes.append(
                MetricChange(
                    name=metric_name,
                    before=0.0 if old_val is None else old_val,
                    after=0.0 if new_val is None else new_val,
                )
            )

    return SnapshotDiff(
        before_id=before.snapshot_id,
        after_id=after.snapshot_id,
        added_entities=sorted(new_slugs - old_slugs),
        removed_entities=sorted(old_slugs - new_slugs),
        score_changes=score_changes,
        metric_changes=metric_changes,
    )