Files
markitect-main/markitect/infospace/history.py
tegwick d1f57272a4 feat(example): add L2 classifications for 823/988 WoN entities (S3.4)
Batch classification via OpenRouter (claude-sonnet-4). 165 entities
remain unclassified due to credit exhaustion; incremental skip means
a follow-up run will complete them automatically.

Type × VSM matrix (823 entities):
                  S1   S2   S3  S3*   S4   S5
  Element         86   75   58   21   43   32  (315 total, 38%)
  Process         39   42   37   17   67   24  (226 total, 28%)
  Institution      4   12   30   24    .   52  (122 total, 15%)
  Principle        3    7   15    2   43   32  (102 total, 12%)
  Relation         2   14    5    5   22   10   (58 total,  7%)
  Matrix fill: 29/30 cells (Institution/S4 empty — expected)

Metrics updated: type_entropy=2.0936, vsm_type_matrix_cells=29

Also:
- BatchEvaluator gains delay_seconds param for rate-limited providers
- classify CLI gains --rpm option (--rpm 10 for Gemini free tier)
- history.write_metrics_file now handles non-float metric values
  (type_distribution is a dict, was crashing round())
- run_entity_classification forwards delay_seconds to BatchEvaluator
- classify-links and graph commands added by user (entities --by-type,
  graph --format mermaid/dot, classify-links for Relation enrichment)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 12:49:11 +01:00

228 lines
7.0 KiB
Python

