""" Optimization infrastructure for continuous improvement of AI agents. This module implements the kaizen loop for measuring, analyzing, and refining agent performance over time. """ from typing import TYPE_CHECKING, Any, Dict, List, Optional from dataclasses import dataclass from datetime import datetime import statistics if TYPE_CHECKING: from .metrics import MetricsStore MIN_SAMPLES_FOR_RECOMMENDATIONS = 10 @dataclass class PerformanceMetrics: """Container for agent performance metrics.""" timestamp: datetime execution_time: float success_rate: float quality_score: float resource_usage: Dict[str, Any] metadata: Optional[Dict[str, Any]] = None def __post_init__(self): if self.metadata is None: self.metadata = {} class OptimizationLoop: """Implements the kaizen optimization loop for continuous improvement.""" def __init__(self, agent_name: str): self.agent_name = agent_name self.metrics_history: List[PerformanceMetrics] = [] self.optimization_history: List[Dict[str, Any]] = [] @classmethod def from_metrics_store( cls, store: "MetricsStore", *, min_samples: int = 1, ) -> "OptimizationLoop": """Build an optimization loop from project-scoped execution records.""" loop = cls(store.agent_name) records = store.read_executions() if len(records) < min_samples: return loop for record in records: loop.record_metrics(cls._metrics_from_record(record)) return loop @staticmethod def _metrics_from_record(record: Dict[str, Any]) -> PerformanceMetrics: timestamp_raw = record.get("timestamp") try: timestamp = datetime.fromisoformat( str(timestamp_raw).replace("Z", "+00:00") ) except (TypeError, ValueError): timestamp = datetime.now() success = bool(record.get("success", False)) quality = record.get("quality_score") if quality is None: quality = 1.0 if success else 0.0 metadata = { k: v for k, v in record.items() if k not in { "timestamp", "agent", "success", "execution_time_s", "quality_score", "primary_metric", } } return PerformanceMetrics( timestamp=timestamp, execution_time=float(record.get("execution_time_s") or 0.0), success_rate=1.0 if success else 0.0, quality_score=float(quality), resource_usage={}, metadata=metadata or None, ) def record_metrics(self, metrics: PerformanceMetrics) -> None: """Record performance metrics for analysis.""" self.metrics_history.append(metrics) def analyze_performance(self, window_size: int = 10) -> Dict[str, Any]: """Analyze recent performance trends.""" if len(self.metrics_history) < window_size: window_size = len(self.metrics_history) if window_size == 0: return {"status": "insufficient_data"} recent_metrics = self.metrics_history[-window_size:] execution_times = [m.execution_time for m in recent_metrics] success_rates = [m.success_rate for m in recent_metrics] quality_scores = [m.quality_score for m in recent_metrics] analysis = { "window_size": window_size, "avg_execution_time": statistics.mean(execution_times), "avg_success_rate": statistics.mean(success_rates), "avg_quality_score": statistics.mean(quality_scores), "execution_time_trend": self._calculate_trend(execution_times), "success_rate_trend": self._calculate_trend(success_rates), "quality_score_trend": self._calculate_trend(quality_scores), "analysis_timestamp": datetime.now(), } return analysis def generate_improvement_recommendations(self) -> List[Dict[str, Any]]: """Generate recommendations for agent improvement.""" analysis = self.analyze_performance() if analysis.get("status") == "insufficient_data": return [ {"type": "info", "message": "Insufficient data for recommendations"} ] recommendations = [] # Performance-based recommendations if analysis["avg_execution_time"] > 30.0: # seconds recommendations.append( { "type": "performance", "priority": "high", "message": "Consider optimizing execution time", "details": f"Average execution time: {analysis['avg_execution_time']:.2f}s", } ) if analysis["avg_success_rate"] < 0.8: recommendations.append( { "type": "reliability", "priority": "critical", "message": "Success rate below threshold", "details": f"Current success rate: {analysis['avg_success_rate']:.2%}", } ) if analysis["avg_quality_score"] < 0.7: recommendations.append( { "type": "quality", "priority": "medium", "message": "Quality score could be improved", "details": f"Current quality score: {analysis['avg_quality_score']:.2f}", } ) # Trend-based recommendations if analysis["execution_time_trend"] > 0.1: recommendations.append( { "type": "trend", "priority": "medium", "message": "Execution time is trending upward", "details": "Consider performance profiling", } ) return recommendations def _calculate_trend(self, values: List[float]) -> float: """Calculate trend direction (-1 to 1, where 1 is strongly upward).""" if len(values) < 2: return 0.0 # Simple linear trend calculation n = len(values) x_sum = sum(range(n)) y_sum = sum(values) xy_sum = sum(i * values[i] for i in range(n)) x2_sum = sum(i * i for i in range(n)) if n * x2_sum - x_sum * x_sum == 0: return 0.0 slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum * x_sum) # Normalize slope to [-1, 1] range based on value magnitude if y_sum == 0: return 0.0 avg_value = y_sum / n normalized_slope = slope / abs(avg_value) if avg_value != 0 else 0.0 return max(-1.0, min(1.0, normalized_slope)) def get_optimization_report(self) -> Dict[str, Any]: """Generate comprehensive optimization report.""" analysis = self.analyze_performance() recommendations = self.generate_improvement_recommendations() return { "agent_name": self.agent_name, "report_timestamp": datetime.now(), "performance_analysis": analysis, "recommendations": recommendations, "metrics_count": len(self.metrics_history), "optimization_cycles": len(self.optimization_history), } def get_optimization_report_json(self) -> Dict[str, Any]: """JSON-serializable optimization report.""" return _to_json_safe(self.get_optimization_report()) def _to_json_safe(value: Any) -> Any: if isinstance(value, datetime): return value.isoformat() if isinstance(value, dict): return {k: _to_json_safe(v) for k, v in value.items()} if isinstance(value, list): return [_to_json_safe(item) for item in value] return value