feat(infospace): add metrics history and viability tracking (S2.5)

History module with snapshot creation from check results, metrics file
I/O, auto-append to history after checks, date-based snapshot lookup,
and metric trend extraction. CLI commands: history, history-diff.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 02:01:00 +01:00
parent 11585e6968
commit ce7f78d57d
3 changed files with 569 additions and 0 deletions

View File

@@ -331,3 +331,91 @@ def check(config_path: Optional[str], concerns: tuple, as_json: bool):
click.echo("Metrics summary:")
for k, v in sorted(m.items()):
click.echo(f" {k}: {v:.4f}")
# Record to history
if m:
from markitect.infospace.history import record_check_results
snap = record_check_results(report, cfg, root, entity_count=len(entity_list))
if not as_json:
click.echo(f"\nRecorded snapshot {snap.snapshot_id}")
# ── history ─────────────────────────────────────────────────────────
@infospace_commands.command()
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
@click.option("--metric", default=None, help="Show trend for a specific metric.")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON.")
def history(config_path: Optional[str], metric: Optional[str], as_json: bool):
"""Show metrics history — snapshots over time."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import get_history, metric_trend
snapshots = get_history(cfg, root)
if not snapshots:
click.echo("No history found. Run 'markitect infospace check' first.")
return
if metric:
trend = metric_trend(snapshots, metric)
if not trend:
click.echo(f"No data for metric '{metric}'.")
return
if as_json:
import json
click.echo(json.dumps(trend, indent=2))
else:
click.echo(f"Trend: {metric}\n")
for entry in trend:
click.echo(f" {entry['date'][:19]} {entry['value']:.4f}")
return
if as_json:
import json
click.echo(json.dumps([s.to_dict() for s in snapshots], indent=2, default=str))
return
click.echo(f"History: {len(snapshots)} snapshot(s)\n")
click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}")
click.echo("-" * 42)
for i, snap in enumerate(snapshots, 1):
date_str = snap.created_at.isoformat()[:19]
n_metrics = len(snap.collection_metrics)
click.echo(f"{i:<4} {date_str:<20} {snap.entity_count:>8} {n_metrics:>8}")
@infospace_commands.command(name="history-diff")
@click.argument("date_a")
@click.argument("date_b")
@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.")
def history_diff(date_a: str, date_b: str, config_path: Optional[str]):
"""Compare two history snapshots by date (YYYY-MM-DD)."""
cfg, cfg_path = _load_config_or_exit(config_path)
root = cfg_path.parent
from markitect.infospace.history import find_snapshot_by_date, get_history
from markitect.infospace.evaluation_io import diff_snapshots
snapshots = get_history(cfg, root)
if len(snapshots) < 2:
click.echo("Need at least two snapshots to diff.")
return
snap_a = find_snapshot_by_date(snapshots, date_a)
snap_b = find_snapshot_by_date(snapshots, date_b)
if snap_a is None:
click.echo(f"No snapshot found near '{date_a}'.")
return
if snap_b is None:
click.echo(f"No snapshot found near '{date_b}'.")
return
if snap_a.snapshot_id == snap_b.snapshot_id:
click.echo("Both dates resolve to the same snapshot.")
return
diff = diff_snapshots(snap_a, snap_b)
click.echo(diff.summary())

View File

@@ -0,0 +1,223 @@
"""
Metrics history and viability tracking.
Converts check results into timestamped snapshots and maintains a
persistent history file for trend analysis.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from markitect.infospace.checks.orchestrator import CheckReport
from markitect.infospace.config import InfospaceConfig
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
from markitect.infospace.evaluation_io import (
append_to_history,
diff_snapshots,
read_history,
)
from markitect.infospace.state import ViabilityResult
# ── Snapshot creation ────────────────────────────────────────────────
def _concern_for_metric(name: str) -> str:
"""Map a metric name to its concern label."""
mapping = {
"redundancy_ratio": "C1",
"coverage_ratio": "C2",
"coherence_components": "C3",
"modularity": "C3",
"consistency_cycles": "C4",
"granularity_entropy": "C5",
}
return mapping.get(name, "")
def snapshot_from_checks(
check_report: CheckReport,
entity_count: int,
schema_name: str = "default",
metadata: Optional[Dict[str, Any]] = None,
) -> EvaluationSnapshot:
"""Create an :class:`EvaluationSnapshot` from collection check results.
Args:
check_report: Output from :func:`run_all_checks`.
entity_count: Number of entities checked.
schema_name: Schema identifier for the snapshot.
metadata: Optional extra metadata to attach.
Returns:
A snapshot containing the check metrics as collection_metrics.
"""
metrics_dict = check_report.metrics()
collection_metrics = [
MetricValue(
name=name,
value=value,
concern=_concern_for_metric(name),
)
for name, value in sorted(metrics_dict.items())
]
return EvaluationSnapshot(
snapshot_id=str(uuid.uuid4())[:8],
created_at=datetime.now(timezone.utc),
schema_name=schema_name,
entity_count=entity_count,
collection_metrics=collection_metrics,
metadata=metadata or {},
)
# ── Metrics file I/O ────────────────────────────────────────────────
def write_metrics_file(metrics: Dict[str, float], path: Path) -> None:
"""Write the latest metrics to a simple YAML file.
This file is used by ``markitect infospace viability`` for quick
threshold checking.
"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
yaml.safe_dump(
{k: round(v, 6) for k, v in sorted(metrics.items())},
default_flow_style=False,
sort_keys=True,
),
encoding="utf-8",
)
def read_metrics_file(path: Path) -> Dict[str, float]:
"""Read the latest metrics from a YAML file."""
if not path.is_file():
return {}
raw = yaml.safe_load(path.read_text(encoding="utf-8"))
if not isinstance(raw, dict):
return {}
return {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))}
# ── History operations ───────────────────────────────────────────────
def record_check_results(
check_report: CheckReport,
config: InfospaceConfig,
root: Path,
entity_count: int,
) -> EvaluationSnapshot:
"""Record check results: save metrics file and append to history.
Args:
check_report: Output from ``run_all_checks()``.
config: The infospace configuration.
root: Project root directory.
entity_count: Number of entities checked.
Returns:
The snapshot that was recorded.
"""
metrics_dir = root / config.metrics_dir
metrics = check_report.metrics()
# Save latest metrics
write_metrics_file(metrics, metrics_dir / "metrics.yaml")
# Create and append snapshot
snapshot = snapshot_from_checks(
check_report,
entity_count=entity_count,
metadata={"source": "collection-checks"},
)
append_to_history(snapshot, metrics_dir / "history.yaml")
return snapshot
def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]:
"""Read the full metrics history for an infospace."""
history_path = root / config.metrics_dir / "history.yaml"
if not history_path.is_file():
return []
return read_history(history_path)
def get_latest_snapshot(
config: InfospaceConfig, root: Path
) -> Optional[EvaluationSnapshot]:
"""Get the most recent snapshot from the history."""
history = get_history(config, root)
return history[-1] if history else None
def find_snapshot_by_date(
history: List[EvaluationSnapshot], date_str: str
) -> Optional[EvaluationSnapshot]:
"""Find the snapshot closest to a given date string.
Args:
history: List of snapshots in chronological order.
date_str: Date string in ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SS`` format.
Returns:
The snapshot closest to the given date, or ``None`` if history is empty.
"""
if not history:
return None
# Parse the target date
try:
if "T" in date_str:
target = datetime.fromisoformat(date_str)
else:
target = datetime.fromisoformat(date_str + "T00:00:00")
except ValueError:
return None
# Make timezone-aware if needed
if target.tzinfo is None:
target = target.replace(tzinfo=timezone.utc)
best = None
best_delta = None
for snap in history:
snap_dt = snap.created_at
if snap_dt.tzinfo is None:
snap_dt = snap_dt.replace(tzinfo=timezone.utc)
delta = abs((snap_dt - target).total_seconds())
if best_delta is None or delta < best_delta:
best = snap
best_delta = delta
return best
def metric_trend(
history: List[EvaluationSnapshot], metric_name: str
) -> List[Dict[str, Any]]:
"""Extract a single metric's values across the history.
Returns a list of ``{"date": iso_str, "value": float}`` entries
for each snapshot that contains the metric.
"""
trend: List[Dict[str, Any]] = []
for snap in history:
for m in snap.collection_metrics:
if m.name == metric_name:
trend.append({
"date": snap.created_at.isoformat(),
"value": m.value,
})
break
return trend

View File

@@ -0,0 +1,258 @@
"""
Tests for metrics history and viability tracking (S2.5).
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
import pytest
import yaml
from markitect.infospace.checks.orchestrator import CheckReport
from markitect.infospace.checks.granularity import GranularityReport
from markitect.infospace.checks.redundancy import RedundancyReport
from markitect.infospace.config import InfospaceConfig, TopicConfig, ViabilityThreshold
from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue
from markitect.infospace.history import (
find_snapshot_by_date,
get_history,
get_latest_snapshot,
metric_trend,
read_metrics_file,
record_check_results,
snapshot_from_checks,
write_metrics_file,
)
# ── helpers ──────────────────────────────────────────────────────────
def _check_report() -> CheckReport:
return CheckReport(
redundancy=RedundancyReport(redundancy_ratio=0.1, entity_count=10),
granularity=GranularityReport(domain_entropy=1.5, entity_count=10),
)
def _config(tmp_path: Path) -> InfospaceConfig:
return InfospaceConfig(
topic=TopicConfig(name="Test Topic", domain="Testing"),
metrics_dir=str(tmp_path / "metrics"),
)
def _snapshot(snap_id: str, date_str: str, metrics: dict) -> EvaluationSnapshot:
return EvaluationSnapshot(
snapshot_id=snap_id,
created_at=datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc),
schema_name="default",
entity_count=10,
collection_metrics=[
MetricValue(name=k, value=v) for k, v in metrics.items()
],
)
# ── snapshot_from_checks ────────────────────────────────────────────
class TestSnapshotFromChecks:
def test_creates_snapshot(self):
report = _check_report()
snap = snapshot_from_checks(report, entity_count=10)
assert snap.entity_count == 10
assert snap.snapshot_id # non-empty
assert snap.created_at is not None
def test_contains_metrics(self):
report = _check_report()
snap = snapshot_from_checks(report, entity_count=10)
metric_names = {m.name for m in snap.collection_metrics}
assert "redundancy_ratio" in metric_names
assert "granularity_entropy" in metric_names
def test_concern_labels(self):
report = _check_report()
snap = snapshot_from_checks(report, entity_count=10)
by_name = {m.name: m for m in snap.collection_metrics}
assert by_name["redundancy_ratio"].concern == "C1"
assert by_name["granularity_entropy"].concern == "C5"
def test_custom_schema(self):
report = _check_report()
snap = snapshot_from_checks(report, entity_count=5, schema_name="custom")
assert snap.schema_name == "custom"
def test_metadata(self):
report = _check_report()
snap = snapshot_from_checks(report, entity_count=5, metadata={"key": "val"})
assert snap.metadata == {"key": "val"}
def test_empty_report(self):
report = CheckReport()
snap = snapshot_from_checks(report, entity_count=0)
assert snap.collection_metrics == []
# ── write_metrics_file / read_metrics_file ──────────────────────────
class TestMetricsFileIO:
def test_round_trip(self, tmp_path):
path = tmp_path / "metrics.yaml"
metrics = {"redundancy_ratio": 0.05, "coverage_ratio": 0.85}
write_metrics_file(metrics, path)
loaded = read_metrics_file(path)
assert loaded["redundancy_ratio"] == pytest.approx(0.05)
assert loaded["coverage_ratio"] == pytest.approx(0.85)
def test_creates_parent_dirs(self, tmp_path):
path = tmp_path / "deep" / "nested" / "metrics.yaml"
write_metrics_file({"x": 1.0}, path)
assert path.is_file()
def test_read_missing_file(self, tmp_path):
path = tmp_path / "nonexistent.yaml"
assert read_metrics_file(path) == {}
def test_read_invalid_content(self, tmp_path):
path = tmp_path / "bad.yaml"
path.write_text("just a string", encoding="utf-8")
assert read_metrics_file(path) == {}
# ── record_check_results ────────────────────────────────────────────
class TestRecordCheckResults:
def test_creates_metrics_file(self, tmp_path):
cfg = _config(tmp_path)
report = _check_report()
record_check_results(report, cfg, tmp_path, entity_count=10)
metrics_path = tmp_path / cfg.metrics_dir / "metrics.yaml"
assert metrics_path.is_file()
def test_creates_history_file(self, tmp_path):
cfg = _config(tmp_path)
report = _check_report()
record_check_results(report, cfg, tmp_path, entity_count=10)
history_path = tmp_path / cfg.metrics_dir / "history.yaml"
assert history_path.is_file()
def test_appends_to_history(self, tmp_path):
cfg = _config(tmp_path)
report = _check_report()
record_check_results(report, cfg, tmp_path, entity_count=10)
record_check_results(report, cfg, tmp_path, entity_count=12)
history = get_history(cfg, tmp_path)
assert len(history) == 2
assert history[0].entity_count == 10
assert history[1].entity_count == 12
def test_returns_snapshot(self, tmp_path):
cfg = _config(tmp_path)
report = _check_report()
snap = record_check_results(report, cfg, tmp_path, entity_count=10)
assert snap.snapshot_id
assert snap.entity_count == 10
# ── get_history / get_latest_snapshot ────────────────────────────────
class TestGetHistory:
def test_empty_history(self, tmp_path):
cfg = _config(tmp_path)
assert get_history(cfg, tmp_path) == []
def test_get_latest(self, tmp_path):
cfg = _config(tmp_path)
report = _check_report()
record_check_results(report, cfg, tmp_path, entity_count=5)
record_check_results(report, cfg, tmp_path, entity_count=10)
latest = get_latest_snapshot(cfg, tmp_path)
assert latest is not None
assert latest.entity_count == 10
def test_latest_none_when_empty(self, tmp_path):
cfg = _config(tmp_path)
assert get_latest_snapshot(cfg, tmp_path) is None
# ── find_snapshot_by_date ────────────────────────────────────────────
class TestFindSnapshotByDate:
def test_finds_closest(self):
history = [
_snapshot("a", "2026-01-01T10:00:00", {"x": 1.0}),
_snapshot("b", "2026-02-15T10:00:00", {"x": 2.0}),
_snapshot("c", "2026-03-01T10:00:00", {"x": 3.0}),
]
result = find_snapshot_by_date(history, "2026-02-14")
assert result is not None
assert result.snapshot_id == "b"
def test_exact_match(self):
history = [
_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}),
_snapshot("b", "2026-02-01T00:00:00", {"x": 2.0}),
]
result = find_snapshot_by_date(history, "2026-02-01")
assert result is not None
assert result.snapshot_id == "b"
def test_empty_history(self):
assert find_snapshot_by_date([], "2026-01-01") is None
def test_invalid_date(self):
history = [_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0})]
assert find_snapshot_by_date(history, "not-a-date") is None
def test_with_timestamp(self):
history = [
_snapshot("a", "2026-01-01T10:00:00", {"x": 1.0}),
_snapshot("b", "2026-01-01T14:00:00", {"x": 2.0}),
]
result = find_snapshot_by_date(history, "2026-01-01T13:00:00")
assert result is not None
assert result.snapshot_id == "b"
# ── metric_trend ─────────────────────────────────────────────────────
class TestMetricTrend:
def test_extracts_trend(self):
history = [
_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0, "y": 2.0}),
_snapshot("b", "2026-02-01T00:00:00", {"x": 1.5, "y": 2.5}),
]
trend = metric_trend(history, "x")
assert len(trend) == 2
assert trend[0]["value"] == 1.0
assert trend[1]["value"] == 1.5
def test_missing_metric(self):
history = [
_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}),
]
assert metric_trend(history, "nonexistent") == []
def test_empty_history(self):
assert metric_trend([], "x") == []
def test_partial_presence(self):
history = [
_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}),
_snapshot("b", "2026-02-01T00:00:00", {"y": 2.0}), # x missing
_snapshot("c", "2026-03-01T00:00:00", {"x": 3.0}),
]
trend = metric_trend(history, "x")
assert len(trend) == 2
assert trend[0]["value"] == 1.0
assert trend[1]["value"] == 3.0