Implement MetricsStore for project-scoped agent metrics.

Add ADR-004 storage layer with append-only executions, summary
regeneration, idempotency keys, and retention pruning. Wire memory init
to scaffold .kaizen/metrics/ by default and add unit tests.
This commit is contained in:
2026-06-16 01:35:27 +02:00
parent bd74d7d122
commit 5cd3da3166
5 changed files with 331 additions and 4 deletions

View File

@@ -16,6 +16,7 @@ from .core import Agent, AgentConfig
from .optimization import OptimizationLoop, PerformanceMetrics
from .registry import AgentRegistry, AgentDefinition, AgentCategory
from .installer import AgentInstaller, ProjectInitializer, InstallationConfig
from .metrics import MetricsStore
__all__ = [
"Agent",
@@ -28,4 +29,5 @@ __all__ = [
"AgentInstaller",
"ProjectInitializer",
"InstallationConfig",
"MetricsStore",
]

View File

@@ -10,6 +10,7 @@ from typing import List, Optional
from .registry import AgentRegistry, AgentCategory
from .installer import AgentInstaller, ProjectInitializer, InstallationConfig
from .metrics import MetricsStore
def safe_cli_wrapper():
@@ -781,7 +782,12 @@ def memory_show(agent_name: str, target: str):
@memory.command("init")
@click.argument("agent_name")
@click.option("--target", "-t", default=".", help="Project root (default: current)")
def memory_init(agent_name: str, target: str):
@click.option(
"--no-metrics",
is_flag=True,
help="Skip scaffolding .kaizen/metrics/<agent>/ (default: create metrics dir)",
)
def memory_init(agent_name: str, target: str, no_metrics: bool):
"""Scaffold an empty memory file for an agent."""
memory_path = _memory_path(target, agent_name)
@@ -820,6 +826,10 @@ session_count: 0
memory_path.write_text(content)
click.echo(f"Initialized memory for '{agent_name}': {memory_path}")
if not no_metrics:
metrics_dir = MetricsStore(Path(target), agent_name).scaffold()
click.echo(f"Initialized metrics for '{agent_name}': {metrics_dir}")
# For agents with protocols, note the protocol location
registry = _get_registry()
protocols_dir = registry.agents_dir / "protocols" / agent_name

View File

@@ -0,0 +1,208 @@
"""Project-scoped agent metrics storage (ADR-004)."""
from __future__ import annotations
import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
DEFAULT_RETENTION_DAYS = 180
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def _parse_timestamp(value: str) -> datetime:
normalized = value.replace("Z", "+00:00")
return datetime.fromisoformat(normalized)
def _trend_direction(recent: List[float], prior: List[float]) -> str:
if not recent:
return "unknown"
if not prior:
return "stable"
recent_avg = sum(recent) / len(recent)
prior_avg = sum(prior) / len(prior)
delta = recent_avg - prior_avg
if abs(delta) < 0.05:
return "stable"
return "up" if delta > 0 else "down"
@dataclass
class MetricsStore:
"""Append-only per-agent execution metrics under .kaizen/metrics/."""
project_root: Path
agent_name: str
retention_days: int = DEFAULT_RETENTION_DAYS
def __post_init__(self) -> None:
self.project_root = Path(self.project_root).resolve()
self.agent_dir = self.project_root / ".kaizen" / "metrics" / self.agent_name
self.executions_path = self.agent_dir / "executions.jsonl"
self.summary_path = self.agent_dir / "summary.json"
@classmethod
def list_agents(cls, project_root: Path) -> List[str]:
metrics_root = Path(project_root).resolve() / ".kaizen" / "metrics"
if not metrics_root.exists():
return []
agents = []
for child in sorted(metrics_root.iterdir()):
if child.is_dir() and (child / "executions.jsonl").exists():
agents.append(child.name)
return agents
def scaffold(self) -> Path:
"""Create metrics directory for this agent."""
self.agent_dir.mkdir(parents=True, exist_ok=True)
if not self.executions_path.exists():
self.executions_path.write_text("", encoding="utf-8")
return self.agent_dir
def append(
self,
record: Dict[str, Any],
*,
idempotency_key: Optional[str] = None,
) -> bool:
"""Append an execution record. Returns False if idempotency_key duplicates."""
self.scaffold()
payload = dict(record)
payload.setdefault("agent", self.agent_name)
payload.setdefault("timestamp", _utc_now_iso())
if idempotency_key is not None:
if self._has_idempotency_key(idempotency_key):
return False
payload["idempotency_key"] = idempotency_key
if "success" not in payload:
raise ValueError("execution record requires 'success' field")
with self.executions_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, sort_keys=True))
handle.write("\n")
self.prune()
self.write_summary()
return True
def read_executions(self) -> List[Dict[str, Any]]:
if not self.executions_path.exists():
return []
records: List[Dict[str, Any]] = []
with self.executions_path.open(encoding="utf-8") as handle:
for line in handle:
line = line.strip()
if line:
records.append(json.loads(line))
return records
def summarise(self) -> Dict[str, Any]:
records = self.read_executions()
if not records:
return {
"agent": self.agent_name,
"execution_count": 0,
"success_rate": 0.0,
"avg_quality_score": 0.0,
"avg_execution_time_s": 0.0,
"last_execution": None,
"trend": {
"success_rate": "unknown",
"quality_score": "unknown",
},
}
successes = [bool(r["success"]) for r in records]
success_rate = sum(successes) / len(successes)
quality_scores = [
float(r["quality_score"])
for r in records
if r.get("quality_score") is not None
]
execution_times = [
float(r["execution_time_s"])
for r in records
if r.get("execution_time_s") is not None
]
window = 5
recent_success = [1.0 if s else 0.0 for s in successes[-window:]]
prior_success = [1.0 if s else 0.0 for s in successes[:-window][-window:]]
recent_quality = quality_scores[-window:]
prior_quality = quality_scores[:-window][-window:] if len(quality_scores) > window else []
return {
"agent": self.agent_name,
"execution_count": len(records),
"success_rate": round(success_rate, 3),
"avg_quality_score": round(
sum(quality_scores) / len(quality_scores) if quality_scores else 0.0,
3,
),
"avg_execution_time_s": round(
sum(execution_times) / len(execution_times) if execution_times else 0.0,
3,
),
"last_execution": records[-1]["timestamp"],
"trend": {
"success_rate": _trend_direction(recent_success, prior_success),
"quality_score": _trend_direction(recent_quality, prior_quality),
},
}
def write_summary(self) -> Dict[str, Any]:
summary = self.summarise()
self.agent_dir.mkdir(parents=True, exist_ok=True)
self.summary_path.write_text(
json.dumps(summary, indent=2, sort_keys=True) + "\n",
encoding="utf-8",
)
return summary
def read_summary(self) -> Optional[Dict[str, Any]]:
if not self.summary_path.exists():
return None
return json.loads(self.summary_path.read_text(encoding="utf-8"))
def prune(self) -> int:
"""Drop execution records older than retention_days. Returns removed count."""
if not self.executions_path.exists():
return 0
cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
kept: List[Dict[str, Any]] = []
removed = 0
for record in self.read_executions():
try:
ts = _parse_timestamp(record["timestamp"])
except (KeyError, ValueError):
kept.append(record)
continue
if ts >= cutoff:
kept.append(record)
else:
removed += 1
if removed:
with self.executions_path.open("w", encoding="utf-8") as handle:
for record in kept:
handle.write(json.dumps(record, sort_keys=True))
handle.write("\n")
self.write_summary()
return removed
def _has_idempotency_key(self, key: str) -> bool:
return any(r.get("idempotency_key") == key for r in self.read_executions())

