feat(infospace): add structured evaluation output with history and diffing (S1.5)

Add data models (ScoreEntry, EntityEvaluation, EvaluationSnapshot,
SnapshotDiff) and I/O utilities for YAML frontmatter evaluation files,
snapshot persistence, history append, and snapshot diffing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 01:35:22 +01:00
parent bad01e32bd
commit f8c9ab33f0
4 changed files with 852 additions and 0 deletions

View File

@@ -21,6 +21,24 @@ from .validator import (
validate_entities,
validate_entity,
)
from .evaluation import (
EntityEvaluation,
EvaluationSnapshot,
MetricChange,
MetricValue,
ScoreChange,
ScoreEntry,
SnapshotDiff,
)
from .evaluation_io import (
append_to_history,
diff_snapshots,
read_entity_evaluation,
read_history,
read_snapshot,
write_entity_evaluation,
write_snapshot,
)
__all__ = [
"EntityMeta",
@@ -38,4 +56,20 @@ __all__ = [
"ComplianceResult",
"validate_entities",
"validate_entity",
# Evaluation models
"EntityEvaluation",
"EvaluationSnapshot",
"MetricChange",
"MetricValue",
"ScoreChange",
"ScoreEntry",
"SnapshotDiff",
# Evaluation I/O
"append_to_history",
"diff_snapshots",
"read_entity_evaluation",
"read_history",
"read_snapshot",
"write_entity_evaluation",
"write_snapshot",
]

View File

@@ -0,0 +1,207 @@
"""
Data models for structured evaluation output.
Provides typed containers for per-entity LLM-evaluated scores and
collection-level metrics. All models support ``to_dict()``/``from_dict()``
round-tripping for YAML serialisation.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
@dataclass
class ScoreEntry:
    """One scored dimension of an evaluation (e.g. definition_precision: 4.5/5.0).

    ``max_value`` defaults to 5.0 and ``rationale`` to the empty string.
    """

    name: str
    value: float
    max_value: float = 5.0
    rationale: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping; ``rationale`` is included only when non-empty."""
        payload: Dict[str, Any] = dict(
            name=self.name,
            value=self.value,
            max_value=self.max_value,
        )
        if not self.rationale:
            return payload
        payload["rationale"] = self.rationale
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ScoreEntry":
        """Build an entry from a mapping.

        ``name`` and ``value`` are required keys; ``value`` and ``max_value``
        are coerced with ``float()``. Missing ``max_value`` / ``rationale``
        fall back to ``5.0`` / ``""``.
        """
        entry_name = data["name"]
        entry_value = float(data["value"])
        entry_max = float(data.get("max_value", 5.0))
        return cls(entry_name, entry_value, entry_max, data.get("rationale", ""))
@dataclass
class EntityEvaluation:
    """Evaluation result for a single entity: its scores plus who/when metadata."""

    entity_slug: str
    evaluator: str
    scores: List[ScoreEntry]
    evaluated_at: datetime
    notes: List[str] = field(default_factory=list)

    @property
    def overall_score(self) -> float:
        """Unweighted mean of the raw score values; ``0.0`` when there are no scores."""
        entries = self.scores
        if entries:
            return sum(entry.value for entry in entries) / len(entries)
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping.

        ``evaluated_at`` is rendered with ``isoformat()`` and the derived
        ``overall_score`` is included, rounded to 4 decimal places.
        """
        payload: Dict[str, Any] = {}
        payload["entity_slug"] = self.entity_slug
        payload["evaluator"] = self.evaluator
        payload["evaluated_at"] = self.evaluated_at.isoformat()
        payload["overall_score"] = round(self.overall_score, 4)
        payload["scores"] = [entry.to_dict() for entry in self.scores]
        payload["notes"] = self.notes
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "EntityEvaluation":
        """Build an evaluation from a mapping.

        Required keys: ``entity_slug``, ``evaluator``, ``scores``,
        ``evaluated_at`` (parsed with ``datetime.fromisoformat``). ``notes``
        defaults to an empty list. Any ``overall_score`` key is not read; the
        value is recomputed from the scores on access.
        """
        slug = data["entity_slug"]
        who = data["evaluator"]
        parsed_scores = [ScoreEntry.from_dict(item) for item in data["scores"]]
        when = datetime.fromisoformat(data["evaluated_at"])
        return cls(
            entity_slug=slug,
            evaluator=who,
            scores=parsed_scores,
            evaluated_at=when,
            notes=data.get("notes", []),
        )
@dataclass
class MetricValue:
    """One named collection-level metric, with an optional concern and details."""

    name: str
    value: float
    concern: str = ""
    details: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping; ``concern`` and ``details`` are omitted when empty."""
        payload: Dict[str, Any] = dict(name=self.name, value=self.value)
        optional_fields = (("concern", self.concern), ("details", self.details))
        for key, content in optional_fields:
            if content:
                payload[key] = content
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MetricValue":
        """Build a metric from a mapping.

        ``name`` and ``value`` are required; ``value`` is coerced with
        ``float()``. ``concern`` defaults to ``""`` and ``details`` to ``{}``.
        """
        metric_name = data["name"]
        metric_value = float(data["value"])
        return cls(
            metric_name,
            metric_value,
            data.get("concern", ""),
            data.get("details", {}),
        )
@dataclass
class EvaluationSnapshot:
    """A timestamped bundle of entity evaluations and collection metrics."""

    snapshot_id: str
    created_at: datetime
    schema_name: str
    entity_count: int
    entity_evaluations: List[EntityEvaluation] = field(default_factory=list)
    collection_metrics: List[MetricValue] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain mapping; ``created_at`` is rendered with ``isoformat()``."""
        payload: Dict[str, Any] = {}
        payload["snapshot_id"] = self.snapshot_id
        payload["created_at"] = self.created_at.isoformat()
        payload["schema_name"] = self.schema_name
        payload["entity_count"] = self.entity_count
        payload["entity_evaluations"] = [ev.to_dict() for ev in self.entity_evaluations]
        payload["collection_metrics"] = [metric.to_dict() for metric in self.collection_metrics]
        payload["metadata"] = self.metadata
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "EvaluationSnapshot":
        """Build a snapshot from a mapping.

        Required keys: ``snapshot_id``, ``created_at`` (parsed with
        ``datetime.fromisoformat``), ``schema_name``, ``entity_count``. The
        evaluation list, metric list and metadata default to empty.
        """
        identifier = data["snapshot_id"]
        created = datetime.fromisoformat(data["created_at"])
        schema = data["schema_name"]
        count = data["entity_count"]
        evaluations = [
            EntityEvaluation.from_dict(item) for item in data.get("entity_evaluations", [])
        ]
        metrics = [
            MetricValue.from_dict(item) for item in data.get("collection_metrics", [])
        ]
        return cls(
            snapshot_id=identifier,
            created_at=created,
            schema_name=schema,
            entity_count=count,
            entity_evaluations=evaluations,
            collection_metrics=metrics,
            metadata=data.get("metadata", {}),
        )
@dataclass
class ScoreChange:
    """Before/after values of one score dimension of one entity across two snapshots."""

    entity_slug: str
    dimension: str
    before: float
    after: float

    @property
    def delta(self) -> float:
        """Signed change: ``after`` minus ``before``."""
        previous, current = self.before, self.after
        return current - previous
@dataclass
class MetricChange:
    """Before/after values of one collection metric across two snapshots."""

    name: str
    before: float
    after: float

    @property
    def delta(self) -> float:
        """Signed change: ``after`` minus ``before``."""
        previous, current = self.before, self.after
        return current - previous
@dataclass
class SnapshotDiff:
    """Differences between two evaluation snapshots, identified by their ids."""

    before_id: str
    after_id: str
    added_entities: List[str] = field(default_factory=list)
    removed_entities: List[str] = field(default_factory=list)
    score_changes: List[ScoreChange] = field(default_factory=list)
    metric_changes: List[MetricChange] = field(default_factory=list)

    def summary(self) -> str:
        """Render the diff as newline-joined text.

        The first line names both snapshot ids. It is followed by one section
        per non-empty category (added, removed, score changes, metric
        changes), or a single "No changes" line when every category is empty.
        """
        report = [f"Diff: {self.before_id} -> {self.after_id}"]

        nothing_changed = not (
            self.added_entities
            or self.removed_entities
            or self.score_changes
            or self.metric_changes
        )
        if nothing_changed:
            report.append(" No changes")
            return "\n".join(report)

        if self.added_entities:
            report.append(f" Added entities: {', '.join(self.added_entities)}")
        if self.removed_entities:
            report.append(f" Removed entities: {', '.join(self.removed_entities)}")
        if self.score_changes:
            report.append(f" Score changes: {len(self.score_changes)}")
            report.extend(
                f" {change.entity_slug}/{change.dimension}: "
                f"{change.before} -> {change.after} ({change.delta:+.2f})"
                for change in self.score_changes
            )
        if self.metric_changes:
            report.append(f" Metric changes: {len(self.metric_changes)}")
            report.extend(
                f" {change.name}: {change.before} -> {change.after} ({change.delta:+.2f})"
                for change in self.metric_changes
            )
        return "\n".join(report)

View File

@@ -0,0 +1,213 @@
"""
Read/write utilities for evaluation output files.
Per-entity evaluations are stored as markdown with YAML frontmatter.
Snapshots and history are stored as pure YAML files.
"""
from pathlib import Path
from typing import List
import yaml
from .evaluation import (
EntityEvaluation,
EvaluationSnapshot,
MetricChange,
MetricValue,
ScoreChange,
SnapshotDiff,
)
_FRONTMATTER_SEP = "---"
def write_entity_evaluation(evaluation: EntityEvaluation, path: Path) -> None:
    """Write a per-entity evaluation as YAML frontmatter + markdown body.

    The frontmatter holds the slug, evaluator, ISO timestamp, overall score
    (rounded to 4 decimals) and the score dicts; ``notes`` is added only when
    non-empty. The body has a title derived from the slug and one
    ``## <name> — <value> / <max_value>`` section per score, followed by its
    rationale when present.

    Args:
        evaluation: The evaluation to serialise.
        path: Destination file; missing parent directories are created.
    """
    frontmatter = {
        "entity_slug": evaluation.entity_slug,
        "evaluator": evaluation.evaluator,
        "evaluated_at": evaluation.evaluated_at.isoformat(),
        "overall_score": round(evaluation.overall_score, 4),
        "scores": [s.to_dict() for s in evaluation.scores],
    }
    if evaluation.notes:
        frontmatter["notes"] = evaluation.notes

    lines: List[str] = []
    lines.append(_FRONTMATTER_SEP)
    lines.append(yaml.safe_dump(frontmatter, default_flow_style=False, sort_keys=False).rstrip())
    lines.append(_FRONTMATTER_SEP)
    lines.append("")

    # Title
    title = evaluation.entity_slug.replace("_", " ").replace("-", " ").title()
    lines.append(f"# Evaluation: {title}")
    lines.append("")

    # One section per score with rationale. The em dash (U+2014) separates the
    # dimension name from its score; without it the name and value run together
    # in the heading and the name cannot be recovered when the file is read.
    for score in evaluation.scores:
        lines.append(f"## {score.name} \u2014 {score.value} / {score.max_value}")
        lines.append("")
        if score.rationale:
            lines.append(score.rationale)
            lines.append("")

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines), encoding="utf-8")
def read_entity_evaluation(path: Path) -> EntityEvaluation:
    """Read a per-entity evaluation from a YAML frontmatter markdown file.

    The structured fields come from the frontmatter. For every score whose
    name also appears as a ``## `` section in the markdown body, the body text
    replaces the rationale taken from the frontmatter.

    Args:
        path: File to read (UTF-8).

    Returns:
        The reconstructed evaluation.

    Raises:
        ValueError: If the file lacks two frontmatter separators or the
            frontmatter does not parse to a mapping.
        KeyError: If a required frontmatter key is missing.
    """
    text = path.read_text(encoding="utf-8")
    parts = text.split(f"{_FRONTMATTER_SEP}\n", maxsplit=2)
    # parts: ["", frontmatter_text, body]
    if len(parts) < 3:
        raise ValueError(f"Invalid frontmatter in {path}")
    fm_text = parts[1]
    body = parts[2]

    fm = yaml.safe_load(fm_text)
    # Empty or scalar frontmatter would otherwise fail later with an opaque
    # TypeError on subscripting; report it as the format error it is.
    if not isinstance(fm, dict):
        raise ValueError(f"Invalid frontmatter in {path}")

    # Parse rationales from body
    rationales = _parse_rationales(body)

    # Reuse the model's own deserialiser instead of rebuilding the object by
    # hand; keys it does not read (such as overall_score) are ignored.
    evaluation = EntityEvaluation.from_dict(fm)
    for score in evaluation.scores:
        if score.name in rationales:
            score.rationale = rationales[score.name]
    return evaluation
def _parse_rationales(body: str) -> dict:
    """Extract rationale text per dimension from the markdown body.

    Each line starting with ``"## "`` opens a section. The dimension name is
    the heading text before the last em dash (U+2014), or the whole heading
    when there is no em dash. Every following line up to the next such heading
    belongs to that section.

    Args:
        body: Markdown text following the frontmatter.

    Returns:
        Mapping of dimension name to its stripped rationale text (``""`` for
        an empty section). Lines before the first ``## `` heading are ignored.
    """
    rationales: dict = {}
    current_name = None
    current_lines: List[str] = []

    for line in body.splitlines():
        if line.startswith("## "):
            # Save previous
            if current_name is not None:
                rationales[current_name] = "\n".join(current_lines).strip()
            # Parse "## dimension_name — 4.5 / 5.0"
            heading = line[3:].strip()
            # Split on the LAST em dash: the score part never contains one, so
            # a dimension name that itself contains a dash stays intact.
            name_part, dash, _score_part = heading.rpartition("\u2014")
            current_name = name_part.strip() if dash else heading
            current_lines = []
        elif current_name is not None:
            current_lines.append(line)

    if current_name is not None:
        rationales[current_name] = "\n".join(current_lines).strip()
    return rationales
def write_snapshot(snapshot: EvaluationSnapshot, path: Path) -> None:
    """Serialise *snapshot* to YAML at *path*, creating parent directories as needed.

    Keys keep the order produced by ``snapshot.to_dict()`` and the file is
    written as UTF-8 in block (non-flow) style.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialised = yaml.safe_dump(
        snapshot.to_dict(),
        default_flow_style=False,
        sort_keys=False,
    )
    path.write_text(serialised, encoding="utf-8")
def read_snapshot(path: Path) -> EvaluationSnapshot:
    """Load the UTF-8 YAML file at *path* and build an ``EvaluationSnapshot`` from it."""
    raw_text = path.read_text(encoding="utf-8")
    payload = yaml.safe_load(raw_text)
    return EvaluationSnapshot.from_dict(payload)
def append_to_history(snapshot: EvaluationSnapshot, history_path: Path) -> None:
    """Add *snapshot* to the YAML list stored at *history_path*.

    Parent directories and the file itself are created when missing. An
    existing file that parses to ``None`` is treated as an empty history. The
    whole list is rewritten on every call.
    """
    history_path.parent.mkdir(parents=True, exist_ok=True)

    entries: List[dict] = []
    if history_path.exists():
        previous = yaml.safe_load(history_path.read_text(encoding="utf-8"))
        if previous is not None:
            entries = previous

    entries.append(snapshot.to_dict())
    serialised = yaml.safe_dump(entries, default_flow_style=False, sort_keys=False)
    history_path.write_text(serialised, encoding="utf-8")
def read_history(history_path: Path) -> List[EvaluationSnapshot]:
    """Load every snapshot from the YAML history file at *history_path*.

    Returns an empty list when the file parses to ``None``; otherwise one
    ``EvaluationSnapshot`` per item, in file order.
    """
    payload = yaml.safe_load(history_path.read_text(encoding="utf-8"))
    snapshots: List[EvaluationSnapshot] = []
    if payload is not None:
        for item in payload:
            snapshots.append(EvaluationSnapshot.from_dict(item))
    return snapshots
def diff_snapshots(before: EvaluationSnapshot, after: EvaluationSnapshot) -> SnapshotDiff:
    """Compare two snapshots and report what differs.

    Reports entities present in only one snapshot, per-dimension score
    differences for entities present in both, and collection-metric
    differences. A score dimension or metric missing on one side is recorded
    with ``0.0`` for that side. All result lists are in sorted key order.
    """

    def _changed(old: dict, new: dict):
        """Yield ``(key, old_value, new_value)`` for each key whose values differ."""
        for key in sorted(set(old) | set(new)):
            old_value = old.get(key)
            new_value = new.get(key)
            if old_value != new_value:
                yield (
                    key,
                    0.0 if old_value is None else old_value,
                    0.0 if new_value is None else new_value,
                )

    # Score lookups: {slug: {dimension: value}}
    old_scores: dict = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in before.entity_evaluations
    }
    new_scores: dict = {
        ev.entity_slug: {s.name: s.value for s in ev.scores}
        for ev in after.entity_evaluations
    }
    old_slugs = set(old_scores)
    new_slugs = set(new_scores)

    score_changes: List[ScoreChange] = [
        ScoreChange(entity_slug=slug, dimension=dim, before=was, after=now)
        for slug in sorted(old_slugs & new_slugs)
        for dim, was, now in _changed(old_scores[slug], new_scores[slug])
    ]

    # Metric changes
    old_metrics = {m.name: m.value for m in before.collection_metrics}
    new_metrics = {m.name: m.value for m in after.collection_metrics}
    metric_changes: List[MetricChange] = [
        MetricChange(name=metric, before=was, after=now)
        for metric, was, now in _changed(old_metrics, new_metrics)
    ]

    return SnapshotDiff(
        before_id=before.snapshot_id,
        after_id=after.snapshot_id,
        added_entities=sorted(new_slugs - old_slugs),
        removed_entities=sorted(old_slugs - new_slugs),
        score_changes=score_changes,
        metric_changes=metric_changes,
    )