"""Project-scoped agent metrics storage (ADR-004).""" from __future__ import annotations import json from dataclasses import dataclass from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Dict, List, Optional DEFAULT_RETENTION_DAYS = 180 def _utc_now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def _parse_timestamp(value: str) -> datetime: normalized = value.replace("Z", "+00:00") return datetime.fromisoformat(normalized) _TREND_ARROWS = {"up": "↑", "down": "↓", "stable": "→", "unknown": "?"} def performance_summary_markdown(summary: Dict[str, Any]) -> str: """Format ADR-004 summary.json as a Coach brief markdown section.""" if not summary or summary.get("execution_count", 0) == 0: return "" trend = summary.get("trend", {}) success_trend = trend.get("success_rate", "unknown") quality_trend = trend.get("quality_score", "unknown") lines = [ "## Performance Summary", "", f"- Executions: {summary['execution_count']}", ( f"- Success rate: {summary['success_rate']:.1%} " f"({_TREND_ARROWS.get(success_trend, '?')} {success_trend})" ), f"- Avg quality: {summary['avg_quality_score']:.2f} " f"({_TREND_ARROWS.get(quality_trend, '?')} {quality_trend})", f"- Avg execution time: {summary['avg_execution_time_s']:.1f}s", ] if summary.get("last_execution"): lines.append(f"- Last execution: {summary['last_execution']}") lines.append("") return "\n".join(lines) def _trend_direction(recent: List[float], prior: List[float]) -> str: if not recent: return "unknown" if not prior: return "stable" recent_avg = sum(recent) / len(recent) prior_avg = sum(prior) / len(prior) delta = recent_avg - prior_avg if abs(delta) < 0.05: return "stable" return "up" if delta > 0 else "down" @dataclass class MetricsStore: """Append-only per-agent execution metrics under .kaizen/metrics/.""" project_root: Path agent_name: str retention_days: int = DEFAULT_RETENTION_DAYS def __post_init__(self) -> None: self.project_root = Path(self.project_root).resolve() self.agent_dir = self.project_root / ".kaizen" / "metrics" / self.agent_name self.executions_path = self.agent_dir / "executions.jsonl" self.summary_path = self.agent_dir / "summary.json" @classmethod def list_agents(cls, project_root: Path) -> List[str]: metrics_root = Path(project_root).resolve() / ".kaizen" / "metrics" if not metrics_root.exists(): return [] agents = [] for child in sorted(metrics_root.iterdir()): if child.is_dir() and (child / "executions.jsonl").exists(): agents.append(child.name) return agents def scaffold(self) -> Path: """Create metrics directory for this agent.""" self.agent_dir.mkdir(parents=True, exist_ok=True) if not self.executions_path.exists(): self.executions_path.write_text("", encoding="utf-8") return self.agent_dir def append( self, record: Dict[str, Any], *, idempotency_key: Optional[str] = None, ) -> bool: """Append an execution record. Returns False if idempotency_key duplicates.""" self.scaffold() payload = dict(record) payload.setdefault("agent", self.agent_name) payload.setdefault("timestamp", _utc_now_iso()) if idempotency_key is not None: if self._has_idempotency_key(idempotency_key): return False payload["idempotency_key"] = idempotency_key if "success" not in payload: raise ValueError("execution record requires 'success' field") with self.executions_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, sort_keys=True)) handle.write("\n") self.prune() self.write_summary() return True def read_executions(self) -> List[Dict[str, Any]]: if not self.executions_path.exists(): return [] records: List[Dict[str, Any]] = [] with self.executions_path.open(encoding="utf-8") as handle: for line in handle: line = line.strip() if line: records.append(json.loads(line)) return records def summarise(self) -> Dict[str, Any]: records = self.read_executions() if not records: return { "agent": self.agent_name, "execution_count": 0, "success_rate": 0.0, "avg_quality_score": 0.0, "avg_execution_time_s": 0.0, "last_execution": None, "trend": { "success_rate": "unknown", "quality_score": "unknown", }, } successes = [bool(r["success"]) for r in records] success_rate = sum(successes) / len(successes) quality_scores = [ float(r["quality_score"]) for r in records if r.get("quality_score") is not None ] execution_times = [ float(r["execution_time_s"]) for r in records if r.get("execution_time_s") is not None ] window = 5 recent_success = [1.0 if s else 0.0 for s in successes[-window:]] prior_success = [1.0 if s else 0.0 for s in successes[:-window][-window:]] recent_quality = quality_scores[-window:] prior_quality = ( quality_scores[:-window][-window:] if len(quality_scores) > window else [] ) return { "agent": self.agent_name, "execution_count": len(records), "success_rate": round(success_rate, 3), "avg_quality_score": round( sum(quality_scores) / len(quality_scores) if quality_scores else 0.0, 3, ), "avg_execution_time_s": round( sum(execution_times) / len(execution_times) if execution_times else 0.0, 3, ), "last_execution": records[-1]["timestamp"], "trend": { "success_rate": _trend_direction(recent_success, prior_success), "quality_score": _trend_direction(recent_quality, prior_quality), }, } def write_summary(self) -> Dict[str, Any]: summary = self.summarise() self.agent_dir.mkdir(parents=True, exist_ok=True) self.summary_path.write_text( json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) return summary def read_summary(self) -> Optional[Dict[str, Any]]: if not self.summary_path.exists(): return None return json.loads(self.summary_path.read_text(encoding="utf-8")) def prune(self) -> int: """Drop execution records older than retention_days. Returns removed count.""" if not self.executions_path.exists(): return 0 cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days) kept: List[Dict[str, Any]] = [] removed = 0 for record in self.read_executions(): try: ts = _parse_timestamp(record["timestamp"]) except (KeyError, ValueError): kept.append(record) continue if ts >= cutoff: kept.append(record) else: removed += 1 if removed: with self.executions_path.open("w", encoding="utf-8") as handle: for record in kept: handle.write(json.dumps(record, sort_keys=True)) handle.write("\n") self.write_summary() return removed def _has_idempotency_key(self, key: str) -> bool: return any(r.get("idempotency_key") == key for r in self.read_executions()) @dataclass class OptimizerStore: """Persist optimizer analysis output under .kaizen/metrics/optimizer/.""" project_root: Path def __post_init__(self) -> None: self.project_root = Path(self.project_root).resolve() self.optimizer_dir = self.project_root / ".kaizen" / "metrics" / "optimizer" self.analysis_path = self.optimizer_dir / "analysis.json" self.recommendations_path = self.optimizer_dir / "recommendations.jsonl" def write_analysis(self, report: Dict[str, Any]) -> Path: self.optimizer_dir.mkdir(parents=True, exist_ok=True) self.analysis_path.write_text( json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) return self.analysis_path def append_recommendations( self, agent_name: str, recommendations: List[Dict[str, Any]], *, metrics_count: int, ) -> None: self.optimizer_dir.mkdir(parents=True, exist_ok=True) entry = { "timestamp": _utc_now_iso(), "agent": agent_name, "metrics_count": metrics_count, "recommendations": recommendations, } with self.recommendations_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(entry, sort_keys=True)) handle.write("\n")