"""
Metrics history and viability tracking.
Converts check results into timestamped snapshots and maintains a
persistent history file for trend analysis.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from markitect.infospace.checks.orchestrator import CheckReport
from markitect.infospace.config import InfospaceConfig
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
from markitect.infospace.evaluation_io import (
append_to_history,
diff_snapshots,
read_history,
)
from markitect.infospace.state import ViabilityResult
# ── Snapshot creation ────────────────────────────────────────────────
def _concern_for_metric(name: str) -> str:
"""Map a metric name to its concern label."""
mapping = {
"redundancy_ratio": "C1",
"coverage_ratio": "C2",
"coherence_components": "C3",
"modularity": "C3",
"consistency_cycles": "C4",
"granularity_entropy": "C5",
}
return mapping.get(name, "")
def snapshot_from_checks(
    check_report: CheckReport,
    entity_count: int,
    schema_name: str = "default",
    metadata: Optional[Dict[str, Any]] = None,
) -> EvaluationSnapshot:
    """Build an :class:`EvaluationSnapshot` out of collection check results.

    Every entry of ``check_report.metrics()`` becomes one
    :class:`MetricValue`, ordered by metric name and tagged with the concern
    label returned by :func:`_concern_for_metric`.

    Args:
        check_report: Output from :func:`run_all_checks`.
        entity_count: Number of entities checked.
        schema_name: Schema identifier for the snapshot.
        metadata: Optional extra metadata to attach; a falsy value is
            replaced by an empty dict.

    Returns:
        A snapshot whose ``collection_metrics`` hold the check metrics. Its
        id is the first eight characters of a random UUID4 and its creation
        time is the current UTC time.
    """
    reported = check_report.metrics()

    gathered = []
    for metric_name, metric_value in sorted(reported.items()):
        gathered.append(
            MetricValue(
                name=metric_name,
                value=metric_value,
                concern=_concern_for_metric(metric_name),
            )
        )

    short_id = str(uuid.uuid4())[:8]
    now_utc = datetime.now(timezone.utc)
    extra = metadata if metadata else {}

    return EvaluationSnapshot(
        snapshot_id=short_id,
        created_at=now_utc,
        schema_name=schema_name,
        entity_count=entity_count,
        collection_metrics=gathered,
        metadata=extra,
    )
# ── Metrics file I/O ────────────────────────────────────────────────
def write_metrics_file(metrics: Dict[str, float], path: Path) -> None:
    """Dump the latest metrics to a simple YAML file.

    This file is used by ``markitect infospace viability`` for quick
    threshold checking.

    Args:
        metrics: Mapping of metric name to value. Float values are rounded
            to six decimal places; values of any other type are written
            unchanged.
        path: Destination file. Missing parent directories are created.
    """
    # Create the target directory before serialising, as the file may be the
    # first thing ever written into the metrics directory.
    path.parent.mkdir(parents=True, exist_ok=True)

    payload = {}
    for key, value in sorted(metrics.items()):
        # Only true floats are rounded; ints, dicts, etc. pass through.
        payload[key] = round(value, 6) if isinstance(value, float) else value

    document = yaml.safe_dump(
        payload,
        default_flow_style=False,
        sort_keys=True,
    )
    path.write_text(document, encoding="utf-8")
def read_metrics_file(path: Path) -> Dict[str, float]:
    """Load the numeric metrics stored in a YAML metrics file.

    Args:
        path: Location of the metrics file.

    Returns:
        A mapping of metric name to ``float``. Entries whose value is not an
        ``int`` or ``float`` are left out. An empty dict is returned when the
        file does not exist or its top-level content is not a mapping.
    """
    if not path.is_file():
        return {}

    loaded = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(loaded, dict):
        return {}

    numeric = {}
    for key, value in loaded.items():
        if isinstance(value, (int, float)):
            numeric[key] = float(value)
    return numeric
# ── History operations ───────────────────────────────────────────────
def record_check_results(
    check_report: CheckReport,
    config: InfospaceConfig,
    root: Path,
    entity_count: int,
) -> EvaluationSnapshot:
    """Record check results: save the metrics file and append to history.

    The metrics from ``check_report`` are merged into the existing
    ``metrics.yaml`` (check results win on key conflicts) and a new snapshot
    is appended to ``history.yaml``. Both files live in
    ``root / config.metrics_dir``.

    Args:
        check_report: Output from ``run_all_checks()``.
        config: The infospace configuration.
        root: Project root directory.
        entity_count: Number of entities checked.

    Returns:
        The snapshot that was recorded.
    """
    metrics_dir = root / config.metrics_dir
    metrics_path = metrics_dir / "metrics.yaml"
    metrics = check_report.metrics()

    # Save latest metrics — merge with existing so other metric sources
    # (e.g. per-entity evaluation summary) are preserved across check runs.
    # The raw YAML mapping is read here instead of going through
    # read_metrics_file(): that helper keeps only int/float values, so
    # structured metrics written by write_metrics_file() (for example a
    # dict-valued distribution) would be dropped from the file on every
    # check run.
    existing: Dict[str, Any] = {}
    if metrics_path.is_file():
        raw = yaml.safe_load(metrics_path.read_text(encoding="utf-8"))
        if isinstance(raw, dict):
            # Keep string keys only: write_metrics_file() sorts the keys, and
            # mixing key types would make that sort fail.
            existing = {k: v for k, v in raw.items() if isinstance(k, str)}

    merged = {**existing, **metrics}  # check results overwrite on key conflict
    write_metrics_file(merged, metrics_path)

    # Create and append snapshot
    snapshot = snapshot_from_checks(
        check_report,
        entity_count=entity_count,
        metadata={"source": "collection-checks"},
    )
    append_to_history(snapshot, metrics_dir / "history.yaml")
    return snapshot
def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]:
    """Load every recorded snapshot of an infospace.

    Args:
        config: The infospace configuration; supplies ``metrics_dir``.
        root: Project root directory.

    Returns:
        The snapshots read from ``history.yaml`` in the metrics directory,
        or an empty list when that file does not exist.
    """
    history_file = root / config.metrics_dir / "history.yaml"
    return read_history(history_file) if history_file.is_file() else []
def get_latest_snapshot(
    config: InfospaceConfig, root: Path
) -> Optional[EvaluationSnapshot]:
    """Return the last snapshot in the history, if there is one.

    Args:
        config: The infospace configuration.
        root: Project root directory.

    Returns:
        The final entry of the history returned by :func:`get_history`, or
        ``None`` when the history is empty.
    """
    snapshots = get_history(config, root)
    if not snapshots:
        return None
    return snapshots[-1]
def find_snapshot_by_date(
    history: List[EvaluationSnapshot], date_str: str
) -> Optional[EvaluationSnapshot]:
    """Pick the snapshot whose creation time lies nearest to a given date.

    Naive datetimes, both the parsed target and snapshot timestamps, are
    interpreted as UTC before distances are compared. When two snapshots are
    equally close, the one that comes first in ``history`` is returned.

    Args:
        history: List of snapshots in chronological order.
        date_str: Date string in ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SS``
            format. A string without a ``T`` is treated as midnight.

    Returns:
        The snapshot closest to the given date, or ``None`` if the history
        is empty or ``date_str`` cannot be parsed.
    """
    if not history:
        return None

    # A bare date gets a midnight time component before parsing.
    iso_text = date_str if "T" in date_str else date_str + "T00:00:00"
    try:
        target = datetime.fromisoformat(iso_text)
    except ValueError:
        return None

    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)

    def _seconds_from_target(snap) -> float:
        """Absolute distance, in seconds, between a snapshot and the target."""
        stamp = snap.created_at
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone.utc)
        return abs((stamp - target).total_seconds())

    # min() yields the first item among equally distant snapshots.
    return min(history, key=_seconds_from_target)
def metric_trend(
    history: List[EvaluationSnapshot], metric_name: str
) -> List[Dict[str, Any]]:
    """Collect the values of one metric over the whole history.

    Args:
        history: Snapshots to scan, in the order they should be reported.
        metric_name: Name of the collection metric to extract.

    Returns:
        One ``{"date": iso_str, "value": ...}`` entry per snapshot that
        contains the metric, in history order. Snapshots without the metric
        are skipped; if a snapshot lists the metric more than once, only its
        first occurrence is used.
    """
    not_found = object()  # sentinel: distinguishes "no match" from any metric
    points: List[Dict[str, Any]] = []
    for snapshot in history:
        hit = next(
            (m for m in snapshot.collection_metrics if m.name == metric_name),
            not_found,
        )
        if hit is not_found:
            continue
        points.append({
            "date": snapshot.created_at.isoformat(),
            "value": hit.value,
        })
    return points