Files
tegwick c0615c2d50
Some checks failed
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat(infospace,llm): stabilize free-tier eval workflow
Five improvements that eliminate most of the agent-in-the-loop friction
observed while closing out the 988-entity WoN evaluation (C.1):

1. Gemini adapter now retries on 429 + 5xx with exponential backoff
   (same pattern already used by OpenRouter/OpenAI). Removes the need
   for shell-level retry wrappers when hitting free-tier rate limits.

2. evaluate CLI prints the underlying error ("ERROR — HTTP 503 …")
   instead of a bare "ERROR", so agents don't have to drop into Python
   to diagnose transient failures.

3. --entity/--chapter now respect existing evaluation files by default
   (previously only the full-collection pass did). New --force flag
   opts into re-evaluation. Stops silently burning free-tier quota on
   re-runs of the same slug.

4. --entity accepts hyphenated slugs (matching entity filenames) and
   normalizes them to the underscore form used on disk. On a miss the
   CLI suggests near matches instead of a bare "not found".

5. eval-summary --update-metrics is no longer destructive:
   read_metrics_file/write_metrics_file preserve structured values
   (type_distribution) and don't flatten ints to floats. Fixes a
   silent data loss observed on every run.

Bonus: the evaluator field in written evaluation frontmatter now
falls back from run_config.model_name to the adapter's resolved model
(or the model echoed back in the API response), so rows no longer
show `evaluator: null` when --model is omitted.

Tests: new tests/unit/llm/test_gemini.py covers retry behavior;
tests/unit/infospace/test_history.py gains a round-trip test that
pins the type_distribution / int-preservation invariants.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 00:51:00 +02:00

243 lines
7.5 KiB
Python