107
tests/test_metrics.py Normal file
View File

@@ -0,0 +1,107 @@
"""Tests for project-scoped metrics storage (ADR-004)."""
from __future__ import annotations
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from kaizen_agentic.metrics import MetricsStore, DEFAULT_RETENTION_DAYS
def _old_timestamp(days: int) -> str:
dt = datetime.now(timezone.utc) - timedelta(days=days)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
@pytest.fixture
def project_dir(tmp_path: Path) -> Path:
root = tmp_path / "demo-project"
root.mkdir()
return root
class TestMetricsStore:
def test_scaffold_creates_directory_and_empty_executions(self, project_dir: Path):
store = MetricsStore(project_dir, "tdd-workflow")
path = store.scaffold()
assert path == project_dir / ".kaizen" / "metrics" / "tdd-workflow"
assert store.executions_path.exists()
assert store.executions_path.read_text() == ""
def test_append_and_read_executions(self, project_dir: Path):
store = MetricsStore(project_dir, "tdd-workflow")
assert store.append({"success": True, "quality_score": 0.9}) is True
assert store.append({"success": False, "execution_time_s": 12.5}) is True
records = store.read_executions()
assert len(records) == 2
assert records[0]["agent"] == "tdd-workflow"
assert records[0]["success"] is True
assert "timestamp" in records[0]
def test_idempotency_key_rejects_duplicate(self, project_dir: Path):
store = MetricsStore(project_dir, "coach")
assert store.append({"success": True}, idempotency_key="sess-1") is True
assert store.append({"success": True}, idempotency_key="sess-1") is False
assert len(store.read_executions()) == 1
def test_write_summary_regenerates_summary_json(self, project_dir: Path):
store = MetricsStore(project_dir, "tdd-workflow")
store.append({"success": True, "quality_score": 0.8, "execution_time_s": 10})
store.append({"success": True, "quality_score": 1.0, "execution_time_s": 20})
summary = store.write_summary()
assert summary["execution_count"] == 2
assert summary["success_rate"] == 1.0
assert summary["avg_quality_score"] == 0.9
assert summary["avg_execution_time_s"] == 15.0
assert store.summary_path.exists()
on_disk = json.loads(store.summary_path.read_text())
assert on_disk["execution_count"] == 2
def test_prune_removes_expired_records(self, project_dir: Path):
store = MetricsStore(project_dir, "tdd-workflow", retention_days=30)
store.scaffold()
old = {
"timestamp": _old_timestamp(45),
"agent": "tdd-workflow",
"success": False,
}
recent = {
"timestamp": _old_timestamp(1),
"agent": "tdd-workflow",
"success": True,
"quality_score": 0.7,
}
with store.executions_path.open("w", encoding="utf-8") as handle:
handle.write(json.dumps(old) + "\n")
handle.write(json.dumps(recent) + "\n")
removed = store.prune()
assert removed == 1
records = store.read_executions()
assert len(records) == 1
assert records[0]["success"] is True
summary = store.read_summary()
assert summary is not None
assert summary["execution_count"] == 1
def test_list_agents_with_metrics(self, project_dir: Path):
MetricsStore(project_dir, "tdd-workflow").scaffold()
MetricsStore(project_dir, "coach").append({"success": True})
agents = MetricsStore.list_agents(project_dir)
assert agents == ["coach", "tdd-workflow"]
def test_default_retention_matches_adr(self):
assert DEFAULT_RETENTION_DAYS == 180

View File

@@ -85,9 +85,9 @@ Optimizer-specific aggregates (per `wiki/AgentKaizenOptimizer.md`):
### Tasks
- [x] T01 — Write ADR-004: project metrics convention (location, schema, lifecycle, retention, Helix Forge correlation)
- [ ] T02 — Implement `MetricsStore` in `src/kaizen_agentic/metrics.py` (append, read, summarise, prune by retention)
- [ ] T03 — Add `memory init` hook to scaffold `.kaizen/metrics/<agent>/` alongside memory (optional flag `--no-metrics`)
- [ ] T04 — Unit tests for `MetricsStore` (append idempotency key, summary regeneration, retention prune)
- [x] T02 — Implement `MetricsStore` in `src/kaizen_agentic/metrics.py` (append, read, summarise, prune by retention)
- [x] T03 — Add `memory init` hook to scaffold `.kaizen/metrics/<agent>/` alongside memory (optional flag `--no-metrics`)
- [x] T04 — Unit tests for `MetricsStore` (append idempotency key, summary regeneration, retention prune)
### Definition of done