Add from_metrics_store factory, OptimizerStore persistence, metrics optimize CLI, consolidate duplicate optimization agent, and add integration tests.
1235 lines
44 KiB
Python
1235 lines
44 KiB
Python
"""Command-line interface for Kaizen Agentic agent management."""
|
||
|
||
import json
|
||
import sys
|
||
import subprocess
|
||
import contextlib
|
||
import io
|
||
import click
|
||
from pathlib import Path
|
||
from typing import List, Optional
|
||
|
||
from .registry import AgentRegistry, AgentCategory
|
||
from .installer import AgentInstaller, ProjectInitializer, InstallationConfig
|
||
from .metrics import MetricsStore, OptimizerStore
|
||
from .optimization import OptimizationLoop, MIN_SAMPLES_FOR_RECOMMENDATIONS
|
||
|
||
|
||
def safe_cli_wrapper():
|
||
"""
|
||
Wrapper to handle Click errors gracefully and provide clean user experience.
|
||
|
||
WORKAROUND FOR CLICK LIBRARY ISSUE:
|
||
===================================
|
||
|
||
This function addresses a spurious error message that appears when using Click
|
||
with certain argument configurations. The issue manifests as:
|
||
|
||
"Error: Got unexpected extra argument (agent-name)"
|
||
|
||
Despite this error message, the underlying CLI function executes correctly.
|
||
This appears to be a Click library display/buffering issue where error handling
|
||
interferes with normal execution flow.
|
||
|
||
AFFECTED COMMANDS: install, update
|
||
|
||
ISSUE DETAILS:
|
||
- Affects: Click library (tested with Click 8.x series)
|
||
- Symptom: Misleading error messages during successful command execution
|
||
- Impact: Confusing user experience despite functional CLI
|
||
- Root cause: Click's argument validation timing/display mechanism
|
||
|
||
WORKAROUND APPROACH:
|
||
- Capture stdout/stderr streams during CLI execution
|
||
- Detect spurious error patterns specific to known issues
|
||
- Filter misleading messages while preserving legitimate errors
|
||
- Provide clean output for successful operations
|
||
|
||
TODO: REVISIT WHEN CLICK UPDATES
|
||
================================
|
||
Monitor Click library releases and test removal of this workaround:
|
||
- Test with Click 9.x+ releases
|
||
- Remove this wrapper if the underlying issue is resolved
|
||
- Update entry point back to direct CLI function: kaizen_agentic.cli:cli
|
||
|
||
TESTING:
|
||
This workaround is covered by tests in test_cli_error_handling.py
|
||
"""
|
||
# Capture stderr to intercept spurious error messages
|
||
stderr_capture = io.StringIO()
|
||
stdout_capture = io.StringIO()
|
||
|
||
# Check if this is an install or update command before processing
|
||
affected_commands = len(sys.argv) >= 2 and sys.argv[1] in ["install", "update"]
|
||
|
||
try:
|
||
with contextlib.redirect_stderr(stderr_capture), contextlib.redirect_stdout(stdout_capture):
|
||
cli(standalone_mode=False)
|
||
except click.UsageError as e:
|
||
if affected_commands and "Got unexpected extra argument" in str(e):
|
||
# This is the spurious error for install/update commands
|
||
# Check if we got some stdout output indicating success
|
||
captured_stdout = stdout_capture.getvalue()
|
||
success_indicators = ["Installing agents to:", "Updating all installed agents:"]
|
||
if any(indicator in captured_stdout for indicator in success_indicators):
|
||
# The command was actually executing, show the real output
|
||
print(captured_stdout, end='')
|
||
sys.exit(0)
|
||
else:
|
||
# This might be a real error
|
||
print(f"Error: {e}")
|
||
sys.exit(2)
|
||
else:
|
||
# Legitimate error for other commands
|
||
print(f"Error: {e}")
|
||
sys.exit(2)
|
||
except SystemExit as e:
|
||
# Show captured output and handle exits
|
||
captured_stdout = stdout_capture.getvalue()
|
||
captured_stderr = stderr_capture.getvalue()
|
||
|
||
if e.code == 0:
|
||
# Successful exit
|
||
print(captured_stdout, end='')
|
||
else:
|
||
# Error exit - show both stdout and stderr unless it's the spurious error
|
||
if affected_commands and "Got unexpected extra argument" in captured_stderr:
|
||
# Show only stdout for install/update commands with spurious errors
|
||
print(captured_stdout, end='')
|
||
success_indicators = ["Installing agents to:", "Updating all installed agents:"]
|
||
if any(indicator in captured_stdout for indicator in success_indicators):
|
||
sys.exit(0) # Override error exit if we see success indicators
|
||
else:
|
||
# Show everything for other commands
|
||
print(captured_stdout, end='')
|
||
print(captured_stderr, end='', file=sys.stderr)
|
||
sys.exit(e.code)
|
||
except Exception as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
# If we get here, show captured output
|
||
print(stdout_capture.getvalue(), end='')
|
||
stderr_content = stderr_capture.getvalue()
|
||
if stderr_content and not (affected_commands and "Got unexpected extra argument" in stderr_content):
|
||
print(stderr_content, end='', file=sys.stderr)
|
||
|
||
@click.group()
|
||
@click.version_option()
|
||
def cli():
|
||
"""Kaizen Agentic - AI agent development framework."""
|
||
pass
|
||
|
||
|
||
@cli.command("list")
|
||
@click.option(
|
||
"--category",
|
||
type=click.Choice([c.value for c in AgentCategory]),
|
||
help="Filter by category",
|
||
)
|
||
@click.option("--verbose", "-v", is_flag=True, help="Show detailed information")
|
||
def list_agents(category: Optional[str], verbose: bool):
|
||
"""List available agents."""
|
||
registry = _get_registry()
|
||
|
||
if category:
|
||
cat_enum = AgentCategory(category)
|
||
agents = registry.list_agents(cat_enum)
|
||
click.echo(f"\n{category.replace('-', ' ').title()} Agents:")
|
||
click.echo("=" * 40)
|
||
else:
|
||
if verbose:
|
||
categories = registry.get_categories()
|
||
for cat, agents in categories.items():
|
||
click.echo(
|
||
f"\n{cat.value.replace('-', ' ').title()} ({len(agents)} agents):"
|
||
)
|
||
click.echo("=" * 50)
|
||
for agent in agents:
|
||
click.echo(f" • {agent.name}: {agent.description}")
|
||
return
|
||
else:
|
||
agents = registry.list_agents()
|
||
click.echo(f"\nAvailable Agents ({len(agents)} total):")
|
||
click.echo("=" * 40)
|
||
|
||
for agent in agents:
|
||
if verbose:
|
||
click.echo(f"\n{agent.name}")
|
||
click.echo(f" Description: {agent.description}")
|
||
click.echo(f" Category: {agent.category.value}")
|
||
if agent.dependencies:
|
||
click.echo(f" Dependencies: {', '.join(agent.dependencies)}")
|
||
else:
|
||
click.echo(f" • {agent.name}: {agent.description}")
|
||
|
||
|
||
@cli.command()
|
||
@click.argument("agents", nargs=-1, required=True)
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.option("--no-backup", is_flag=True, help="Skip creating backup")
|
||
@click.option("--no-docs", is_flag=True, help="Skip updating documentation")
|
||
def install(agents: List[str], target: str, no_backup: bool, no_docs: bool):
|
||
"""
|
||
Install agents into a project.
|
||
|
||
NOTE: This command is affected by a Click library issue that causes spurious
|
||
"Got unexpected extra argument" messages. This is handled by safe_cli_wrapper().
|
||
See safe_cli_wrapper() docstring for details and removal timeline.
|
||
"""
|
||
try:
|
||
registry = _get_registry()
|
||
installer = AgentInstaller(registry)
|
||
target_path = Path(target).resolve()
|
||
|
||
config = InstallationConfig(
|
||
target_dir=target_path,
|
||
claude_config_path=target_path / "CLAUDE.md",
|
||
makefile_path=target_path / "Makefile",
|
||
update_docs=not no_docs,
|
||
create_backup=not no_backup,
|
||
)
|
||
|
||
click.echo(f"Installing agents to: {target_path}")
|
||
|
||
# Resolve dependencies with fallback
|
||
try:
|
||
resolved = registry.resolve_dependencies(list(agents))
|
||
if len(resolved) > len(agents):
|
||
additional = [a for a in resolved if a not in agents]
|
||
click.echo(f"Including dependencies: {', '.join(additional)}")
|
||
except Exception:
|
||
# Fall back to original agent list if dependency resolution fails
|
||
resolved = list(agents)
|
||
|
||
results = installer.install_agents(resolved, config)
|
||
|
||
# Display results
|
||
success_count = 0
|
||
for agent_name, status in results.items():
|
||
if status == "INSTALLED":
|
||
click.echo(f" ✅ {agent_name}")
|
||
success_count += 1
|
||
else:
|
||
click.echo(f" ❌ {agent_name}: {status}")
|
||
|
||
click.echo(f"\nInstalled {success_count}/{len(results)} agents successfully")
|
||
|
||
# Force successful exit to override any Click error handling
|
||
sys.exit(0)
|
||
|
||
except Exception as e:
|
||
click.echo(f"Installation failed: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
@cli.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.argument("agents", nargs=-1)
|
||
def update(target: str, agents: List[str]):
|
||
"""
|
||
Update installed agents.
|
||
|
||
NOTE: This command is affected by a Click library issue that causes spurious
|
||
"Got unexpected extra argument" messages. This is handled by safe_cli_wrapper().
|
||
See safe_cli_wrapper() docstring for details and removal timeline.
|
||
"""
|
||
registry = _get_registry()
|
||
installer = AgentInstaller(registry)
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
if not agents:
|
||
agents = installer.list_installed_agents(target_path)
|
||
if not agents:
|
||
click.echo("No agents installed in this project")
|
||
return
|
||
click.echo(f"Updating all installed agents: {', '.join(agents)}")
|
||
else:
|
||
click.echo(f"Updating specific agents: {', '.join(agents)}")
|
||
|
||
results = installer.update_agents(target_path, list(agents))
|
||
|
||
# Display results
|
||
success_count = 0
|
||
for agent_name, status in results.items():
|
||
if status == "INSTALLED":
|
||
click.echo(f" ✅ {agent_name}")
|
||
success_count += 1
|
||
else:
|
||
click.echo(f" ❌ {agent_name}: {status}")
|
||
|
||
click.echo(f"\nUpdated {success_count}/{len(results)} agents successfully")
|
||
|
||
|
||
@cli.command()
|
||
@click.argument("agents", nargs=-1, required=True)
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
def remove(agents: List[str], target: str):
|
||
"""Remove agents from a project."""
|
||
registry = _get_registry()
|
||
installer = AgentInstaller(registry)
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
click.echo(f"Removing agents from: {target_path}")
|
||
results = installer.remove_agents(list(agents), target_path)
|
||
|
||
# Display results
|
||
for agent_name, status in results.items():
|
||
if status == "REMOVED":
|
||
click.echo(f" ✅ {agent_name}")
|
||
elif status == "NOT_FOUND":
|
||
click.echo(f" ⚠️ {agent_name}: Not installed")
|
||
else:
|
||
click.echo(f" ❌ {agent_name}: {status}")
|
||
|
||
|
||
@cli.command()
|
||
@click.argument("project_name")
|
||
@click.option(
|
||
"--template",
|
||
"-t",
|
||
default="python-basic",
|
||
help="Project template (python-basic, python-web, python-cli, python-data)",
|
||
)
|
||
@click.option("--agents", "-a", help="Comma-separated list of agents to install")
|
||
@click.option(
|
||
"--parent-dir", default=".", help="Parent directory for project (default: current)"
|
||
)
|
||
def init(project_name: str, template: str, agents: Optional[str], parent_dir: str):
|
||
"""Initialize a new project with agents."""
|
||
registry = _get_registry()
|
||
initializer = ProjectInitializer(registry)
|
||
|
||
project_path = Path(parent_dir) / project_name
|
||
|
||
if project_path.exists():
|
||
click.echo(f"Error: Directory {project_path} already exists")
|
||
sys.exit(1)
|
||
|
||
# Parse agent list
|
||
agent_list = None
|
||
if agents:
|
||
agent_list = [a.strip() for a in agents.split(",")]
|
||
|
||
click.echo(f"Initializing project: {project_name}")
|
||
click.echo(f"Template: {template}")
|
||
|
||
# Show available templates
|
||
templates = registry.get_agent_templates()
|
||
if template not in templates:
|
||
click.echo(f"Error: Unknown template '{template}'")
|
||
click.echo(f"Available templates: {', '.join(templates.keys())}")
|
||
sys.exit(1)
|
||
|
||
if not agent_list:
|
||
agent_list = templates[template]
|
||
click.echo(f"Using template agents: {', '.join(agent_list)}")
|
||
else:
|
||
click.echo(f"Using custom agents: {', '.join(agent_list)}")
|
||
|
||
results = initializer.init_project(project_path, template, agent_list, project_name)
|
||
|
||
# Display results
|
||
success_count = sum(1 for status in results.values() if status == "INSTALLED")
|
||
click.echo(f"\nProject initialized with {success_count}/{len(results)} agents")
|
||
|
||
click.echo("\nNext steps:")
|
||
click.echo(f" cd {project_name}")
|
||
click.echo(" make setup-complete # Set up development environment")
|
||
click.echo(" make test # Run tests")
|
||
|
||
|
||
@cli.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
def validate(target: str):
|
||
"""Validate agents in a project."""
|
||
registry = _get_registry()
|
||
installer = AgentInstaller(registry)
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
# Validate registry agents
|
||
click.echo("Validating agent registry...")
|
||
registry_errors = registry.validate_agents()
|
||
|
||
if registry_errors:
|
||
click.echo("Registry validation errors:")
|
||
for agent, errors in registry_errors.items():
|
||
click.echo(f" {agent}:")
|
||
for error in errors:
|
||
click.echo(f" ❌ {error}")
|
||
else:
|
||
click.echo(" ✅ Registry validation passed")
|
||
|
||
# Validate installed agents
|
||
click.echo(f"\nValidating installed agents in: {target_path}")
|
||
install_errors = installer.validate_installation(target_path)
|
||
|
||
if install_errors:
|
||
click.echo("Installation validation errors:")
|
||
for agent, errors in install_errors.items():
|
||
click.echo(f" {agent}:")
|
||
for error in errors:
|
||
click.echo(f" ❌ {error}")
|
||
else:
|
||
click.echo(" ✅ Installation validation passed")
|
||
|
||
# Show installed agents
|
||
installed = installer.list_installed_agents(target_path)
|
||
if installed:
|
||
click.echo(f"\nInstalled agents ({len(installed)}):")
|
||
for agent in installed:
|
||
click.echo(f" • {agent}")
|
||
else:
|
||
click.echo("\nNo agents installed in this project")
|
||
|
||
|
||
@cli.command()
|
||
def templates():
|
||
"""List available project templates."""
|
||
registry = _get_registry()
|
||
templates = registry.get_agent_templates()
|
||
|
||
click.echo("Available Project Templates:")
|
||
click.echo("=" * 40)
|
||
|
||
for template_name, agent_list in templates.items():
|
||
click.echo(f"\n{template_name}:")
|
||
click.echo(f" Agents ({len(agent_list)}): {', '.join(agent_list)}")
|
||
|
||
|
||
@cli.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
def status(target: str):
|
||
"""Show status of agents in a project."""
|
||
registry = _get_registry()
|
||
installer = AgentInstaller(registry)
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
click.echo(f"Project: {target_path.name}")
|
||
click.echo(f"Path: {target_path}")
|
||
click.echo("=" * 50)
|
||
|
||
# Check if agents directory exists
|
||
agents_dir = target_path / "agents"
|
||
if not agents_dir.exists():
|
||
click.echo("❌ No agents directory found")
|
||
click.echo("\nRun 'kaizen-agentic init' to initialize a new project")
|
||
click.echo("or 'kaizen-agentic install <agents>' to add agents")
|
||
return
|
||
|
||
# List installed agents
|
||
installed = installer.list_installed_agents(target_path)
|
||
if installed:
|
||
click.echo(f"✅ Agents installed ({len(installed)}):")
|
||
|
||
# Group by category
|
||
categories = {}
|
||
for agent_name in installed:
|
||
agent = registry.get_agent(agent_name)
|
||
if agent:
|
||
cat = agent.category.value
|
||
if cat not in categories:
|
||
categories[cat] = []
|
||
categories[cat].append(agent_name)
|
||
else:
|
||
if "unknown" not in categories:
|
||
categories["unknown"] = []
|
||
categories["unknown"].append(agent_name)
|
||
|
||
for category, agents in categories.items():
|
||
click.echo(f"\n {category.replace('-', ' ').title()}:")
|
||
for agent in agents:
|
||
click.echo(f" • {agent}")
|
||
else:
|
||
click.echo("❌ No agents installed")
|
||
|
||
# Check for configuration files
|
||
click.echo("\nConfiguration files:")
|
||
config_files = ["CLAUDE.md", "Makefile", "pyproject.toml", ".gitignore"]
|
||
for config_file in config_files:
|
||
file_path = target_path / config_file
|
||
if file_path.exists():
|
||
click.echo(f" ✅ {config_file}")
|
||
else:
|
||
click.echo(f" ❌ {config_file}")
|
||
|
||
|
||
@cli.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.option("--detailed", "-d", is_flag=True, help="Show detailed analysis")
|
||
def detect(target: str, detailed: bool):
|
||
"""Detect existing agent systems in a project."""
|
||
from .detection import AgentSystemDetector
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
if not target_path.exists():
|
||
click.echo(f"Error: Directory not found: {target_path}")
|
||
sys.exit(1)
|
||
|
||
click.echo(f"Detecting agent systems in: {target_path}")
|
||
click.echo("=" * 50)
|
||
|
||
detector = AgentSystemDetector()
|
||
result = detector.detect_agent_systems(target_path)
|
||
|
||
# Show detected systems
|
||
if result.detected_systems:
|
||
click.echo(f"\n🔍 Detected Agent Systems ({len(result.detected_systems)}):")
|
||
for system in result.detected_systems:
|
||
click.echo(f" • {system.value}")
|
||
else:
|
||
click.echo("\n🔍 No existing agent systems detected")
|
||
|
||
# Show detected agents
|
||
if result.agents:
|
||
click.echo(f"\n🤖 Detected Agents ({len(result.agents)}):")
|
||
for agent in result.agents:
|
||
status = "✅" if agent.can_migrate else "⚠️"
|
||
click.echo(f" {status} {agent.name} ({agent.type.value})")
|
||
if detailed and agent.description:
|
||
click.echo(f" {agent.description}")
|
||
if not agent.can_migrate and agent.migration_notes:
|
||
click.echo(f" Note: {agent.migration_notes}")
|
||
|
||
# Show config files
|
||
if result.config_files:
|
||
click.echo(f"\n📄 Configuration Files ({len(result.config_files)}):")
|
||
for config_file in result.config_files:
|
||
click.echo(f" • {config_file.relative_to(target_path)}")
|
||
|
||
# Show conflicts
|
||
if result.conflicts:
|
||
click.echo(f"\n⚠️ Potential Conflicts ({len(result.conflicts)}):")
|
||
for agent1, agent2, reason in result.conflicts:
|
||
click.echo(f" • {agent1} vs {agent2}: {reason}")
|
||
|
||
# Show integration strategy
|
||
if result.integration_strategy:
|
||
click.echo(
|
||
f"\n💡 Recommended Integration Strategy: {result.integration_strategy}"
|
||
)
|
||
|
||
# Show migration recommendations
|
||
if result.migration_recommendations:
|
||
click.echo("\n📋 Migration Recommendations:")
|
||
for recommendation in result.migration_recommendations:
|
||
if recommendation.startswith(" "):
|
||
click.echo(f" {recommendation}")
|
||
else:
|
||
click.echo(f" • {recommendation}")
|
||
|
||
if not result.detected_systems:
|
||
click.echo("\n✨ This project is ready for Kaizen Agentic installation!")
|
||
click.echo(" Run: kaizen-agentic install <agent-names>")
|
||
|
||
|
||
@cli.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.option(
|
||
"--dry-run", "-n", is_flag=True, help="Show what would be done without executing"
|
||
)
|
||
@click.option(
|
||
"--auto-resolve", "-a", is_flag=True, help="Automatically resolve simple conflicts"
|
||
)
|
||
def migrate(target: str, dry_run: bool, auto_resolve: bool):
|
||
"""Create migration plan for integrating Kaizen agents into existing project."""
|
||
from .migration import AgentMigrationPlanner, AgentMigrator
|
||
|
||
target_path = Path(target).resolve()
|
||
|
||
if not target_path.exists():
|
||
click.echo(f"Error: Directory not found: {target_path}")
|
||
sys.exit(1)
|
||
|
||
click.echo(f"Creating migration plan for: {target_path}")
|
||
click.echo("=" * 50)
|
||
|
||
planner = AgentMigrationPlanner()
|
||
integration_plan = planner.create_integration_plan(target_path)
|
||
|
||
if (
|
||
not integration_plan.migration_plans
|
||
and not integration_plan.conflict_resolutions
|
||
):
|
||
click.echo("✨ No migration needed - project is ready for Kaizen agents!")
|
||
click.echo(" Run: kaizen-agentic install <agent-names>")
|
||
return
|
||
|
||
# Show migration plans
|
||
if integration_plan.migration_plans:
|
||
click.echo(f"\n🔄 Migration Plans ({len(integration_plan.migration_plans)}):")
|
||
for plan in integration_plan.migration_plans:
|
||
strategy_emoji = {
|
||
"replace": "🔄",
|
||
"extend": "🔗",
|
||
"preserve": "💾",
|
||
"merge": "🔀",
|
||
"remove": "🗑️",
|
||
}
|
||
emoji = strategy_emoji.get(plan.strategy.value, "❓")
|
||
|
||
click.echo(
|
||
f" {emoji} {plan.source_agent.name} ({plan.source_agent.type.value})"
|
||
)
|
||
click.echo(f" Strategy: {plan.strategy.value}")
|
||
if plan.target_agent:
|
||
click.echo(f" Target: {plan.target_agent}")
|
||
|
||
for note in plan.migration_notes:
|
||
click.echo(f" 📝 {note}")
|
||
|
||
# Show conflict resolutions
|
||
if integration_plan.conflict_resolutions:
|
||
click.echo(
|
||
f"\n⚠️ Conflict Resolutions ({len(integration_plan.conflict_resolutions)}):"
|
||
)
|
||
for resolution in integration_plan.conflict_resolutions:
|
||
click.echo(f" • {resolution.agent1} vs {resolution.agent2}")
|
||
click.echo(f" Resolution: {resolution.resolution.value}")
|
||
if resolution.action_details:
|
||
for key, value in resolution.action_details.items():
|
||
click.echo(f" {key}: {value}")
|
||
|
||
# Show integration order
|
||
if integration_plan.integration_order:
|
||
click.echo("\n📋 Integration Order:")
|
||
for i, agent_name in enumerate(integration_plan.integration_order, 1):
|
||
click.echo(f" {i}. {agent_name}")
|
||
|
||
# Show post-migration tasks
|
||
if integration_plan.post_migration_tasks:
|
||
click.echo("\n✅ Post-Migration Tasks:")
|
||
for task in integration_plan.post_migration_tasks:
|
||
click.echo(f" • {task}")
|
||
|
||
# Execute migration if requested
|
||
if not dry_run:
|
||
click.echo("\n🚀 Executing migration...")
|
||
migrator = AgentMigrator()
|
||
results = migrator.execute_migration(integration_plan, dry_run=False)
|
||
|
||
click.echo("\n📊 Migration Results:")
|
||
for agent, result in results.items():
|
||
status_emoji = "✅" if "ERROR" not in result else "❌"
|
||
click.echo(f" {status_emoji} {agent}: {result}")
|
||
|
||
click.echo(f"\n💾 Backup created at: {integration_plan.backup_directory}")
|
||
else:
|
||
click.echo(
|
||
"\n🔍 This was a dry run. Use --no-dry-run to execute the migration."
|
||
)
|
||
click.echo(
|
||
f" Backup would be created at: {integration_plan.backup_directory}"
|
||
)
|
||
|
||
|
||
@cli.group()
|
||
def extensions():
|
||
"""Manage agent extensions."""
|
||
pass
|
||
|
||
|
||
@extensions.command()
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.option("--base-agent", "-b", help="Filter by base agent")
|
||
def list_extensions(target: str, base_agent: Optional[str]):
|
||
"""List installed extensions."""
|
||
from .extensions import ExtensionManager
|
||
|
||
target_path = Path(target).resolve()
|
||
manager = ExtensionManager(target_path)
|
||
|
||
extensions_list = manager.list_extensions(base_agent)
|
||
|
||
if not extensions_list:
|
||
if base_agent:
|
||
click.echo(f"No extensions found for agent: {base_agent}")
|
||
else:
|
||
click.echo("No extensions installed in this project")
|
||
return
|
||
|
||
click.echo(f"Extensions in {target_path}:")
|
||
click.echo("=" * 40)
|
||
|
||
for ext in extensions_list:
|
||
status = "✅" if ext.enabled else "❌"
|
||
click.echo(f"\n{status} {ext.name} (extends {ext.base_agent})")
|
||
click.echo(f" Type: {ext.extension_type.value}")
|
||
click.echo(f" Description: {ext.description}")
|
||
click.echo(f" Version: {ext.version}")
|
||
|
||
if ext.custom_commands:
|
||
click.echo(f" Custom commands: {', '.join(ext.custom_commands.keys())}")
|
||
|
||
|
||
@extensions.command()
|
||
@click.argument("name")
|
||
@click.argument("base_agent")
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.option("--description", "-d", help="Extension description")
|
||
@click.option("--template", default="basic", help="Template type (basic, advanced)")
|
||
def create(
|
||
name: str, base_agent: str, target: str, description: Optional[str], template: str
|
||
):
|
||
"""Create a new agent extension."""
|
||
from .extensions import ExtensionManager, ExtensionType, create_extension_template
|
||
|
||
target_path = Path(target).resolve()
|
||
manager = ExtensionManager(target_path)
|
||
|
||
# Generate template
|
||
template_content = create_extension_template(
|
||
name, base_agent, target_path, template
|
||
)
|
||
|
||
# Save template to file
|
||
template_dir = target_path / ".kaizen" / "extensions" / name
|
||
template_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
template_file = template_dir / "template.md"
|
||
template_file.write_text(template_content)
|
||
|
||
# Create basic extension
|
||
manager.create_extension(
|
||
name=name,
|
||
base_agent=base_agent,
|
||
extension_type=ExtensionType.FUNCTIONAL_EXTENSION,
|
||
description=description or f"Custom extension for {base_agent}",
|
||
)
|
||
|
||
click.echo(f"✅ Created extension: {name}")
|
||
click.echo(f" Base agent: {base_agent}")
|
||
click.echo(f" Template saved to: {template_file}")
|
||
click.echo(
|
||
f" Edit the configuration and run: kaizen-agentic extensions enable {name}"
|
||
)
|
||
|
||
|
||
@extensions.command()
|
||
@click.argument("name")
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
def enable(name: str, target: str):
|
||
"""Enable an extension."""
|
||
from .extensions import ExtensionManager
|
||
|
||
target_path = Path(target).resolve()
|
||
manager = ExtensionManager(target_path)
|
||
|
||
if manager.enable_extension(name):
|
||
click.echo(f"✅ Enabled extension: {name}")
|
||
else:
|
||
click.echo(f"❌ Extension not found: {name}")
|
||
|
||
|
||
@extensions.command()
|
||
@click.argument("name")
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
def disable(name: str, target: str):
|
||
"""Disable an extension."""
|
||
from .extensions import ExtensionManager
|
||
|
||
target_path = Path(target).resolve()
|
||
manager = ExtensionManager(target_path)
|
||
|
||
if manager.disable_extension(name):
|
||
click.echo(f"❌ Disabled extension: {name}")
|
||
else:
|
||
click.echo(f"❌ Extension not found: {name}")
|
||
|
||
|
||
@extensions.command()
|
||
@click.argument("name")
|
||
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
|
||
@click.confirmation_option(prompt="Are you sure you want to remove this extension?")
|
||
def remove(name: str, target: str):
|
||
"""Remove an extension."""
|
||
from .extensions import ExtensionManager
|
||
|
||
target_path = Path(target).resolve()
|
||
manager = ExtensionManager(target_path)
|
||
|
||
if manager.remove_extension(name):
|
||
click.echo(f"🗑️ Removed extension: {name}")
|
||
else:
|
||
click.echo(f"❌ Extension not found: {name}")
|
||
|
||
|
||
@cli.group()
|
||
def memory():
|
||
"""Manage project-scoped agent memory (.kaizen/agents/<name>/memory.md)."""
|
||
pass
|
||
|
||
|
||
@memory.command("show")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
def memory_show(agent_name: str, target: str):
|
||
"""Print agent memory for the current project."""
|
||
memory_path = _memory_path(target, agent_name)
|
||
|
||
if not memory_path.exists():
|
||
click.echo(f"No memory found for agent '{agent_name}'.")
|
||
click.echo(f" Expected: {memory_path}")
|
||
click.echo(f" Run: kaizen-agentic memory init {agent_name}")
|
||
return
|
||
|
||
click.echo(memory_path.read_text())
|
||
|
||
|
||
@memory.command("init")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.option(
|
||
"--no-metrics",
|
||
is_flag=True,
|
||
help="Skip scaffolding .kaizen/metrics/<agent>/ (default: create metrics dir)",
|
||
)
|
||
def memory_init(agent_name: str, target: str, no_metrics: bool):
|
||
"""Scaffold an empty memory file for an agent."""
|
||
memory_path = _memory_path(target, agent_name)
|
||
|
||
if memory_path.exists():
|
||
click.echo(f"Memory file already exists: {memory_path}")
|
||
return
|
||
|
||
memory_path.parent.mkdir(parents=True, exist_ok=True)
|
||
project_name = Path(target).resolve().name
|
||
|
||
content = f"""---
|
||
agent: {agent_name}
|
||
project: {project_name}
|
||
last_updated: {_today()}
|
||
session_count: 0
|
||
---
|
||
|
||
## Project Context
|
||
<!-- What this agent knows about the project it works in -->
|
||
|
||
## Accumulated Findings
|
||
<!-- Patterns, recurring issues, key decisions encountered -->
|
||
|
||
## What Worked
|
||
<!-- Approaches that produced good results in this project -->
|
||
|
||
## Watch Points
|
||
<!-- Recurring risks, traps, or areas requiring extra care -->
|
||
|
||
## Open Threads
|
||
<!-- Things noticed but not yet acted on -->
|
||
|
||
## Session Log
|
||
<!-- One-line entry per session: date · summary · outcome -->
|
||
"""
|
||
memory_path.write_text(content)
|
||
click.echo(f"Initialized memory for '{agent_name}': {memory_path}")
|
||
|
||
if not no_metrics:
|
||
metrics_dir = MetricsStore(Path(target), agent_name).scaffold()
|
||
click.echo(f"Initialized metrics for '{agent_name}': {metrics_dir}")
|
||
|
||
# For agents with protocols, note the protocol location
|
||
registry = _get_registry()
|
||
protocols_dir = registry.agents_dir / "protocols" / agent_name
|
||
if protocols_dir.exists():
|
||
slugs = [f.stem for f in sorted(protocols_dir.glob("*.md")) if f.name != "README.md"]
|
||
if slugs:
|
||
click.echo(f" Protocols available for '{agent_name}':")
|
||
for slug in slugs:
|
||
click.echo(f" kaizen-agentic protocols show {agent_name} {slug}")
|
||
|
||
|
||
@memory.command("brief")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.option("--raw", is_flag=True, help="Dump raw memory files without synthesis header")
|
||
def memory_brief(agent_name: str, target: str, raw: bool):
|
||
"""Print a coach-synthesised orientation for an agent.
|
||
|
||
Reads all agent memories in the project and formats an orientation brief
|
||
for the specified agent, following the coach agent (agents/agent-coach.md)
|
||
output format. Pass to a Claude session with the coach agent loaded for
|
||
full LLM synthesis.
|
||
"""
|
||
project_root = Path(target).resolve()
|
||
kaizen_dir = project_root / ".kaizen" / "agents"
|
||
project_name = project_root.name
|
||
|
||
# Collect all agent memories
|
||
own_memory: Optional[str] = None
|
||
other_memories: dict = {}
|
||
|
||
if kaizen_dir.exists():
|
||
for agent_dir in sorted(kaizen_dir.iterdir()):
|
||
if not agent_dir.is_dir():
|
||
continue
|
||
mf = agent_dir / "memory.md"
|
||
if not mf.exists():
|
||
continue
|
||
if agent_dir.name == agent_name:
|
||
own_memory = mf.read_text()
|
||
else:
|
||
other_memories[agent_dir.name] = mf.read_text()
|
||
|
||
if raw:
|
||
if own_memory:
|
||
click.echo(f"=== {agent_name} ===\n{own_memory}")
|
||
for name, content in other_memories.items():
|
||
click.echo(f"=== {name} ===\n{content}")
|
||
return
|
||
|
||
from datetime import date as _date
|
||
today = _date.today().isoformat()
|
||
sources = ([agent_name] if own_memory else []) + list(other_memories.keys())
|
||
|
||
click.echo(f"## Orientation Brief for: {agent_name}")
|
||
click.echo(f"Project: {project_name}")
|
||
click.echo(f"Generated: {today}")
|
||
click.echo(f"Sources: {', '.join(sources) if sources else 'none'}")
|
||
click.echo()
|
||
|
||
if not sources:
|
||
click.echo("No agent memory files found in this project.")
|
||
click.echo(f" Run: kaizen-agentic memory init {agent_name}")
|
||
click.echo(" Then load the coach agent (agents/agent-coach.md) for synthesis.")
|
||
return
|
||
|
||
# Own memory section
|
||
if own_memory:
|
||
click.echo("### Your Memory")
|
||
click.echo(own_memory)
|
||
else:
|
||
click.echo(f"### Your Memory\n(none — run: kaizen-agentic memory init {agent_name})\n")
|
||
|
||
# Cross-agent context
|
||
if other_memories:
|
||
click.echo("### Context From Other Agents")
|
||
click.echo("(Load coach agent for full synthesis. Raw content below.)\n")
|
||
for name, content in other_memories.items():
|
||
click.echo(f"--- {name} ---")
|
||
click.echo(content)
|
||
else:
|
||
click.echo("### Context From Other Agents\nNo other agent memories found in this project.\n")
|
||
|
||
click.echo("---")
|
||
click.echo("Tip: Load agents/agent-coach.md in your Claude session and pass this output")
|
||
click.echo(" for a full cross-agent synthesis and orientation brief.")
|
||
|
||
|
||
@memory.command("clear")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.confirmation_option(prompt="This will permanently delete the agent memory. Continue?")
|
||
def memory_clear(agent_name: str, target: str):
|
||
"""Wipe agent memory for the current project."""
|
||
memory_path = _memory_path(target, agent_name)
|
||
|
||
if not memory_path.exists():
|
||
click.echo(f"No memory found for agent '{agent_name}' — nothing to clear.")
|
||
return
|
||
|
||
memory_path.unlink()
|
||
click.echo(f"Cleared memory for '{agent_name}': {memory_path}")
|
||
|
||
# Remove empty parent directory
|
||
if not any(memory_path.parent.iterdir()):
|
||
memory_path.parent.rmdir()
|
||
|
||
|
||
@cli.group()
|
||
def metrics():
|
||
"""Manage project-scoped agent metrics (.kaizen/metrics/<agent>/)."""
|
||
pass
|
||
|
||
|
||
@metrics.command("record")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.option("--success", "outcome_success", is_flag=True, help="Record successful execution")
|
||
@click.option("--failure", "outcome_failure", is_flag=True, help="Record failed execution")
|
||
@click.option("--time", "execution_time", type=float, help="Execution time in seconds")
|
||
@click.option("--quality", type=float, help="Quality score 0.0–1.0")
|
||
@click.option("--session-id", help="Optional session identifier")
|
||
@click.option("--idempotency-key", help="Skip append if this key was already recorded")
|
||
@click.option("--json", "json_input", is_flag=True, help="Read full record JSON from stdin")
|
||
def metrics_record(
|
||
agent_name: str,
|
||
target: str,
|
||
outcome_success: bool,
|
||
outcome_failure: bool,
|
||
execution_time: Optional[float],
|
||
quality: Optional[float],
|
||
session_id: Optional[str],
|
||
idempotency_key: Optional[str],
|
||
json_input: bool,
|
||
):
|
||
"""Append one execution record for an agent."""
|
||
store = MetricsStore(_project_root(target), agent_name)
|
||
|
||
if json_input:
|
||
payload = json.load(sys.stdin)
|
||
if not isinstance(payload, dict):
|
||
click.echo("Error: JSON input must be an object", err=True)
|
||
sys.exit(1)
|
||
else:
|
||
if outcome_success and outcome_failure:
|
||
click.echo("Error: use only one of --success or --failure", err=True)
|
||
sys.exit(1)
|
||
if not outcome_success and not outcome_failure:
|
||
click.echo("Error: specify --success or --failure (or use --json)", err=True)
|
||
sys.exit(1)
|
||
payload = {"success": outcome_success}
|
||
if execution_time is not None:
|
||
payload["execution_time_s"] = execution_time
|
||
if quality is not None:
|
||
payload["quality_score"] = quality
|
||
if session_id:
|
||
payload["session_id"] = session_id
|
||
|
||
if store.append(payload, idempotency_key=idempotency_key):
|
||
click.echo(f"Recorded metrics for '{agent_name}'")
|
||
else:
|
||
click.echo(f"Skipped duplicate record for '{agent_name}' (idempotency key exists)")
|
||
|
||
|
||
@metrics.command("show")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.option("--limit", "-n", default=5, show_default=True, help="Recent executions to show")
|
||
def metrics_show(agent_name: str, target: str, limit: int):
|
||
"""Print metrics summary and recent executions for an agent."""
|
||
store = MetricsStore(_project_root(target), agent_name)
|
||
|
||
if not store.executions_path.exists():
|
||
click.echo(f"No metrics found for agent '{agent_name}'.")
|
||
click.echo(f" Expected: {store.agent_dir}")
|
||
click.echo(f" Run: kaizen-agentic memory init {agent_name}")
|
||
return
|
||
|
||
summary = store.read_summary() or store.write_summary()
|
||
click.echo(f"Metrics for '{agent_name}':")
|
||
click.echo("=" * 40)
|
||
click.echo(json.dumps(summary, indent=2))
|
||
|
||
records = store.read_executions()
|
||
if records:
|
||
click.echo("\nRecent executions:")
|
||
for record in records[-limit:]:
|
||
click.echo(json.dumps(record, sort_keys=True))
|
||
|
||
|
||
@metrics.command("list")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
def metrics_list(target: str):
|
||
"""List agents with metrics in the current project."""
|
||
agents = MetricsStore.list_agents(_project_root(target))
|
||
if not agents:
|
||
click.echo("No agent metrics found in this project.")
|
||
click.echo(" Run: kaizen-agentic memory init <agent>")
|
||
return
|
||
|
||
click.echo("Agents with metrics:")
|
||
for name in agents:
|
||
store = MetricsStore(_project_root(target), name)
|
||
summary = store.read_summary()
|
||
count = summary["execution_count"] if summary else len(store.read_executions())
|
||
click.echo(f" • {name} ({count} executions)")
|
||
|
||
|
||
@metrics.command("optimize")
|
||
@click.argument("agent_name", required=False)
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
@click.option(
|
||
"--min-samples",
|
||
default=MIN_SAMPLES_FOR_RECOMMENDATIONS,
|
||
show_default=True,
|
||
help="Minimum execution records required for recommendations",
|
||
)
|
||
def metrics_optimize(agent_name: Optional[str], target: str, min_samples: int):
|
||
"""Run optimizer analysis on project metrics and write recommendations."""
|
||
project_root = _project_root(target)
|
||
agents = [agent_name] if agent_name else MetricsStore.list_agents(project_root)
|
||
|
||
if not agents:
|
||
click.echo("No agent metrics found to optimize.")
|
||
click.echo(" Record executions with: kaizen-agentic metrics record <agent> --success")
|
||
return
|
||
|
||
optimizer_store = OptimizerStore(project_root)
|
||
combined_reports = []
|
||
|
||
for name in agents:
|
||
store = MetricsStore(project_root, name)
|
||
records = store.read_executions()
|
||
loop = OptimizationLoop.from_metrics_store(store, min_samples=1)
|
||
report = loop.get_optimization_report_json()
|
||
report["sample_threshold"] = min_samples
|
||
report["meets_sample_threshold"] = len(records) >= min_samples
|
||
combined_reports.append(report)
|
||
|
||
click.echo(f"Agent: {name}")
|
||
click.echo("=" * 40)
|
||
click.echo(json.dumps(report, indent=2))
|
||
|
||
if len(records) >= min_samples:
|
||
optimizer_store.append_recommendations(
|
||
name,
|
||
report["recommendations"],
|
||
metrics_count=len(records),
|
||
)
|
||
else:
|
||
click.echo(
|
||
f" Note: {len(records)} record(s) — need {min_samples} for actionable recommendations"
|
||
)
|
||
click.echo()
|
||
|
||
analysis_payload = {
|
||
"project": project_root.name,
|
||
"optimized_at": _today(),
|
||
"min_samples": min_samples,
|
||
"agents": combined_reports,
|
||
}
|
||
analysis_path = optimizer_store.write_analysis(analysis_payload)
|
||
click.echo(f"Wrote optimizer analysis: {analysis_path}")
|
||
|
||
|
||
@metrics.command("export")
|
||
@click.argument("agent_name")
|
||
@click.option("--target", "-t", default=".", help="Project root (default: current)")
|
||
def metrics_export(agent_name: str, target: str):
|
||
"""Dump executions.jsonl for an agent to stdout."""
|
||
store = MetricsStore(_project_root(target), agent_name)
|
||
if not store.executions_path.exists():
|
||
click.echo(f"No metrics found for agent '{agent_name}'.", err=True)
|
||
sys.exit(1)
|
||
click.echo(store.executions_path.read_text(encoding="utf-8"), nl=False)
|
||
|
||
|
||
@cli.group()
|
||
def protocols():
|
||
"""Browse agent protocol runbooks (agents/protocols/<agent>/<slug>.md)."""
|
||
pass
|
||
|
||
|
||
@protocols.command("list")
|
||
@click.argument("agent_name", required=False)
|
||
def protocols_list(agent_name: Optional[str]):
|
||
"""List available protocols, optionally filtered by agent."""
|
||
registry = _get_registry()
|
||
protocols_dir = registry.agents_dir / "protocols"
|
||
|
||
if not protocols_dir.exists():
|
||
click.echo("No protocols directory found.")
|
||
return
|
||
|
||
found = []
|
||
agent_dirs = (
|
||
[protocols_dir / agent_name] if agent_name else sorted(protocols_dir.iterdir())
|
||
)
|
||
for agent_dir in agent_dirs:
|
||
if not agent_dir.is_dir() or agent_dir.name == "__pycache__":
|
||
continue
|
||
for protocol_file in sorted(agent_dir.glob("*.md")):
|
||
if protocol_file.name == "README.md":
|
||
continue
|
||
# Try to read title from frontmatter
|
||
title = protocol_file.stem.replace("-", " ").title()
|
||
try:
|
||
content = protocol_file.read_text()
|
||
for line in content.splitlines():
|
||
if line.startswith("title:"):
|
||
title = line.split(":", 1)[1].strip().strip('"')
|
||
break
|
||
except Exception:
|
||
pass
|
||
found.append((agent_dir.name, protocol_file.stem, title))
|
||
|
||
if not found:
|
||
if agent_name:
|
||
click.echo(f"No protocols found for agent '{agent_name}'.")
|
||
else:
|
||
click.echo("No protocols found.")
|
||
return
|
||
|
||
click.echo("Available Protocols:")
|
||
click.echo("=" * 40)
|
||
current_agent = None
|
||
for agent, slug, title in found:
|
||
if agent != current_agent:
|
||
click.echo(f"\n {agent}:")
|
||
current_agent = agent
|
||
click.echo(f" • {slug}: {title}")
|
||
|
||
|
||
@protocols.command("show")
|
||
@click.argument("agent_name")
|
||
@click.argument("slug")
|
||
def protocols_show(agent_name: str, slug: str):
|
||
"""Print a protocol runbook."""
|
||
registry = _get_registry()
|
||
protocol_path = registry.agents_dir / "protocols" / agent_name / f"{slug}.md"
|
||
|
||
if not protocol_path.exists():
|
||
click.echo(f"Protocol not found: {agent_name}/{slug}")
|
||
click.echo(f" Expected: {protocol_path}")
|
||
click.echo(f" Run: kaizen-agentic protocols list {agent_name}")
|
||
return
|
||
|
||
click.echo(protocol_path.read_text())
|
||
|
||
|
||
def _project_root(target: str) -> Path:
|
||
return Path(target).resolve()
|
||
|
||
|
||
def _memory_path(target: str, agent_name: str) -> Path:
|
||
return _project_root(target) / ".kaizen" / "agents" / agent_name / "memory.md"
|
||
|
||
|
||
def _today() -> str:
|
||
from datetime import date
|
||
return date.today().isoformat()
|
||
|
||
|
||
def _get_registry() -> AgentRegistry:
|
||
"""Get the agent registry."""
|
||
# Try to find agents directory
|
||
current_dir = Path.cwd()
|
||
|
||
# Check if we're in a kaizen-agentic project
|
||
if (current_dir / "agents").exists():
|
||
agents_dir = current_dir / "agents"
|
||
elif (current_dir / "src" / "kaizen_agentic").exists():
|
||
# We're in the kaizen-agentic repo itself
|
||
agents_dir = current_dir / "agents"
|
||
else:
|
||
# Try to find installed package
|
||
try:
|
||
import kaizen_agentic
|
||
|
||
package_dir = Path(kaizen_agentic.__file__).parent.parent.parent
|
||
agents_dir = package_dir / "agents"
|
||
if not agents_dir.exists():
|
||
# Try relative to package
|
||
agents_dir = Path(kaizen_agentic.__file__).parent / "data" / "agents"
|
||
except ImportError:
|
||
click.echo("Error: Could not find agents directory")
|
||
click.echo(
|
||
"Make sure you're in a kaizen-agentic project or have the package installed"
|
||
)
|
||
sys.exit(1)
|
||
|
||
if not agents_dir.exists():
|
||
click.echo(f"Error: Agents directory not found: {agents_dir}")
|
||
sys.exit(1)
|
||
|
||
return AgentRegistry(agents_dir)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
cli()
|