From 5cd3da3166c6474a4e5ea4c9840314134a2ab1b5 Mon Sep 17 00:00:00 2001 From: tegwick Date: Tue, 16 Jun 2026 01:35:27 +0200 Subject: [PATCH] Implement MetricsStore for project-scoped agent metrics. Add ADR-004 storage layer with append-only executions, summary regeneration, idempotency keys, and retention pruning. Wire memory init to scaffold .kaizen/metrics/ by default and add unit tests. --- src/kaizen_agentic/__init__.py | 2 + src/kaizen_agentic/cli.py | 12 +- src/kaizen_agentic/metrics.py | 208 ++++++++++++++++++ tests/test_metrics.py | 107 +++++++++ ...kaizen-agentic-WP-0003-measurement-loop.md | 6 +- 5 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 src/kaizen_agentic/metrics.py create mode 100644 tests/test_metrics.py diff --git a/src/kaizen_agentic/__init__.py b/src/kaizen_agentic/__init__.py index 48dfc2b..a704238 100644 --- a/src/kaizen_agentic/__init__.py +++ b/src/kaizen_agentic/__init__.py @@ -16,6 +16,7 @@ from .core import Agent, AgentConfig from .optimization import OptimizationLoop, PerformanceMetrics from .registry import AgentRegistry, AgentDefinition, AgentCategory from .installer import AgentInstaller, ProjectInitializer, InstallationConfig +from .metrics import MetricsStore __all__ = [ "Agent", @@ -28,4 +29,5 @@ __all__ = [ "AgentInstaller", "ProjectInitializer", "InstallationConfig", + "MetricsStore", ] diff --git a/src/kaizen_agentic/cli.py b/src/kaizen_agentic/cli.py index 980e049..ef80a65 100644 --- a/src/kaizen_agentic/cli.py +++ b/src/kaizen_agentic/cli.py @@ -10,6 +10,7 @@ from typing import List, Optional from .registry import AgentRegistry, AgentCategory from .installer import AgentInstaller, ProjectInitializer, InstallationConfig +from .metrics import MetricsStore def safe_cli_wrapper(): @@ -781,7 +782,12 @@ def memory_show(agent_name: str, target: str): @memory.command("init") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") -def 
"""Project-scoped agent metrics storage (ADR-004)."""

import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional


DEFAULT_RETENTION_DAYS = 180

# Number of most-recent records compared against the window just before them.
_TREND_WINDOW = 5
# Smallest change between window averages that is reported as "up"/"down".
_TREND_THRESHOLD = 0.05


def _utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def _parse_timestamp(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    ``Z`` is rewritten to ``+00:00`` before parsing. A value without any
    offset is treated as UTC so the result can always be compared with the
    aware cutoff computed by ``MetricsStore.prune``.

    Raises:
        ValueError: if *value* is not a string or is not valid ISO-8601.
    """
    if not isinstance(value, str):
        raise ValueError(
            f"timestamp must be a string, got {type(value).__name__}"
        )
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Comparing naive and aware datetimes raises TypeError; pin to UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _trend_direction(
    recent: List[float],
    prior: List[float],
    threshold: float = _TREND_THRESHOLD,
) -> str:
    """Classify the change between two windows of samples.

    Returns ``"unknown"`` when *recent* is empty, ``"stable"`` when *prior*
    is empty or the averages differ by less than *threshold*, otherwise
    ``"up"`` or ``"down"`` according to the sign of the difference.
    """
    if not recent:
        return "unknown"
    if not prior:
        return "stable"
    delta = sum(recent) / len(recent) - sum(prior) / len(prior)
    if abs(delta) < threshold:
        return "stable"
    return "up" if delta > 0 else "down"


@dataclass
class MetricsStore:
    """Append-only per-agent execution metrics under .kaizen/metrics/.

    Records live in ``<project_root>/.kaizen/metrics/<agent_name>/`` as one
    JSON object per line in ``executions.jsonl``; ``summary.json`` holds the
    aggregate produced by :meth:`summarise`.
    """

    project_root: Path
    agent_name: str
    retention_days: int = DEFAULT_RETENTION_DAYS

    def __post_init__(self) -> None:
        """Validate the agent name and derive the storage paths.

        Raises:
            ValueError: if ``agent_name`` is not a single path component.
        """
        name = self.agent_name
        if (
            not isinstance(name, str)
            or name in ("", ".", "..")
            or "/" in name
            or "\\" in name
        ):
            # The name is joined into a filesystem path below; anything that
            # is not a plain component would land outside the metrics dir.
            raise ValueError(f"invalid agent name: {name!r}")
        self.project_root = Path(self.project_root).resolve()
        self.agent_dir = self.project_root / ".kaizen" / "metrics" / name
        self.executions_path = self.agent_dir / "executions.jsonl"
        self.summary_path = self.agent_dir / "summary.json"

    @classmethod
    def list_agents(cls, project_root: Path) -> List[str]:
        """Return sorted names of agents that have an ``executions.jsonl``."""
        metrics_root = Path(project_root).resolve() / ".kaizen" / "metrics"
        if not metrics_root.exists():
            return []
        return [
            child.name
            for child in sorted(metrics_root.iterdir())
            if child.is_dir() and (child / "executions.jsonl").exists()
        ]

    def scaffold(self) -> Path:
        """Create the agent's metrics directory and an empty executions file.

        An existing executions file is left untouched. Returns the directory.
        """
        self.agent_dir.mkdir(parents=True, exist_ok=True)
        if not self.executions_path.exists():
            self.executions_path.write_text("", encoding="utf-8")
        return self.agent_dir

    def append(
        self,
        record: Dict[str, Any],
        *,
        idempotency_key: Optional[str] = None,
    ) -> bool:
        """Append an execution record, then prune and refresh the summary.

        ``agent`` and ``timestamp`` are filled in when the record omits them.

        Returns:
            ``False`` (nothing written) when *idempotency_key* is already
            present in the log, ``True`` otherwise.

        Raises:
            ValueError: if the record has no ``success`` field.
        """
        payload = dict(record)
        payload.setdefault("agent", self.agent_name)
        payload.setdefault("timestamp", _utc_now_iso())

        if idempotency_key is not None:
            if self._has_idempotency_key(idempotency_key):
                return False
            payload["idempotency_key"] = idempotency_key

        if "success" not in payload:
            raise ValueError("execution record requires 'success' field")

        # Serialise before touching the disk so an unserialisable record
        # neither creates directories nor leaves a half-written line.
        line = json.dumps(payload, sort_keys=True) + "\n"

        self.scaffold()
        with self.executions_path.open("a", encoding="utf-8") as handle:
            handle.write(line)

        # Prune without its own summary write; one regeneration is enough.
        self._prune_records()
        self.write_summary()
        return True

    def read_executions(self) -> List[Dict[str, Any]]:
        """Return every record in the log, oldest first; ``[]`` if no file.

        Blank lines are skipped. A line that is not valid JSON raises
        ``json.JSONDecodeError``.
        """
        if not self.executions_path.exists():
            return []
        with self.executions_path.open(encoding="utf-8") as handle:
            return [json.loads(line) for line in handle if line.strip()]

    def summarise(self) -> Dict[str, Any]:
        """Compute aggregate statistics over all stored records.

        Averages ignore records that lack the corresponding field. Trends
        compare the last ``_TREND_WINDOW`` samples with the window before
        them. A record without ``success`` counts as a failure, and
        ``last_execution`` is ``None`` when the newest record has no
        timestamp.
        """
        records = self.read_executions()
        if not records:
            return {
                "agent": self.agent_name,
                "execution_count": 0,
                "success_rate": 0.0,
                "avg_quality_score": 0.0,
                "avg_execution_time_s": 0.0,
                "last_execution": None,
                "trend": {
                    "success_rate": "unknown",
                    "quality_score": "unknown",
                },
            }

        # .get(): append() enforces 'success', but the log may have been
        # edited by hand, and prune() already tolerates incomplete records.
        successes = [bool(r.get("success")) for r in records]
        quality_scores = [
            float(r["quality_score"])
            for r in records
            if r.get("quality_score") is not None
        ]
        execution_times = [
            float(r["execution_time_s"])
            for r in records
            if r.get("execution_time_s") is not None
        ]

        def _mean(values: List[float]) -> float:
            return sum(values) / len(values) if values else 0.0

        window = _TREND_WINDOW
        success_samples = [1.0 if ok else 0.0 for ok in successes]
        # values[:-window] is already empty when there are <= window samples,
        # so both series can use the same slicing.
        recent_success = success_samples[-window:]
        prior_success = success_samples[:-window][-window:]
        recent_quality = quality_scores[-window:]
        prior_quality = quality_scores[:-window][-window:]

        return {
            "agent": self.agent_name,
            "execution_count": len(records),
            "success_rate": round(_mean(success_samples), 3),
            "avg_quality_score": round(_mean(quality_scores), 3),
            "avg_execution_time_s": round(_mean(execution_times), 3),
            "last_execution": records[-1].get("timestamp"),
            "trend": {
                "success_rate": _trend_direction(recent_success, prior_success),
                "quality_score": _trend_direction(recent_quality, prior_quality),
            },
        }

    def write_summary(self) -> Dict[str, Any]:
        """Regenerate ``summary.json`` from the log and return the summary."""
        summary = self.summarise()
        self.agent_dir.mkdir(parents=True, exist_ok=True)
        self._write_atomic(
            self.summary_path,
            json.dumps(summary, indent=2, sort_keys=True) + "\n",
        )
        return summary

    def read_summary(self) -> Optional[Dict[str, Any]]:
        """Return the stored summary, or ``None`` if it was never written."""
        if not self.summary_path.exists():
            return None
        return json.loads(self.summary_path.read_text(encoding="utf-8"))

    def prune(self) -> int:
        """Drop execution records older than retention_days. Returns removed count.

        Records whose timestamp is missing or unparseable are kept. The
        summary is regenerated only when at least one record was removed.
        """
        removed = self._prune_records()
        if removed:
            self.write_summary()
        return removed

    def _prune_records(self) -> int:
        """Rewrite the log without expired records; do not touch the summary."""
        if not self.executions_path.exists():
            return 0

        cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
        kept: List[Dict[str, Any]] = []
        removed = 0

        for record in self.read_executions():
            try:
                ts = _parse_timestamp(record["timestamp"])
            except (KeyError, ValueError):
                # Cannot date the record, so never discard it.
                kept.append(record)
                continue
            if ts >= cutoff:
                kept.append(record)
            else:
                removed += 1

        if removed:
            self._write_atomic(
                self.executions_path,
                "".join(json.dumps(r, sort_keys=True) + "\n" for r in kept),
            )
        return removed

    @staticmethod
    def _write_atomic(path: Path, text: str) -> None:
        """Write *text* to a sibling temp file, then rename it over *path*.

        Avoids truncating the real file before the new content is on disk.
        """
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(text, encoding="utf-8")
        tmp_path.replace(path)

    def _has_idempotency_key(self, key: str) -> bool:
        """Return True if any stored record carries *key*."""
        return any(r.get("idempotency_key") == key for r in self.read_executions())
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

import pytest

from kaizen_agentic.metrics import MetricsStore, DEFAULT_RETENTION_DAYS


def _old_timestamp(days: int) -> str:
    """Return a ``...Z`` UTC timestamp lying *days* days in the past."""
    moment = datetime.now(timezone.utc) - timedelta(days=days)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


@pytest.fixture
def project_dir(tmp_path: Path) -> Path:
    """Provide an empty project root inside pytest's temporary directory."""
    project_root = tmp_path / "demo-project"
    project_root.mkdir()
    return project_root


class TestMetricsStore:
    """Behavioural tests for MetricsStore (ADR-004 storage layer)."""

    def test_scaffold_creates_directory_and_empty_executions(self, project_dir: Path):
        """scaffold() returns the agent directory and creates an empty log."""
        store = MetricsStore(project_dir, "tdd-workflow")

        created = store.scaffold()

        expected = project_dir / ".kaizen" / "metrics" / "tdd-workflow"
        assert created == expected
        assert store.executions_path.exists()
        assert store.executions_path.read_text() == ""

    def test_append_and_read_executions(self, project_dir: Path):
        """Appended records are read back in order with defaults filled in."""
        store = MetricsStore(project_dir, "tdd-workflow")

        first_ok = store.append({"success": True, "quality_score": 0.9})
        second_ok = store.append({"success": False, "execution_time_s": 12.5})
        assert first_ok is True
        assert second_ok is True

        stored = store.read_executions()
        assert len(stored) == 2
        head = stored[0]
        assert head["agent"] == "tdd-workflow"
        assert head["success"] is True
        assert "timestamp" in head

    def test_idempotency_key_rejects_duplicate(self, project_dir: Path):
        """A repeated idempotency key is refused and nothing is written."""
        store = MetricsStore(project_dir, "coach")

        assert store.append({"success": True}, idempotency_key="sess-1") is True
        assert store.append({"success": True}, idempotency_key="sess-1") is False
        assert len(store.read_executions()) == 1

    def test_write_summary_regenerates_summary_json(self, project_dir: Path):
        """write_summary() returns the aggregates and persists them to disk."""
        store = MetricsStore(project_dir, "tdd-workflow")
        for quality, seconds in ((0.8, 10), (1.0, 20)):
            store.append(
                {"success": True, "quality_score": quality, "execution_time_s": seconds}
            )

        result = store.write_summary()

        assert result["execution_count"] == 2
        assert result["success_rate"] == 1.0
        assert result["avg_quality_score"] == 0.9
        assert result["avg_execution_time_s"] == 15.0
        assert store.summary_path.exists()
        persisted = json.loads(store.summary_path.read_text())
        assert persisted["execution_count"] == 2

    def test_prune_removes_expired_records(self, project_dir: Path):
        """prune() drops records past retention and refreshes the summary."""
        store = MetricsStore(project_dir, "tdd-workflow", retention_days=30)
        store.scaffold()

        expired = {
            "timestamp": _old_timestamp(45),
            "agent": "tdd-workflow",
            "success": False,
        }
        fresh = {
            "timestamp": _old_timestamp(1),
            "agent": "tdd-workflow",
            "success": True,
            "quality_score": 0.7,
        }
        # Seed the log directly so the timestamps are under the test's control.
        store.executions_path.write_text(
            "".join(json.dumps(entry) + "\n" for entry in (expired, fresh)),
            encoding="utf-8",
        )

        dropped = store.prune()

        assert dropped == 1
        remaining = store.read_executions()
        assert len(remaining) == 1
        assert remaining[0]["success"] is True
        summary = store.read_summary()
        assert summary is not None
        assert summary["execution_count"] == 1

    def test_list_agents_with_metrics(self, project_dir: Path):
        """list_agents() reports every agent with a log, sorted by name."""
        MetricsStore(project_dir, "tdd-workflow").scaffold()
        MetricsStore(project_dir, "coach").append({"success": True})

        names = MetricsStore.list_agents(project_dir)

        assert names == ["coach", "tdd-workflow"]

    def test_default_retention_matches_adr(self):
        """The default retention window is 180 days."""
        assert DEFAULT_RETENTION_DAYS == 180
`src/kaizen_agentic/metrics.py` (append, read, summarise, prune by retention) -- [ ] T03 — Add `memory init` hook to scaffold `.kaizen/metrics//` alongside memory (optional flag `--no-metrics`) -- [ ] T04 — Unit tests for `MetricsStore` (append idempotency key, summary regeneration, retention prune) +- [x] T02 — Implement `MetricsStore` in `src/kaizen_agentic/metrics.py` (append, read, summarise, prune by retention) +- [x] T03 — Add `memory init` hook to scaffold `.kaizen/metrics//` alongside memory (optional flag `--no-metrics`) +- [x] T04 — Unit tests for `MetricsStore` (append idempotency key, summary regeneration, retention prune) ### Definition of done