"""
Metrics history and viability tracking.
Converts check results into timestamped snapshots and maintains a
persistent history file for trend analysis.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from markitect.infospace.checks.orchestrator import CheckReport
from markitect.infospace.config import InfospaceConfig
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
from markitect.infospace.evaluation_io import (
append_to_history,
diff_snapshots,
read_history,
)
from markitect.infospace.state import ViabilityResult
# ── Snapshot creation ────────────────────────────────────────────────
def _concern_for_metric(name: str) -> str:
    """Return the concern label associated with a metric name.

    Known metric names resolve to one of the labels ``C1`` … ``C5``;
    any other name yields an empty string.
    """
    concern_by_metric = dict(
        redundancy_ratio="C1",
        coverage_ratio="C2",
        coherence_components="C3",
        modularity="C3",
        consistency_cycles="C4",
        granularity_entropy="C5",
    )
    try:
        return concern_by_metric[name]
    except KeyError:
        # Unknown metrics carry no concern label.
        return ""
def snapshot_from_checks(
    check_report: CheckReport,
    entity_count: int,
    schema_name: str = "default",
    metadata: Optional[Dict[str, Any]] = None,
) -> EvaluationSnapshot:
    """Build an :class:`EvaluationSnapshot` out of collection check results.

    Args:
        check_report: Report whose ``metrics()`` mapping supplies the
            metric names and values.
        entity_count: Number of entities checked.
        schema_name: Schema identifier stored on the snapshot.
        metadata: Optional extra metadata; an empty dict is used when
            nothing (or an empty mapping) is given.

    Returns:
        A snapshot whose ``collection_metrics`` hold one
        :class:`MetricValue` per reported metric, ordered by metric name.
    """
    collection_metrics = []
    for metric_name, metric_value in sorted(check_report.metrics().items()):
        collection_metrics.append(
            MetricValue(
                name=metric_name,
                value=metric_value,
                concern=_concern_for_metric(metric_name),
            )
        )

    # Short identifier: the first 8 characters of a random UUID4 string.
    short_id = str(uuid.uuid4())[:8]
    timestamp = datetime.now(timezone.utc)
    extra = metadata if metadata else {}

    return EvaluationSnapshot(
        snapshot_id=short_id,
        created_at=timestamp,
        schema_name=schema_name,
        entity_count=entity_count,
        collection_metrics=collection_metrics,
        metadata=extra,
    )
# ── Metrics file I/O ────────────────────────────────────────────────
def write_metrics_file(metrics: Dict[str, Any], path: Path) -> None:
    """Persist the latest metrics as a simple YAML mapping.

    This file is used by ``markitect infospace viability`` for quick
    threshold checking.

    Only top-level ``float`` values are touched: they are rounded to six
    decimal places. Everything else (ints, bools, strings, structured
    values such as ``type_distribution``) is written unchanged, so an
    integer ``29`` does not turn into ``29.0`` on a round-trip.

    Args:
        metrics: Mapping of metric name to value.
        path: Destination file; missing parent directories are created.
    """
    # Create the target directory before serialising, as the write needs it.
    path.parent.mkdir(parents=True, exist_ok=True)

    payload = {}
    for key, value in sorted(metrics.items()):
        # bool and int are not float instances, so they pass through as-is.
        payload[key] = round(value, 6) if isinstance(value, float) else value

    document = yaml.safe_dump(
        payload,
        default_flow_style=False,
        sort_keys=True,
    )
    path.write_text(document, encoding="utf-8")
def read_metrics_file(path: Path) -> Dict[str, Any]:
    """Load the latest metrics mapping from a YAML file.

    The parsed mapping is returned exactly as YAML produced it, so
    structured values (e.g. ``type_distribution``) and integer values
    survive a round-trip through :func:`write_metrics_file`.

    Args:
        path: Location of the metrics file.

    Returns:
        The parsed mapping, or an empty dict when *path* is not a regular
        file or its top-level YAML value is not a mapping.
    """
    if path.is_file():
        loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
        if isinstance(loaded, dict):
            return loaded
    # Missing file, empty document, or a non-mapping top-level value.
    return {}
# ── History operations ───────────────────────────────────────────────
def record_check_results(
    check_report: CheckReport,
    config: InfospaceConfig,
    root: Path,
    entity_count: int,
) -> EvaluationSnapshot:
    """Persist check results to the metrics file and the history file.

    Args:
        check_report: Output from ``run_all_checks()``.
        config: The infospace configuration; its ``metrics_dir`` is
            resolved relative to *root*.
        root: Project root directory.
        entity_count: Number of entities checked.

    Returns:
        The snapshot that was appended to the history.
    """
    metrics_dir = root / config.metrics_dir
    latest_path = metrics_dir / "metrics.yaml"
    fresh = check_report.metrics()

    # Start from what is already on disk so values written by other metric
    # sources (e.g. per-entity evaluation summary) survive a check run;
    # fresh check results win whenever a key appears in both.
    combined = dict(read_metrics_file(latest_path))
    combined.update(fresh)
    write_metrics_file(combined, latest_path)

    # Build the snapshot and add it to the persistent history.
    recorded = snapshot_from_checks(
        check_report,
        entity_count=entity_count,
        metadata={"source": "collection-checks"},
    )
    append_to_history(recorded, metrics_dir / "history.yaml")
    return recorded
def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]:
    """Return every snapshot stored in the infospace's ``history.yaml``.

    An empty list is returned when the history file does not exist.
    """
    history_file = root / config.metrics_dir / "history.yaml"
    return read_history(history_file) if history_file.is_file() else []
def get_latest_snapshot(
    config: InfospaceConfig, root: Path
) -> Optional[EvaluationSnapshot]:
    """Return the last snapshot in the history, or ``None`` if there is none."""
    snapshots = get_history(config, root)
    if not snapshots:
        return None
    return snapshots[-1]
def find_snapshot_by_date(
    history: List[EvaluationSnapshot], date_str: str
) -> Optional[EvaluationSnapshot]:
    """Find the snapshot closest to a given date string.

    Args:
        history: List of snapshots in chronological order.
        date_str: Any string accepted by :meth:`datetime.fromisoformat`,
            e.g. ``YYYY-MM-DD`` (interpreted as midnight),
            ``YYYY-MM-DDTHH:MM:SS`` or ``YYYY-MM-DD HH:MM:SS``.

    Returns:
        The snapshot whose ``created_at`` is nearest to the given date, or
        ``None`` if the history is empty or *date_str* cannot be parsed.
        When two snapshots are equally close, the one appearing first in
        *history* is returned.
    """
    if not history:
        return None

    # fromisoformat() parses date-only strings as midnight on its own, so
    # no "T00:00:00" suffix is needed; appending one used to make valid
    # space-separated timestamps unparseable.
    try:
        target = datetime.fromisoformat(date_str)
    except ValueError:
        return None

    # Naive datetimes on either side are treated as UTC so that aware and
    # naive values can be subtracted from each other.
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)

    def _seconds_from_target(snap: Any) -> float:
        snap_dt = snap.created_at
        if snap_dt.tzinfo is None:
            snap_dt = snap_dt.replace(tzinfo=timezone.utc)
        return abs((snap_dt - target).total_seconds())

    # min() keeps the first of several equally-close snapshots.
    return min(history, key=_seconds_from_target)
def metric_trend(
    history: List[EvaluationSnapshot], metric_name: str
) -> List[Dict[str, Any]]:
    """Collect one metric's value from every snapshot that reports it.

    Returns a list of ``{"date": iso_str, "value": value}`` entries in
    history order. Snapshots lacking the metric are skipped; if a
    snapshot lists the metric more than once, only its first occurrence
    is used.
    """
    points: List[Dict[str, Any]] = []
    for snapshot in history:
        hit = next(
            (
                metric
                for metric in snapshot.collection_metrics
                if metric.name == metric_name
            ),
            None,
        )
        if hit is None:
            continue
        points.append(
            {
                "date": snapshot.created_at.isoformat(),
                "value": hit.value,
            }
        )
    return points