Some checks failed
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Five improvements that eliminate most of the agent-in-the-loop friction
observed while closing out the 988-entity WoN evaluation (C.1):
1. Gemini adapter now retries on 429 + 5xx with exponential backoff
(same pattern already used by OpenRouter/OpenAI). Removes the need
for shell-level retry wrappers when hitting free-tier rate limits.
2. evaluate CLI prints the underlying error ("ERROR — HTTP 503 …")
instead of a bare "ERROR", so agents don't have to drop into Python
to diagnose transient failures.
3. --entity/--chapter now respect existing evaluation files by default
(previously only the full-collection pass did). New --force flag
opts into re-evaluation. Stops silently burning free-tier quota on
re-runs of the same slug.
4. --entity accepts hyphenated slugs (matching entity filenames) and
normalizes them to the underscore form used on disk. On a miss the
CLI suggests near matches instead of a bare "not found".
5. eval-summary --update-metrics is no longer destructive:
read_metrics_file/write_metrics_file preserve structured values
(type_distribution) and don't flatten ints to floats. Fixes a
silent data loss observed on every run.
Bonus: the evaluator field in written evaluation frontmatter now
falls back from run_config.model_name to the adapter's resolved model
(or the model echoed back in the API response), so rows no longer
show `evaluator: null` when --model is omitted.
Tests: new tests/unit/llm/test_gemini.py covers retry behavior;
tests/unit/infospace/test_history.py gains a round-trip test that
pins the type_distribution / int-preservation invariants.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
243 lines
7.5 KiB
Python
243 lines
7.5 KiB
Python
"""
Metrics history and viability tracking.

Converts check results into timestamped snapshots and maintains a
persistent history file for trend analysis.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import yaml
|
|
|
|
from markitect.infospace.checks.orchestrator import CheckReport
|
|
from markitect.infospace.config import InfospaceConfig
|
|
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
|
|
from markitect.infospace.evaluation_io import (
|
|
append_to_history,
|
|
diff_snapshots,
|
|
read_history,
|
|
)
|
|
from markitect.infospace.state import ViabilityResult
|
|
|
|
|
|
# ── Snapshot creation ────────────────────────────────────────────────
|
|
|
|
|
|
def _concern_for_metric(name: str) -> str:
    """Return the concern label for a metric name.

    Metric names without a known concern yield the empty string.
    """
    # Two metrics (coherence_components, modularity) share the C3 label.
    concern_by_metric = dict(
        redundancy_ratio="C1",
        coverage_ratio="C2",
        coherence_components="C3",
        modularity="C3",
        consistency_cycles="C4",
        granularity_entropy="C5",
    )
    try:
        return concern_by_metric[name]
    except KeyError:
        return ""
|
|
|
|
|
|
def snapshot_from_checks(
    check_report: CheckReport,
    entity_count: int,
    schema_name: str = "default",
    metadata: Optional[Dict[str, Any]] = None,
) -> EvaluationSnapshot:
    """Build an :class:`EvaluationSnapshot` from a collection check report.

    Args:
        check_report: Report whose ``metrics()`` mapping supplies the values.
        entity_count: Number of entities checked.
        schema_name: Schema identifier stored on the snapshot.
        metadata: Optional extra metadata; a falsy value becomes ``{}``.

    Returns:
        A snapshot with a random 8-character id, a UTC creation timestamp
        and one ``MetricValue`` per reported metric, ordered by metric name.
    """
    reported = check_report.metrics()

    # Metric names are unique dict keys, so ordering by name alone matches
    # ordering the (name, value) pairs.
    collection_metrics = []
    for metric_name in sorted(reported):
        collection_metrics.append(
            MetricValue(
                name=metric_name,
                value=reported[metric_name],
                concern=_concern_for_metric(metric_name),
            )
        )

    # The first 8 hex digits of a UUID4 are the same in ``str()`` and ``.hex``.
    short_id = uuid.uuid4().hex[:8]
    now_utc = datetime.now(timezone.utc)
    extra = metadata if metadata else {}

    return EvaluationSnapshot(
        snapshot_id=short_id,
        created_at=now_utc,
        schema_name=schema_name,
        entity_count=entity_count,
        collection_metrics=collection_metrics,
        metadata=extra,
    )
|
|
|
|
|
|
# ── Metrics file I/O ────────────────────────────────────────────────
|
|
|
|
|
|
def write_metrics_file(metrics: Dict[str, Any], path: Path) -> None:
    """Dump the latest metrics to *path* as block-style YAML.

    The file is used by ``markitect infospace viability`` for quick
    threshold checking. Top-level ``float`` values are rounded to 6
    decimal places; every other value (ints, bools, structured values
    such as ``type_distribution``) is written unchanged, so ``29`` does
    not turn into ``29.0`` on a round-trip.

    Args:
        metrics: Metric name to value mapping.
        path: Destination file; missing parent directories are created.
    """
    # Create the directory first, before any serialization is attempted.
    path.parent.mkdir(parents=True, exist_ok=True)

    serializable = {}
    for key, value in sorted(metrics.items()):
        # bool is a subclass of int, never of float, so only real floats
        # are rounded here.
        serializable[key] = round(value, 6) if isinstance(value, float) else value

    document = yaml.safe_dump(
        serializable,
        default_flow_style=False,
        sort_keys=True,
    )
    path.write_text(document, encoding="utf-8")
|
|
|
|
|
|
def read_metrics_file(path: Path) -> Dict[str, Any]:
    """Load the latest metrics mapping from a YAML file.

    Values are returned exactly as the YAML loader produced them, so
    structured values (e.g. ``type_distribution``) and ints survive a
    round-trip through :func:`write_metrics_file`.

    Returns:
        The parsed mapping, or ``{}`` when *path* is not a regular file or
        its top-level YAML value is not a mapping (including an empty file).
    """
    if path.is_file():
        loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
        if isinstance(loaded, dict):
            return loaded
    return {}
|
|
|
|
|
|
# ── History operations ───────────────────────────────────────────────
|
|
|
|
|
|
def record_check_results(
    check_report: CheckReport,
    config: InfospaceConfig,
    root: Path,
    entity_count: int,
) -> EvaluationSnapshot:
    """Persist check results to the metrics file and the history file.

    Args:
        check_report: Output from ``run_all_checks()``.
        config: The infospace configuration; ``metrics_dir`` is read from it.
        root: Project root directory.
        entity_count: Number of entities checked.

    Returns:
        The snapshot that was appended to ``history.yaml``.
    """
    metrics_dir = root / config.metrics_dir
    metrics_path = metrics_dir / "metrics.yaml"
    fresh = check_report.metrics()

    # Start from what is already on disk so that metrics written by other
    # sources (e.g. the per-entity evaluation summary) survive this run;
    # fresh check results win on a key conflict.
    combined = dict(read_metrics_file(metrics_path))
    combined.update(fresh)
    write_metrics_file(combined, metrics_path)

    # Snapshot the same report and append it to the history.
    snapshot = snapshot_from_checks(
        check_report,
        entity_count=entity_count,
        metadata={"source": "collection-checks"},
    )
    append_to_history(snapshot, metrics_dir / "history.yaml")
    return snapshot
|
|
|
|
|
|
def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]:
    """Return every recorded snapshot for an infospace.

    The history is read from ``history.yaml`` inside ``config.metrics_dir``
    under *root*; an empty list is returned when that file does not exist.
    """
    history_file = root / config.metrics_dir / "history.yaml"
    return read_history(history_file) if history_file.is_file() else []
|
|
|
|
|
|
def get_latest_snapshot(
    config: InfospaceConfig, root: Path
) -> Optional[EvaluationSnapshot]:
    """Return the last snapshot in the history, or ``None`` if there is none."""
    snapshots = get_history(config, root)
    if not snapshots:
        return None
    return snapshots[-1]
|
|
|
|
|
|
def find_snapshot_by_date(
    history: List[EvaluationSnapshot], date_str: str
) -> Optional[EvaluationSnapshot]:
    """Find the snapshot closest to a given date string.

    Args:
        history: List of snapshots in chronological order.
        date_str: ISO-8601 date or datetime, e.g. ``YYYY-MM-DD``,
            ``YYYY-MM-DDTHH:MM:SS`` or ``YYYY-MM-DD HH:MM:SS``. A date-only
            value means midnight; a value without a UTC offset is treated
            as UTC. Surrounding whitespace is ignored.

    Returns:
        The snapshot whose ``created_at`` is nearest to the target (the
        earliest one in *history* on a tie), or ``None`` if *history* is
        empty or *date_str* cannot be parsed.
    """
    if not history:
        return None

    # ``fromisoformat`` already maps a date-only string to midnight, so no
    # "T00:00:00" suffix is needed. Appending one based on a literal "T"
    # check used to break space-separated datetimes.
    try:
        target = datetime.fromisoformat(date_str.strip())
    except ValueError:
        return None

    # Naive and aware datetimes cannot be subtracted; treat naive as UTC.
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)

    def _seconds_from_target(snap: Any) -> float:
        moment = snap.created_at
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=timezone.utc)
        return abs((moment - target).total_seconds())

    # ``min`` returns the first minimal element, so ties keep the earliest
    # entry in the list.
    return min(history, key=_seconds_from_target)
|
|
|
|
|
|
def metric_trend(
    history: List[EvaluationSnapshot], metric_name: str
) -> List[Dict[str, Any]]:
    """Collect one metric's value from every snapshot that reports it.

    Returns a list of ``{"date": iso_str, "value": value}`` entries in
    history order. Snapshots without the metric are skipped; if a snapshot
    lists the metric more than once, only the first occurrence is used.
    """
    points: List[Dict[str, Any]] = []
    for snapshot in history:
        first_match = next(
            (m for m in snapshot.collection_metrics if m.name == metric_name),
            None,
        )
        if first_match is None:
            continue
        points.append(
            {
                "date": snapshot.created_at.isoformat(),
                "value": first_match.value,
            }
        )
    return points
|