diff --git a/markitect/infospace/cli.py b/markitect/infospace/cli.py index 2af1d2dd..8f71d8c0 100644 --- a/markitect/infospace/cli.py +++ b/markitect/infospace/cli.py @@ -331,3 +331,91 @@ def check(config_path: Optional[str], concerns: tuple, as_json: bool): click.echo("Metrics summary:") for k, v in sorted(m.items()): click.echo(f" {k}: {v:.4f}") + + # Record to history + if m: + from markitect.infospace.history import record_check_results + snap = record_check_results(report, cfg, root, entity_count=len(entity_list)) + if not as_json: + click.echo(f"\nRecorded snapshot {snap.snapshot_id}") + + +# ── history ───────────────────────────────────────────────────────── + + +@infospace_commands.command() +@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") +@click.option("--metric", default=None, help="Show trend for a specific metric.") +@click.option("--json", "as_json", is_flag=True, help="Output as JSON.") +def history(config_path: Optional[str], metric: Optional[str], as_json: bool): + """Show metrics history — snapshots over time.""" + cfg, cfg_path = _load_config_or_exit(config_path) + root = cfg_path.parent + + from markitect.infospace.history import get_history, metric_trend + + snapshots = get_history(cfg, root) + if not snapshots: + click.echo("No history found. Run 'markitect infospace check' first.") + return + + if metric: + trend = metric_trend(snapshots, metric) + if not trend: + click.echo(f"No data for metric '{metric}'.") + return + if as_json: + import json + click.echo(json.dumps(trend, indent=2)) + else: + click.echo(f"Trend: {metric}\n") + for entry in trend: + click.echo(f" {entry['date'][:19]} {entry['value']:.4f}") + return + + if as_json: + import json + click.echo(json.dumps([s.to_dict() for s in snapshots], indent=2, default=str)) + return + + click.echo(f"History: {len(snapshots)} snapshot(s)\n") + click.echo(f"{'#':<4} {'Date':<20} {'Entities':>8} {'Metrics':>8}") + click.echo("-" * 42) + for i, snap in enumerate(snapshots, 1): + date_str = snap.created_at.isoformat()[:19] + n_metrics = len(snap.collection_metrics) + click.echo(f"{i:<4} {date_str:<20} {snap.entity_count:>8} {n_metrics:>8}") + + +@infospace_commands.command(name="history-diff") +@click.argument("date_a") +@click.argument("date_b") +@click.option("--config", "config_path", default=None, help="Path to infospace.yaml.") +def history_diff(date_a: str, date_b: str, config_path: Optional[str]): + """Compare two history snapshots by date (YYYY-MM-DD).""" + cfg, cfg_path = _load_config_or_exit(config_path) + root = cfg_path.parent + + from markitect.infospace.history import find_snapshot_by_date, get_history + from markitect.infospace.evaluation_io import diff_snapshots + + snapshots = get_history(cfg, root) + if len(snapshots) < 2: + click.echo("Need at least two snapshots to diff.") + return + + snap_a = find_snapshot_by_date(snapshots, date_a) + snap_b = find_snapshot_by_date(snapshots, date_b) + + if snap_a is None: + click.echo(f"No snapshot found near '{date_a}'.") + return + if snap_b is None: + click.echo(f"No snapshot found near '{date_b}'.") + return + if snap_a.snapshot_id == snap_b.snapshot_id: + click.echo("Both dates resolve to the same snapshot.") + return + + diff = diff_snapshots(snap_a, snap_b) + click.echo(diff.summary()) diff --git a/markitect/infospace/history.py b/markitect/infospace/history.py new file mode 100644 index 00000000..12854d01 --- /dev/null +++ b/markitect/infospace/history.py @@ -0,0 +1,223 @@ +""" +Metrics history and viability tracking. + +Converts check results into timestamped snapshots and maintains a +persistent history file for trend analysis. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml + +from markitect.infospace.checks.orchestrator import CheckReport +from markitect.infospace.config import InfospaceConfig +from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue +from markitect.infospace.evaluation_io import ( + append_to_history, + diff_snapshots, + read_history, +) +from markitect.infospace.state import ViabilityResult + + +# ── Snapshot creation ──────────────────────────────────────────────── + + +def _concern_for_metric(name: str) -> str: + """Map a metric name to its concern label.""" + mapping = { + "redundancy_ratio": "C1", + "coverage_ratio": "C2", + "coherence_components": "C3", + "modularity": "C3", + "consistency_cycles": "C4", + "granularity_entropy": "C5", + } + return mapping.get(name, "") + + +def snapshot_from_checks( + check_report: CheckReport, + entity_count: int, + schema_name: str = "default", + metadata: Optional[Dict[str, Any]] = None, +) -> EvaluationSnapshot: + """Create an :class:`EvaluationSnapshot` from collection check results. + + Args: + check_report: Output from :func:`run_all_checks`. + entity_count: Number of entities checked. + schema_name: Schema identifier for the snapshot. + metadata: Optional extra metadata to attach. + + Returns: + A snapshot containing the check metrics as collection_metrics. + """ + metrics_dict = check_report.metrics() + collection_metrics = [ + MetricValue( + name=name, + value=value, + concern=_concern_for_metric(name), + ) + for name, value in sorted(metrics_dict.items()) + ] + + return EvaluationSnapshot( + snapshot_id=str(uuid.uuid4())[:8], + created_at=datetime.now(timezone.utc), + schema_name=schema_name, + entity_count=entity_count, + collection_metrics=collection_metrics, + metadata=metadata or {}, + ) + + +# ── Metrics file I/O ──────────────────────────────────────────────── + + +def write_metrics_file(metrics: Dict[str, float], path: Path) -> None: + """Write the latest metrics to a simple YAML file. + + This file is used by ``markitect infospace viability`` for quick + threshold checking. + """ + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text( + yaml.safe_dump( + {k: round(v, 6) for k, v in sorted(metrics.items())}, + default_flow_style=False, + sort_keys=True, + ), + encoding="utf-8", + ) + + +def read_metrics_file(path: Path) -> Dict[str, float]: + """Read the latest metrics from a YAML file.""" + if not path.is_file(): + return {} + raw = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + return {} + return {k: float(v) for k, v in raw.items() if isinstance(v, (int, float))} + + +# ── History operations ─────────────────────────────────────────────── + + +def record_check_results( + check_report: CheckReport, + config: InfospaceConfig, + root: Path, + entity_count: int, +) -> EvaluationSnapshot: + """Record check results: save metrics file and append to history. + + Args: + check_report: Output from ``run_all_checks()``. + config: The infospace configuration. + root: Project root directory. + entity_count: Number of entities checked. + + Returns: + The snapshot that was recorded. + """ + metrics_dir = root / config.metrics_dir + metrics = check_report.metrics() + + # Save latest metrics + write_metrics_file(metrics, metrics_dir / "metrics.yaml") + + # Create and append snapshot + snapshot = snapshot_from_checks( + check_report, + entity_count=entity_count, + metadata={"source": "collection-checks"}, + ) + append_to_history(snapshot, metrics_dir / "history.yaml") + + return snapshot + + +def get_history(config: InfospaceConfig, root: Path) -> List[EvaluationSnapshot]: + """Read the full metrics history for an infospace.""" + history_path = root / config.metrics_dir / "history.yaml" + if not history_path.is_file(): + return [] + return read_history(history_path) + + +def get_latest_snapshot( + config: InfospaceConfig, root: Path +) -> Optional[EvaluationSnapshot]: + """Get the most recent snapshot from the history.""" + history = get_history(config, root) + return history[-1] if history else None + + +def find_snapshot_by_date( + history: List[EvaluationSnapshot], date_str: str +) -> Optional[EvaluationSnapshot]: + """Find the snapshot closest to a given date string. + + Args: + history: List of snapshots in chronological order. + date_str: Date string in ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SS`` format. + + Returns: + The snapshot closest to the given date, or ``None`` if history is empty. + """ + if not history: + return None + + # Parse the target date + try: + if "T" in date_str: + target = datetime.fromisoformat(date_str) + else: + target = datetime.fromisoformat(date_str + "T00:00:00") + except ValueError: + return None + + # Make timezone-aware if needed + if target.tzinfo is None: + target = target.replace(tzinfo=timezone.utc) + + best = None + best_delta = None + for snap in history: + snap_dt = snap.created_at + if snap_dt.tzinfo is None: + snap_dt = snap_dt.replace(tzinfo=timezone.utc) + delta = abs((snap_dt - target).total_seconds()) + if best_delta is None or delta < best_delta: + best = snap + best_delta = delta + + return best + + +def metric_trend( + history: List[EvaluationSnapshot], metric_name: str +) -> List[Dict[str, Any]]: + """Extract a single metric's values across the history. + + Returns a list of ``{"date": iso_str, "value": float}`` entries + for each snapshot that contains the metric. + """ + trend: List[Dict[str, Any]] = [] + for snap in history: + for m in snap.collection_metrics: + if m.name == metric_name: + trend.append({ + "date": snap.created_at.isoformat(), + "value": m.value, + }) + break + return trend diff --git a/tests/unit/infospace/test_history.py b/tests/unit/infospace/test_history.py new file mode 100644 index 00000000..3fe466c3 --- /dev/null +++ b/tests/unit/infospace/test_history.py @@ -0,0 +1,258 @@ +""" +Tests for metrics history and viability tracking (S2.5). +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest +import yaml + +from markitect.infospace.checks.orchestrator import CheckReport +from markitect.infospace.checks.granularity import GranularityReport +from markitect.infospace.checks.redundancy import RedundancyReport +from markitect.infospace.config import InfospaceConfig, TopicConfig, ViabilityThreshold +from markitect.infospace.evaluation import EvaluationSnapshot, MetricValue +from markitect.infospace.history import ( + find_snapshot_by_date, + get_history, + get_latest_snapshot, + metric_trend, + read_metrics_file, + record_check_results, + snapshot_from_checks, + write_metrics_file, +) + + +# ── helpers ────────────────────────────────────────────────────────── + + +def _check_report() -> CheckReport: + return CheckReport( + redundancy=RedundancyReport(redundancy_ratio=0.1, entity_count=10), + granularity=GranularityReport(domain_entropy=1.5, entity_count=10), + ) + + +def _config(tmp_path: Path) -> InfospaceConfig: + return InfospaceConfig( + topic=TopicConfig(name="Test Topic", domain="Testing"), + metrics_dir=str(tmp_path / "metrics"), + ) + + +def _snapshot(snap_id: str, date_str: str, metrics: dict) -> EvaluationSnapshot: + return EvaluationSnapshot( + snapshot_id=snap_id, + created_at=datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc), + schema_name="default", + entity_count=10, + collection_metrics=[ + MetricValue(name=k, value=v) for k, v in metrics.items() + ], + ) + + +# ── snapshot_from_checks ──────────────────────────────────────────── + + +class TestSnapshotFromChecks: + def test_creates_snapshot(self): + report = _check_report() + snap = snapshot_from_checks(report, entity_count=10) + assert snap.entity_count == 10 + assert snap.snapshot_id # non-empty + assert snap.created_at is not None + + def test_contains_metrics(self): + report = _check_report() + snap = snapshot_from_checks(report, entity_count=10) + metric_names = {m.name for m in snap.collection_metrics} + assert "redundancy_ratio" in metric_names + assert "granularity_entropy" in metric_names + + def test_concern_labels(self): + report = _check_report() + snap = snapshot_from_checks(report, entity_count=10) + by_name = {m.name: m for m in snap.collection_metrics} + assert by_name["redundancy_ratio"].concern == "C1" + assert by_name["granularity_entropy"].concern == "C5" + + def test_custom_schema(self): + report = _check_report() + snap = snapshot_from_checks(report, entity_count=5, schema_name="custom") + assert snap.schema_name == "custom" + + def test_metadata(self): + report = _check_report() + snap = snapshot_from_checks(report, entity_count=5, metadata={"key": "val"}) + assert snap.metadata == {"key": "val"} + + def test_empty_report(self): + report = CheckReport() + snap = snapshot_from_checks(report, entity_count=0) + assert snap.collection_metrics == [] + + +# ── write_metrics_file / read_metrics_file ────────────────────────── + + +class TestMetricsFileIO: + def test_round_trip(self, tmp_path): + path = tmp_path / "metrics.yaml" + metrics = {"redundancy_ratio": 0.05, "coverage_ratio": 0.85} + write_metrics_file(metrics, path) + loaded = read_metrics_file(path) + assert loaded["redundancy_ratio"] == pytest.approx(0.05) + assert loaded["coverage_ratio"] == pytest.approx(0.85) + + def test_creates_parent_dirs(self, tmp_path): + path = tmp_path / "deep" / "nested" / "metrics.yaml" + write_metrics_file({"x": 1.0}, path) + assert path.is_file() + + def test_read_missing_file(self, tmp_path): + path = tmp_path / "nonexistent.yaml" + assert read_metrics_file(path) == {} + + def test_read_invalid_content(self, tmp_path): + path = tmp_path / "bad.yaml" + path.write_text("just a string", encoding="utf-8") + assert read_metrics_file(path) == {} + + +# ── record_check_results ──────────────────────────────────────────── + + +class TestRecordCheckResults: + def test_creates_metrics_file(self, tmp_path): + cfg = _config(tmp_path) + report = _check_report() + record_check_results(report, cfg, tmp_path, entity_count=10) + metrics_path = tmp_path / cfg.metrics_dir / "metrics.yaml" + assert metrics_path.is_file() + + def test_creates_history_file(self, tmp_path): + cfg = _config(tmp_path) + report = _check_report() + record_check_results(report, cfg, tmp_path, entity_count=10) + history_path = tmp_path / cfg.metrics_dir / "history.yaml" + assert history_path.is_file() + + def test_appends_to_history(self, tmp_path): + cfg = _config(tmp_path) + report = _check_report() + record_check_results(report, cfg, tmp_path, entity_count=10) + record_check_results(report, cfg, tmp_path, entity_count=12) + history = get_history(cfg, tmp_path) + assert len(history) == 2 + assert history[0].entity_count == 10 + assert history[1].entity_count == 12 + + def test_returns_snapshot(self, tmp_path): + cfg = _config(tmp_path) + report = _check_report() + snap = record_check_results(report, cfg, tmp_path, entity_count=10) + assert snap.snapshot_id + assert snap.entity_count == 10 + + +# ── get_history / get_latest_snapshot ──────────────────────────────── + + +class TestGetHistory: + def test_empty_history(self, tmp_path): + cfg = _config(tmp_path) + assert get_history(cfg, tmp_path) == [] + + def test_get_latest(self, tmp_path): + cfg = _config(tmp_path) + report = _check_report() + record_check_results(report, cfg, tmp_path, entity_count=5) + record_check_results(report, cfg, tmp_path, entity_count=10) + latest = get_latest_snapshot(cfg, tmp_path) + assert latest is not None + assert latest.entity_count == 10 + + def test_latest_none_when_empty(self, tmp_path): + cfg = _config(tmp_path) + assert get_latest_snapshot(cfg, tmp_path) is None + + +# ── find_snapshot_by_date ──────────────────────────────────────────── + + +class TestFindSnapshotByDate: + def test_finds_closest(self): + history = [ + _snapshot("a", "2026-01-01T10:00:00", {"x": 1.0}), + _snapshot("b", "2026-02-15T10:00:00", {"x": 2.0}), + _snapshot("c", "2026-03-01T10:00:00", {"x": 3.0}), + ] + result = find_snapshot_by_date(history, "2026-02-14") + assert result is not None + assert result.snapshot_id == "b" + + def test_exact_match(self): + history = [ + _snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}), + _snapshot("b", "2026-02-01T00:00:00", {"x": 2.0}), + ] + result = find_snapshot_by_date(history, "2026-02-01") + assert result is not None + assert result.snapshot_id == "b" + + def test_empty_history(self): + assert find_snapshot_by_date([], "2026-01-01") is None + + def test_invalid_date(self): + history = [_snapshot("a", "2026-01-01T00:00:00", {"x": 1.0})] + assert find_snapshot_by_date(history, "not-a-date") is None + + def test_with_timestamp(self): + history = [ + _snapshot("a", "2026-01-01T10:00:00", {"x": 1.0}), + _snapshot("b", "2026-01-01T14:00:00", {"x": 2.0}), + ] + result = find_snapshot_by_date(history, "2026-01-01T13:00:00") + assert result is not None + assert result.snapshot_id == "b" + + +# ── metric_trend ───────────────────────────────────────────────────── + + +class TestMetricTrend: + def test_extracts_trend(self): + history = [ + _snapshot("a", "2026-01-01T00:00:00", {"x": 1.0, "y": 2.0}), + _snapshot("b", "2026-02-01T00:00:00", {"x": 1.5, "y": 2.5}), + ] + trend = metric_trend(history, "x") + assert len(trend) == 2 + assert trend[0]["value"] == 1.0 + assert trend[1]["value"] == 1.5 + + def test_missing_metric(self): + history = [ + _snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}), + ] + assert metric_trend(history, "nonexistent") == [] + + def test_empty_history(self): + assert metric_trend([], "x") == [] + + def test_partial_presence(self): + history = [ + _snapshot("a", "2026-01-01T00:00:00", {"x": 1.0}), + _snapshot("b", "2026-02-01T00:00:00", {"y": 2.0}), # x missing + _snapshot("c", "2026-03-01T00:00:00", {"x": 3.0}), + ] + trend = metric_trend(history, "x") + assert len(trend) == 2 + assert trend[0]["value"] == 1.0 + assert trend[1]["value"] == 3.0