Files
markitect-main/markitect/infospace/history.py
tegwick 7f1eecbdb2 feat(infospace): add eval-summary command and improve evaluate pipeline (S3.3)
- Fix evaluate dimensions to match template file:
  definition_precision, source_grounding, domain_placement,
  vsm_relevance, explanatory_value (was domain_relevance,
  discipline_alignment, conceptual_clarity)
- Add VSM background context to evaluation prompt so LLM can
  score vsm_relevance without macro injection
- Fix model_name bug: was sending literal "default" to API (HTTP 400)
- Refactor run_entity_evaluation to write files incrementally via
  callback rather than all at once after the batch — long runs are
  now resumable if interrupted
- Add incremental skip in CLI: entities with existing eval files
  are skipped automatically on re-run (acts as resume)
- Add eval-summary command: reads all eval files, shows per-dimension
  means, optionally writes per_entity_mean to metrics.yaml
- Fix record_check_results to merge rather than overwrite metrics.yaml
  so per_entity_mean survives subsequent check runs
- Add per_entity_mean viability threshold (min: 3.5) to infospace.yaml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 01:26:45 +01:00

227 lines
6.9 KiB
Python

"""
Metrics history and viability tracking.
Converts check results into timestamped snapshots and maintains a
persistent history file for trend analysis.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from markitect.infospace.checks.orchestrator import CheckReport
from markitect.infospace.config import InfospaceConfig
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
from markitect.infospace.evaluation_io import (
append_to_history,
diff_snapshots,
read_history,
)
from markitect.infospace.state import ViabilityResult
# ── Snapshot creation ────────────────────────────────────────────────
def _concern_for_metric(name: str) -> str:
"""Map a metric name to its concern label."""
mapping = {
"redundancy_ratio": "C1",
"coverage_ratio": "C2",
"coherence_components": "C3",
"modularity": "C3",
"consistency_cycles": "C4",
"granularity_entropy": "C5",
}
return mapping.get(name, "")
def snapshot_from_checks(
check_report: CheckReport,
entity_count: int,
schema_name: str = "default",
metadata: Optional[Dict[str, Any]] = None,
) -> EvaluationSnapshot:
"""Create an :class:`EvaluationSnapshot` from collection check results.
Args:
check_report: Output from :func:`run_all_checks`.
entity_count: Number of entities checked.
schema_name: Schema identifier for the snapshot.
metadata: Optional extra metadata to attach.
Returns:
A snapshot containing the check metrics as collection_metrics.
"""
metrics_dict = check_report.metrics()
collection_metrics = [
MetricValue(
name=name,
value=value,
concern=_concern_for_metric(name),
)
for name, value in sorted(metrics_dict.items())
]
return EvaluationSnapshot(
snapshot_id=str(uuid.uuid4())[:8],
created_at=datetime.now(timezone.utc),
schema_name=schema_name,
entity_count=entity_count,
collection_metrics=collection_metrics,
metadata=metadata or {},
)
# ── Metrics file I/O ────────────────────────────────────────────────
def write_metrics_file(metrics: Dict[str, float], path: Path) -> None:
"""Write the latest metrics to a simple YAML file.
This file is used by ``markitect infospace viability`` for quick
threshold checking.
"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
yaml.safe_dump(
{k: round(v, 6) for k, v in sorted(metrics.items())},
default_flow_style=False,
sort_keys=True,
),
encoding="utf-8",
)
def read_metrics_file(path: Path) -> Dict[str, float]:
"""Read the latest metrics from a YAML file."""
if not path.is_file():
return {}
raw = yaml.safe_load(path.read_text(encoding="utf-8"))
if not isinstance(raw, dict):
return {}
return {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))}
# ── History operations ───────────────────────────────────────────────
def record_check_results(
check_report: CheckReport,
config: InfospaceConfig,
root: Path,
entity_count: int,
) -> EvaluationSnapshot:
"""Record check results: save metrics file and append to history.
Args:
check_report: Output from ``run_all_checks()``.
config: The infospace configuration.
root: Project root directory.
entity_count: Number of entities checked.
Returns:
The snapshot that was recorded.
"""
metrics_dir = root / config.metrics_dir
metrics = check_report.metrics()
# Save latest metrics — merge with existing so other metric sources
# (e.g. per-entity evaluation summary) are preserved across check runs.
existing = read_metrics_file(metrics_dir / "metrics.yaml")
merged = {**existing, **metrics} # check results overwrite on key conflict
write_metrics_file(merged, metrics_dir / "metrics.yaml")
# Create and append snapshot
snapshot = snapshot_from_checks(
check_report,
entity_count=entity_count,
metadata={"source": "collection-checks"},
)
append_to_history(snapshot, metrics_dir / "history.yaml")
return snapshot
def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]:
"""Read the full metrics history for an infospace."""
history_path = root / config.metrics_dir / "history.yaml"
if not history_path.is_file():
return []
return read_history(history_path)
def get_latest_snapshot(
config: InfospaceConfig, root: Path
) -> Optional[EvaluationSnapshot]:
"""Get the most recent snapshot from the history."""
history = get_history(config, root)
return history[-1] if history else None
def find_snapshot_by_date(
history: List[EvaluationSnapshot], date_str: str
) -> Optional[EvaluationSnapshot]:
"""Find the snapshot closest to a given date string.
Args:
history: List of snapshots in chronological order.
date_str: Date string in ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SS`` format.
Returns:
The snapshot closest to the given date, or ``None`` if history is empty.
"""
if not history:
return None
# Parse the target date
try:
if "T" in date_str:
target = datetime.fromisoformat(date_str)
else:
target = datetime.fromisoformat(date_str + "T00:00:00")
except ValueError:
return None
# Make timezone-aware if needed
if target.tzinfo is None:
target = target.replace(tzinfo=timezone.utc)
best = None
best_delta = None
for snap in history:
snap_dt = snap.created_at
if snap_dt.tzinfo is None:
snap_dt = snap_dt.replace(tzinfo=timezone.utc)
delta = abs((snap_dt - target).total_seconds())
if best_delta is None or delta < best_delta:
best = snap
best_delta = delta
return best
def metric_trend(
history: List[EvaluationSnapshot], metric_name: str
) -> List[Dict[str, Any]]:
"""Extract a single metric's values across the history.
Returns a list of ``{"date": iso_str, "value": float}`` entries
for each snapshot that contains the metric.
"""
trend: List[Dict[str, Any]] = []
for snap in history:
for m in snap.collection_metrics:
if m.name == metric_name:
trend.append({
"date": snap.created_at.isoformat(),
"value": m.value,
})
break
return trend