"""Command-line interface for Kaizen Agentic agent management.""" import json import sys import contextlib import io import click from pathlib import Path from typing import List, Optional from .registry import AgentRegistry, AgentCategory from .installer import AgentInstaller, ProjectInitializer, InstallationConfig from .integrations.artifact_store import ( default_api_token, default_api_url, publish_optimizer_evidence, ) from .integrations.event_bus import ( build_metrics_recorded_envelope, publish_metrics_recorded_event, resolve_project_slug, ) from .integrations.helix import HelixCorrelationAdapter, enrich_helix_correlation from .metrics import MetricsStore, OptimizerStore, performance_summary_markdown from .optimization import OptimizationLoop, MIN_SAMPLES_FOR_RECOMMENDATIONS from .engagement_promote import promote_engagement from .schedule import ( ScheduleError, default_schedule_yaml, engagement_schedule_yaml, load_schedule, schedule_path, validate_schedule, ) from .agent_docs import ( render_installed_agents_section, upsert_installed_agents_section, ) def safe_cli_wrapper(): """ Wrapper to handle Click errors gracefully and provide clean user experience. WORKAROUND FOR CLICK LIBRARY ISSUE: =================================== This function addresses a spurious error message that appears when using Click with certain argument configurations. The issue manifests as: "Error: Got unexpected extra argument (agent-name)" Despite this error message, the underlying CLI function executes correctly. This appears to be a Click library display/buffering issue where error handling interferes with normal execution flow. AFFECTED COMMANDS: install, update ISSUE DETAILS: - Affects: Click library (tested with Click 8.x series) - Symptom: Misleading error messages during successful command execution - Impact: Confusing user experience despite functional CLI - Root cause: Click's argument validation timing/display mechanism WORKAROUND APPROACH: - Capture stdout/stderr streams during CLI execution - Detect spurious error patterns specific to known issues - Filter misleading messages while preserving legitimate errors - Provide clean output for successful operations TODO: REVISIT WHEN CLICK UPDATES ================================ Monitor Click library releases and test removal of this workaround: - Test with Click 9.x+ releases - Remove this wrapper if the underlying issue is resolved - Update entry point back to direct CLI function: kaizen_agentic.cli:cli TESTING: This workaround is covered by tests in test_cli_error_handling.py """ # Capture stderr to intercept spurious error messages stderr_capture = io.StringIO() stdout_capture = io.StringIO() # Check if this is an install or update command before processing affected_commands = len(sys.argv) >= 2 and sys.argv[1] in ["install", "update"] try: with contextlib.redirect_stderr(stderr_capture), contextlib.redirect_stdout( stdout_capture ): cli(standalone_mode=False) except click.UsageError as e: if affected_commands and "Got unexpected extra argument" in str(e): # This is the spurious error for install/update commands # Check if we got some stdout output indicating success captured_stdout = stdout_capture.getvalue() success_indicators = [ "Installing agents to:", "Updating all installed agents:", ] if any(indicator in captured_stdout for indicator in success_indicators): # The command was actually executing, show the real output print(captured_stdout, end="") sys.exit(0) else: # This might be a real error print(f"Error: {e}") sys.exit(2) else: # Legitimate error for other commands print(f"Error: {e}") sys.exit(2) except SystemExit as e: # Show captured output and handle exits captured_stdout = stdout_capture.getvalue() captured_stderr = stderr_capture.getvalue() if e.code == 0: # Successful exit print(captured_stdout, end="") else: # Error exit - show both stdout and stderr unless it's the spurious error if affected_commands and "Got unexpected extra argument" in captured_stderr: # Show only stdout for install/update commands with spurious errors print(captured_stdout, end="") success_indicators = [ "Installing agents to:", "Updating all installed agents:", ] if any( indicator in captured_stdout for indicator in success_indicators ): sys.exit(0) # Override error exit if we see success indicators else: # Show everything for other commands print(captured_stdout, end="") print(captured_stderr, end="", file=sys.stderr) sys.exit(e.code) except Exception as e: print(f"Error: {e}") sys.exit(1) # If we get here, show captured output print(stdout_capture.getvalue(), end="") stderr_content = stderr_capture.getvalue() if stderr_content and not ( affected_commands and "Got unexpected extra argument" in stderr_content ): print(stderr_content, end="", file=sys.stderr) _FEEDBACK_CHANNELS = { "issues": "https://gitea.coulomb.social/coulomb/kaizen-agentic/issues", "issue_templates": "https://gitea.coulomb.social/coulomb/kaizen-agentic/issues/new/choose", "feedback_guide": ( "https://gitea.coulomb.social/coulomb/kaizen-agentic/" "src/branch/main/docs/FEEDBACK.md" ), "contributing": ( "https://gitea.coulomb.social/coulomb/kaizen-agentic/" "src/branch/main/CONTRIBUTING.md" ), } @click.group() @click.version_option() def cli(): """Kaizen Agentic - AI agent development framework.""" pass @cli.command("feedback") @click.option("--json", "as_json", is_flag=True, help="Emit machine-readable JSON") def feedback(as_json: bool): """Show how to submit bugs, ideas, and adoption feedback.""" payload = { "channels": _FEEDBACK_CHANNELS, "templates": ["bug_report", "feature_request", "feedback"], "cli_hint": ( "Use Gitea issue templates or State Hub messages " "for cross-repo coordination" ), } if as_json: click.echo(json.dumps(payload, indent=2, sort_keys=True)) return click.echo("Kaizen Agentic — feedback channels") click.echo("=" * 40) click.echo(f"Issues: {_FEEDBACK_CHANNELS['issues']}") click.echo(f"New issue: {_FEEDBACK_CHANNELS['issue_templates']}") click.echo(f"Feedback guide: {_FEEDBACK_CHANNELS['feedback_guide']}") click.echo(f"Contributing: {_FEEDBACK_CHANNELS['contributing']}") click.echo() click.echo("Templates: bug report · feature request · general feedback") click.echo( "Tip: include Python version and `kaizen-agentic --version` in bug reports." ) @cli.command("list") @click.option( "--category", type=click.Choice([c.value for c in AgentCategory]), help="Filter by category", ) @click.option("--verbose", "-v", is_flag=True, help="Show detailed information") def list_agents(category: Optional[str], verbose: bool): """List available agents.""" registry = _get_registry() if category: cat_enum = AgentCategory(category) agents = registry.list_agents(cat_enum) click.echo(f"\n{category.replace('-', ' ').title()} Agents:") click.echo("=" * 40) else: if verbose: categories = registry.get_categories() for cat, agents in categories.items(): click.echo( f"\n{cat.value.replace('-', ' ').title()} ({len(agents)} agents):" ) click.echo("=" * 50) for agent in agents: click.echo(f" • {agent.name}: {agent.description}") return else: agents = registry.list_agents() click.echo(f"\nAvailable Agents ({len(agents)} total):") click.echo("=" * 40) for agent in agents: if verbose: click.echo(f"\n{agent.name}") click.echo(f" Description: {agent.description}") click.echo(f" Category: {agent.category.value}") if agent.dependencies: click.echo(f" Dependencies: {', '.join(agent.dependencies)}") else: click.echo(f" • {agent.name}: {agent.description}") @cli.command() @click.argument("agents", nargs=-1, required=True) @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.option("--no-backup", is_flag=True, help="Skip creating backup") @click.option("--no-docs", is_flag=True, help="Skip updating documentation") def install(agents: List[str], target: str, no_backup: bool, no_docs: bool): """ Install agents into a project. NOTE: This command is affected by a Click library issue that causes spurious "Got unexpected extra argument" messages. This is handled by safe_cli_wrapper(). See safe_cli_wrapper() docstring for details and removal timeline. """ try: registry = _get_registry() installer = AgentInstaller(registry) target_path = Path(target).resolve() config = InstallationConfig( target_dir=target_path, claude_config_path=target_path / "CLAUDE.md", makefile_path=target_path / "Makefile", update_docs=not no_docs, create_backup=not no_backup, ) click.echo(f"Installing agents to: {target_path}") # Resolve dependencies with fallback try: resolved = registry.resolve_dependencies(list(agents)) if len(resolved) > len(agents): additional = [a for a in resolved if a not in agents] click.echo(f"Including dependencies: {', '.join(additional)}") except Exception: # Fall back to original agent list if dependency resolution fails resolved = list(agents) results = installer.install_agents(resolved, config) # Display results success_count = 0 for agent_name, status in results.items(): if status == "INSTALLED": click.echo(f" ✅ {agent_name}") success_count += 1 else: click.echo(f" ❌ {agent_name}: {status}") click.echo(f"\nInstalled {success_count}/{len(results)} agents successfully") # Force successful exit to override any Click error handling sys.exit(0) except Exception as e: click.echo(f"Installation failed: {e}") sys.exit(1) @cli.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.argument("agents", nargs=-1) def update(target: str, agents: List[str]): """ Update installed agents. NOTE: This command is affected by a Click library issue that causes spurious "Got unexpected extra argument" messages. This is handled by safe_cli_wrapper(). See safe_cli_wrapper() docstring for details and removal timeline. """ registry = _get_registry() installer = AgentInstaller(registry) target_path = Path(target).resolve() if not agents: agents = installer.list_installed_agents(target_path) if not agents: click.echo("No agents installed in this project") return click.echo(f"Updating all installed agents: {', '.join(agents)}") else: click.echo(f"Updating specific agents: {', '.join(agents)}") results = installer.update_agents(target_path, list(agents)) # Display results success_count = 0 for agent_name, status in results.items(): if status == "INSTALLED": click.echo(f" ✅ {agent_name}") success_count += 1 else: click.echo(f" ❌ {agent_name}: {status}") click.echo(f"\nUpdated {success_count}/{len(results)} agents successfully") @cli.command() @click.argument("agents", nargs=-1, required=True) @click.option("--target", "-t", default=".", help="Target directory (default: current)") def remove(agents: List[str], target: str): """Remove agents from a project.""" registry = _get_registry() installer = AgentInstaller(registry) target_path = Path(target).resolve() click.echo(f"Removing agents from: {target_path}") results = installer.remove_agents(list(agents), target_path) # Display results for agent_name, status in results.items(): if status == "REMOVED": click.echo(f" ✅ {agent_name}") elif status == "NOT_FOUND": click.echo(f" ⚠️ {agent_name}: Not installed") else: click.echo(f" ❌ {agent_name}: {status}") @cli.command() @click.argument("project_name") @click.option( "--template", "-t", default="python-basic", help="Project template (python-basic, python-web, python-cli, python-data)", ) @click.option("--agents", "-a", help="Comma-separated list of agents to install") @click.option( "--parent-dir", default=".", help="Parent directory for project (default: current)" ) def init(project_name: str, template: str, agents: Optional[str], parent_dir: str): """Initialize a new project with agents.""" registry = _get_registry() initializer = ProjectInitializer(registry) project_path = Path(parent_dir) / project_name if project_path.exists(): click.echo(f"Error: Directory {project_path} already exists") sys.exit(1) # Parse agent list agent_list = None if agents: agent_list = [a.strip() for a in agents.split(",")] click.echo(f"Initializing project: {project_name}") click.echo(f"Template: {template}") # Show available templates templates = registry.get_agent_templates() if template not in templates: click.echo(f"Error: Unknown template '{template}'") click.echo(f"Available templates: {', '.join(templates.keys())}") sys.exit(1) if not agent_list: agent_list = templates[template] click.echo(f"Using template agents: {', '.join(agent_list)}") else: click.echo(f"Using custom agents: {', '.join(agent_list)}") results = initializer.init_project(project_path, template, agent_list, project_name) # Display results success_count = sum(1 for status in results.values() if status == "INSTALLED") click.echo(f"\nProject initialized with {success_count}/{len(results)} agents") click.echo("\nNext steps:") click.echo(f" cd {project_name}") click.echo(" make setup-complete # Set up development environment") click.echo(" make test # Run tests") @cli.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") def validate(target: str): """Validate agents in a project.""" registry = _get_registry() installer = AgentInstaller(registry) target_path = Path(target).resolve() # Validate agent frontmatter schema click.echo("Validating agent frontmatter schema...") schema_errors = registry.validate_frontmatter_schema() if schema_errors: click.echo("Frontmatter schema errors:") for agent_file, errors in schema_errors.items(): click.echo(f" {agent_file}:") for error in errors: click.echo(f" ❌ {error}") else: click.echo(" ✅ Frontmatter schema validation passed") # Validate registry agents click.echo("\nValidating agent registry...") registry_errors = registry.validate_agents() if registry_errors: click.echo("Registry validation errors:") for agent, errors in registry_errors.items(): click.echo(f" {agent}:") for error in errors: click.echo(f" ❌ {error}") else: click.echo(" ✅ Registry validation passed") # Validate installed agents click.echo(f"\nValidating installed agents in: {target_path}") install_errors = installer.validate_installation(target_path) if install_errors: click.echo("Installation validation errors:") for agent, errors in install_errors.items(): click.echo(f" {agent}:") for error in errors: click.echo(f" ❌ {error}") else: click.echo(" ✅ Installation validation passed") # Show installed agents installed = installer.list_installed_agents(target_path) if installed: click.echo(f"\nInstalled agents ({len(installed)}):") for agent in installed: click.echo(f" • {agent}") else: click.echo("\nNo agents installed in this project") @cli.command() def templates(): """List available project templates.""" registry = _get_registry() templates = registry.get_agent_templates() click.echo("Available Project Templates:") click.echo("=" * 40) for template_name, agent_list in templates.items(): click.echo(f"\n{template_name}:") click.echo(f" Agents ({len(agent_list)}): {', '.join(agent_list)}") @cli.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") def status(target: str): """Show status of agents in a project.""" registry = _get_registry() installer = AgentInstaller(registry) target_path = Path(target).resolve() click.echo(f"Project: {target_path.name}") click.echo(f"Path: {target_path}") click.echo("=" * 50) # Check if agents directory exists agents_dir = target_path / "agents" if not agents_dir.exists(): click.echo("❌ No agents directory found") click.echo("\nRun 'kaizen-agentic init' to initialize a new project") click.echo("or 'kaizen-agentic install ' to add agents") return # List installed agents installed = installer.list_installed_agents(target_path) if installed: click.echo(f"✅ Agents installed ({len(installed)}):") # Group by category categories = {} for agent_name in installed: agent = registry.get_agent(agent_name) if agent: cat = agent.category.value if cat not in categories: categories[cat] = [] categories[cat].append(agent_name) else: if "unknown" not in categories: categories["unknown"] = [] categories["unknown"].append(agent_name) for category, agents in categories.items(): click.echo(f"\n {category.replace('-', ' ').title()}:") for agent in agents: click.echo(f" • {agent}") else: click.echo("❌ No agents installed") # Check for configuration files click.echo("\nConfiguration files:") config_files = ["CLAUDE.md", "Makefile", "pyproject.toml", ".gitignore"] for config_file in config_files: file_path = target_path / config_file if file_path.exists(): click.echo(f" ✅ {config_file}") else: click.echo(f" ❌ {config_file}") @cli.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.option("--detailed", "-d", is_flag=True, help="Show detailed analysis") def detect(target: str, detailed: bool): """Detect existing agent systems in a project.""" from .detection import AgentSystemDetector target_path = Path(target).resolve() if not target_path.exists(): click.echo(f"Error: Directory not found: {target_path}") sys.exit(1) click.echo(f"Detecting agent systems in: {target_path}") click.echo("=" * 50) detector = AgentSystemDetector() result = detector.detect_agent_systems(target_path) # Show detected systems if result.detected_systems: click.echo(f"\n🔍 Detected Agent Systems ({len(result.detected_systems)}):") for system in result.detected_systems: click.echo(f" • {system.value}") else: click.echo("\n🔍 No existing agent systems detected") # Show detected agents if result.agents: click.echo(f"\n🤖 Detected Agents ({len(result.agents)}):") for agent in result.agents: status = "✅" if agent.can_migrate else "⚠️" click.echo(f" {status} {agent.name} ({agent.type.value})") if detailed and agent.description: click.echo(f" {agent.description}") if not agent.can_migrate and agent.migration_notes: click.echo(f" Note: {agent.migration_notes}") # Show config files if result.config_files: click.echo(f"\n📄 Configuration Files ({len(result.config_files)}):") for config_file in result.config_files: click.echo(f" • {config_file.relative_to(target_path)}") # Show conflicts if result.conflicts: click.echo(f"\n⚠️ Potential Conflicts ({len(result.conflicts)}):") for agent1, agent2, reason in result.conflicts: click.echo(f" • {agent1} vs {agent2}: {reason}") # Show integration strategy if result.integration_strategy: click.echo( f"\n💡 Recommended Integration Strategy: {result.integration_strategy}" ) # Show migration recommendations if result.migration_recommendations: click.echo("\n📋 Migration Recommendations:") for recommendation in result.migration_recommendations: if recommendation.startswith(" "): click.echo(f" {recommendation}") else: click.echo(f" • {recommendation}") if not result.detected_systems: click.echo("\n✨ This project is ready for Kaizen Agentic installation!") click.echo(" Run: kaizen-agentic install ") @cli.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.option( "--dry-run", "-n", is_flag=True, help="Show what would be done without executing" ) @click.option( "--auto-resolve", "-a", is_flag=True, help="Automatically resolve simple conflicts" ) def migrate(target: str, dry_run: bool, auto_resolve: bool): """Create migration plan for integrating Kaizen agents into existing project.""" from .migration import AgentMigrationPlanner, AgentMigrator target_path = Path(target).resolve() if not target_path.exists(): click.echo(f"Error: Directory not found: {target_path}") sys.exit(1) click.echo(f"Creating migration plan for: {target_path}") click.echo("=" * 50) planner = AgentMigrationPlanner() integration_plan = planner.create_integration_plan(target_path) if ( not integration_plan.migration_plans and not integration_plan.conflict_resolutions ): click.echo("✨ No migration needed - project is ready for Kaizen agents!") click.echo(" Run: kaizen-agentic install ") return # Show migration plans if integration_plan.migration_plans: click.echo(f"\n🔄 Migration Plans ({len(integration_plan.migration_plans)}):") for plan in integration_plan.migration_plans: strategy_emoji = { "replace": "🔄", "extend": "🔗", "preserve": "💾", "merge": "🔀", "remove": "🗑️", } emoji = strategy_emoji.get(plan.strategy.value, "❓") click.echo( f" {emoji} {plan.source_agent.name} ({plan.source_agent.type.value})" ) click.echo(f" Strategy: {plan.strategy.value}") if plan.target_agent: click.echo(f" Target: {plan.target_agent}") for note in plan.migration_notes: click.echo(f" 📝 {note}") # Show conflict resolutions if integration_plan.conflict_resolutions: click.echo( f"\n⚠️ Conflict Resolutions ({len(integration_plan.conflict_resolutions)}):" ) for resolution in integration_plan.conflict_resolutions: click.echo(f" • {resolution.agent1} vs {resolution.agent2}") click.echo(f" Resolution: {resolution.resolution.value}") if resolution.action_details: for key, value in resolution.action_details.items(): click.echo(f" {key}: {value}") # Show integration order if integration_plan.integration_order: click.echo("\n📋 Integration Order:") for i, agent_name in enumerate(integration_plan.integration_order, 1): click.echo(f" {i}. {agent_name}") # Show post-migration tasks if integration_plan.post_migration_tasks: click.echo("\n✅ Post-Migration Tasks:") for task in integration_plan.post_migration_tasks: click.echo(f" • {task}") # Execute migration if requested if not dry_run: click.echo("\n🚀 Executing migration...") migrator = AgentMigrator() results = migrator.execute_migration(integration_plan, dry_run=False) click.echo("\n📊 Migration Results:") for agent, result in results.items(): status_emoji = "✅" if "ERROR" not in result else "❌" click.echo(f" {status_emoji} {agent}: {result}") click.echo(f"\n💾 Backup created at: {integration_plan.backup_directory}") else: click.echo( "\n🔍 This was a dry run. Use --no-dry-run to execute the migration." ) click.echo( f" Backup would be created at: {integration_plan.backup_directory}" ) @cli.group() def extensions(): """Manage agent extensions.""" pass @extensions.command() @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.option("--base-agent", "-b", help="Filter by base agent") def list_extensions(target: str, base_agent: Optional[str]): """List installed extensions.""" from .extensions import ExtensionManager target_path = Path(target).resolve() manager = ExtensionManager(target_path) extensions_list = manager.list_extensions(base_agent) if not extensions_list: if base_agent: click.echo(f"No extensions found for agent: {base_agent}") else: click.echo("No extensions installed in this project") return click.echo(f"Extensions in {target_path}:") click.echo("=" * 40) for ext in extensions_list: status = "✅" if ext.enabled else "❌" click.echo(f"\n{status} {ext.name} (extends {ext.base_agent})") click.echo(f" Type: {ext.extension_type.value}") click.echo(f" Description: {ext.description}") click.echo(f" Version: {ext.version}") if ext.custom_commands: click.echo(f" Custom commands: {', '.join(ext.custom_commands.keys())}") @extensions.command() @click.argument("name") @click.argument("base_agent") @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.option("--description", "-d", help="Extension description") @click.option("--template", default="basic", help="Template type (basic, advanced)") def create( name: str, base_agent: str, target: str, description: Optional[str], template: str ): """Create a new agent extension.""" from .extensions import ExtensionManager, ExtensionType, create_extension_template target_path = Path(target).resolve() manager = ExtensionManager(target_path) # Generate template template_content = create_extension_template( name, base_agent, target_path, template ) # Save template to file template_dir = target_path / ".kaizen" / "extensions" / name template_dir.mkdir(parents=True, exist_ok=True) template_file = template_dir / "template.md" template_file.write_text(template_content) # Create basic extension manager.create_extension( name=name, base_agent=base_agent, extension_type=ExtensionType.FUNCTIONAL_EXTENSION, description=description or f"Custom extension for {base_agent}", ) click.echo(f"✅ Created extension: {name}") click.echo(f" Base agent: {base_agent}") click.echo(f" Template saved to: {template_file}") click.echo( f" Edit the configuration and run: kaizen-agentic extensions enable {name}" ) @extensions.command() @click.argument("name") @click.option("--target", "-t", default=".", help="Target directory (default: current)") def enable(name: str, target: str): """Enable an extension.""" from .extensions import ExtensionManager target_path = Path(target).resolve() manager = ExtensionManager(target_path) if manager.enable_extension(name): click.echo(f"✅ Enabled extension: {name}") else: click.echo(f"❌ Extension not found: {name}") @extensions.command() @click.argument("name") @click.option("--target", "-t", default=".", help="Target directory (default: current)") def disable(name: str, target: str): """Disable an extension.""" from .extensions import ExtensionManager target_path = Path(target).resolve() manager = ExtensionManager(target_path) if manager.disable_extension(name): click.echo(f"❌ Disabled extension: {name}") else: click.echo(f"❌ Extension not found: {name}") @extensions.command("remove") @click.argument("name") @click.option("--target", "-t", default=".", help="Target directory (default: current)") @click.confirmation_option(prompt="Are you sure you want to remove this extension?") def remove_extension(name: str, target: str): """Remove an extension.""" from .extensions import ExtensionManager target_path = Path(target).resolve() manager = ExtensionManager(target_path) if manager.remove_extension(name): click.echo(f"🗑️ Removed extension: {name}") else: click.echo(f"❌ Extension not found: {name}") @cli.group() def memory(): """Manage project-scoped agent memory (.kaizen/agents//memory.md).""" pass @memory.command("show") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") def memory_show(agent_name: str, target: str): """Print agent memory for the current project.""" memory_path = _memory_path(target, agent_name) if not memory_path.exists(): click.echo(f"No memory found for agent '{agent_name}'.") click.echo(f" Expected: {memory_path}") click.echo(f" Run: kaizen-agentic memory init {agent_name}") return click.echo(memory_path.read_text()) @memory.command("init") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--no-metrics", is_flag=True, help="Skip scaffolding .kaizen/metrics// (default: create metrics dir)", ) def memory_init(agent_name: str, target: str, no_metrics: bool): """Scaffold an empty memory file for an agent.""" memory_path = _memory_path(target, agent_name) if memory_path.exists(): click.echo(f"Memory file already exists: {memory_path}") return memory_path.parent.mkdir(parents=True, exist_ok=True) project_name = Path(target).resolve().name content = f"""--- agent: {agent_name} project: {project_name} last_updated: {_today()} session_count: 0 --- ## Project Context ## Accumulated Findings ## What Worked ## Watch Points ## Open Threads ## Session Log """ memory_path.write_text(content) click.echo(f"Initialized memory for '{agent_name}': {memory_path}") if not no_metrics: metrics_dir = MetricsStore(Path(target), agent_name).scaffold() click.echo(f"Initialized metrics for '{agent_name}': {metrics_dir}") # For agents with protocols, note the protocol location registry = _get_registry() protocols_dir = registry.agents_dir / "protocols" / agent_name if protocols_dir.exists(): slugs = [ f.stem for f in sorted(protocols_dir.glob("*.md")) if f.name != "README.md" ] if slugs: click.echo(f" Protocols available for '{agent_name}':") for slug in slugs: click.echo(f" kaizen-agentic protocols show {agent_name} {slug}") @memory.command("brief") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--raw", is_flag=True, help="Dump raw memory files without synthesis header" ) def memory_brief(agent_name: str, target: str, raw: bool): """Print a coach-synthesised orientation for an agent. Reads all agent memories in the project and formats an orientation brief for the specified agent, following the coach agent (agents/agent-coach.md) output format. Pass to a Claude session with the coach agent loaded for full LLM synthesis. """ project_root = Path(target).resolve() kaizen_dir = project_root / ".kaizen" / "agents" project_name = project_root.name # Collect all agent memories own_memory: Optional[str] = None other_memories: dict = {} if kaizen_dir.exists(): for agent_dir in sorted(kaizen_dir.iterdir()): if not agent_dir.is_dir(): continue mf = agent_dir / "memory.md" if not mf.exists(): continue if agent_dir.name == agent_name: own_memory = mf.read_text() else: other_memories[agent_dir.name] = mf.read_text() if raw: if own_memory: click.echo(f"=== {agent_name} ===\n{own_memory}") for name, content in other_memories.items(): click.echo(f"=== {name} ===\n{content}") return from datetime import date as _date today = _date.today().isoformat() sources = ([agent_name] if own_memory else []) + list(other_memories.keys()) click.echo(f"## Orientation Brief for: {agent_name}") click.echo(f"Project: {project_name}") click.echo(f"Generated: {today}") click.echo(f"Sources: {', '.join(sources) if sources else 'none'}") click.echo() metrics_store = MetricsStore(project_root, agent_name) metrics_summary = metrics_store.read_summary() if metrics_summary is None and metrics_store.executions_path.exists(): metrics_summary = metrics_store.write_summary() if not sources and not metrics_summary: click.echo("No agent memory files found in this project.") click.echo(f" Run: kaizen-agentic memory init {agent_name}") click.echo(" Then load the coach agent (agents/agent-coach.md) for synthesis.") return performance_block = performance_summary_markdown(metrics_summary or {}) if performance_block: click.echo(performance_block) # Own memory section if own_memory: click.echo("### Your Memory") click.echo(own_memory) else: click.echo( f"### Your Memory\n(none — run: kaizen-agentic memory init {agent_name})\n" ) # Cross-agent context if other_memories: click.echo("### Context From Other Agents") click.echo("(Load coach agent for full synthesis. Raw content below.)\n") for name, content in other_memories.items(): click.echo(f"--- {name} ---") click.echo(content) else: click.echo( "### Context From Other Agents\nNo other agent memories found in this project.\n" ) click.echo("---") click.echo( "Tip: Load agents/agent-coach.md in your Claude session and pass this output" ) click.echo(" for a full cross-agent synthesis and orientation brief.") @memory.command("clear") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.confirmation_option( prompt="This will permanently delete the agent memory. Continue?" ) def memory_clear(agent_name: str, target: str): """Wipe agent memory for the current project.""" memory_path = _memory_path(target, agent_name) if not memory_path.exists(): click.echo(f"No memory found for agent '{agent_name}' — nothing to clear.") return memory_path.unlink() click.echo(f"Cleared memory for '{agent_name}': {memory_path}") # Remove empty parent directory if not any(memory_path.parent.iterdir()): memory_path.parent.rmdir() @cli.group() def metrics(): """Manage project-scoped agent metrics (.kaizen/metrics//).""" pass @metrics.command("record") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--success", "outcome_success", is_flag=True, help="Record successful execution" ) @click.option( "--failure", "outcome_failure", is_flag=True, help="Record failed execution" ) @click.option("--time", "execution_time", type=float, help="Execution time in seconds") @click.option("--quality", type=float, help="Quality score 0.0–1.0") @click.option("--session-id", help="Optional session identifier") @click.option("--idempotency-key", help="Skip append if this key was already recorded") @click.option( "--json", "json_input", is_flag=True, help="Read full record JSON from stdin" ) @click.option( "--emit-event", is_flag=True, help="Publish kaizen.metrics.recorded to NATS (requires nats-py)", ) def metrics_record( agent_name: str, target: str, outcome_success: bool, outcome_failure: bool, execution_time: Optional[float], quality: Optional[float], session_id: Optional[str], idempotency_key: Optional[str], json_input: bool, emit_event: bool, ): """Append one execution record for an agent.""" store = MetricsStore(_project_root(target), agent_name) if json_input: payload = json.load(sys.stdin) if not isinstance(payload, dict): click.echo("Error: JSON input must be an object", err=True) sys.exit(1) else: if outcome_success and outcome_failure: click.echo("Error: use only one of --success or --failure", err=True) sys.exit(1) if not outcome_success and not outcome_failure: click.echo( "Error: specify --success or --failure (or use --json)", err=True ) sys.exit(1) payload = {"success": outcome_success} if execution_time is not None: payload["execution_time_s"] = execution_time if quality is not None: payload["quality_score"] = quality if session_id: payload["session_id"] = session_id payload = enrich_helix_correlation(payload) if store.append(payload, idempotency_key=idempotency_key): click.echo(f"Recorded metrics for '{agent_name}'") if emit_event: summary = store.read_summary() or store.write_summary() envelope = build_metrics_recorded_envelope( agent=agent_name, project=resolve_project_slug(store.project_root), summary=summary, ) try: subject = publish_metrics_recorded_event(envelope) except RuntimeError as exc: click.echo(f"Error: {exc}", err=True) sys.exit(1) click.echo( f"Emitted kaizen.metrics.recorded for '{agent_name}' → {subject}" ) else: click.echo( f"Skipped duplicate record for '{agent_name}' (idempotency key exists)" ) @metrics.command("show") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--limit", "-n", default=5, show_default=True, help="Recent executions to show" ) def metrics_show(agent_name: str, target: str, limit: int): """Print metrics summary and recent executions for an agent.""" store = MetricsStore(_project_root(target), agent_name) if not store.executions_path.exists(): click.echo(f"No metrics found for agent '{agent_name}'.") click.echo(f" Expected: {store.agent_dir}") click.echo(f" Run: kaizen-agentic memory init {agent_name}") return summary = store.read_summary() or store.write_summary() click.echo(f"Metrics for '{agent_name}':") click.echo("=" * 40) click.echo(json.dumps(summary, indent=2)) records = store.read_executions() if records: click.echo("\nRecent executions:") for record in records[-limit:]: click.echo(json.dumps(record, sort_keys=True)) @metrics.command("list") @click.option("--target", "-t", default=".", help="Project root (default: current)") def metrics_list(target: str): """List agents with metrics in the current project.""" agents = MetricsStore.list_agents(_project_root(target)) if not agents: click.echo("No agent metrics found in this project.") click.echo(" Run: kaizen-agentic memory init ") return click.echo("Agents with metrics:") for name in agents: store = MetricsStore(_project_root(target), name) summary = store.read_summary() count = summary["execution_count"] if summary else len(store.read_executions()) click.echo(f" • {name} ({count} executions)") @metrics.command("optimize") @click.argument("agent_name", required=False) @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--min-samples", default=MIN_SAMPLES_FOR_RECOMMENDATIONS, show_default=True, help="Minimum execution records required for recommendations", ) def metrics_optimize(agent_name: Optional[str], target: str, min_samples: int): """Run optimizer analysis on project metrics and write recommendations.""" project_root = _project_root(target) agents = [agent_name] if agent_name else MetricsStore.list_agents(project_root) if not agents: click.echo("No agent metrics found to optimize.") click.echo( " Record executions with: kaizen-agentic metrics record --success" ) return optimizer_store = OptimizerStore(project_root) combined_reports = [] for name in agents: store = MetricsStore(project_root, name) records = store.read_executions() loop = OptimizationLoop.from_metrics_store(store, min_samples=1) report = loop.get_optimization_report_json() report["sample_threshold"] = min_samples report["meets_sample_threshold"] = len(records) >= min_samples combined_reports.append(report) click.echo(f"Agent: {name}") click.echo("=" * 40) click.echo(json.dumps(report, indent=2)) if len(records) >= min_samples: optimizer_store.append_recommendations( name, report["recommendations"], metrics_count=len(records), ) else: click.echo( f" Note: {len(records)} record(s) — " f"need {min_samples} for actionable recommendations" ) click.echo() analysis_payload = { "project": project_root.name, "optimized_at": _today(), "min_samples": min_samples, "agents": combined_reports, } analysis_path = optimizer_store.write_analysis(analysis_payload) click.echo(f"Wrote optimizer analysis: {analysis_path}") @metrics.command("correlate") @click.argument("session_uid") @click.option( "--store-db", envvar="HELIX_STORE_DB", help="Helix Forge session-memory SQLite database path", ) def metrics_correlate(session_uid: str, store_db: Optional[str]): """Look up Helix Forge digest summary for a session UID (read-only).""" adapter = HelixCorrelationAdapter( store_db=Path(store_db).resolve() if store_db else None ) if adapter.store_db is None: adapter = HelixCorrelationAdapter.from_env() summary = adapter.lookup(session_uid) click.echo(json.dumps(summary, indent=2, sort_keys=True)) @metrics.command("publish") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--api-url", default=default_api_url, show_default=True, help="artifact-store API base URL (ARTIFACTSTORE_API_URL)", ) @click.option( "--token", default=default_api_token, help="artifact-store bearer token (ARTIFACTSTORE_API_TOKEN)", ) @click.option( "--subject", help="Package subject (default: project directory name)", ) @click.option( "--retention-class", default="raw-evidence", show_default=True, help="artifact-store retention class", ) def metrics_publish( target: str, api_url: str, token: str, subject: Optional[str], retention_class: str, ): """Publish optimizer evidence to artifact-store (optional integration).""" project_root = _project_root(target) if not token: click.echo( "Error: artifact-store token required. Set ARTIFACTSTORE_API_TOKEN or --token.", err=True, ) sys.exit(1) try: result = publish_optimizer_evidence( project_root, api_url=api_url, token=token, subject=subject, retention_class=retention_class, ) except FileNotFoundError as exc: click.echo(f"Error: {exc}", err=True) sys.exit(1) except RuntimeError as exc: click.echo(f"Error: {exc}", err=True) sys.exit(1) click.echo(f"Published optimizer evidence package: {result.package_id}") click.echo(f" Files uploaded: {result.files_uploaded}") click.echo(f" Retention class: {result.retention_class}") if result.manifest_digest: click.echo(f" Manifest digest: {result.manifest_digest}") @metrics.command("export") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") def metrics_export(agent_name: str, target: str): """Dump executions.jsonl for an agent to stdout.""" store = MetricsStore(_project_root(target), agent_name) if not store.executions_path.exists(): click.echo(f"No metrics found for agent '{agent_name}'.", err=True) sys.exit(1) click.echo(store.executions_path.read_text(encoding="utf-8"), nl=False) @cli.group() def protocols(): """Browse agent protocol runbooks (agents/protocols//.md).""" pass @protocols.command("list") @click.argument("agent_name", required=False) def protocols_list(agent_name: Optional[str]): """List available protocols, optionally filtered by agent.""" registry = _get_registry() protocols_dir = registry.agents_dir / "protocols" if not protocols_dir.exists(): click.echo("No protocols directory found.") return found = [] agent_dirs = ( [protocols_dir / agent_name] if agent_name else sorted(protocols_dir.iterdir()) ) for agent_dir in agent_dirs: if not agent_dir.is_dir() or agent_dir.name == "__pycache__": continue for protocol_file in sorted(agent_dir.glob("*.md")): if protocol_file.name == "README.md": continue # Try to read title from frontmatter title = protocol_file.stem.replace("-", " ").title() try: content = protocol_file.read_text() for line in content.splitlines(): if line.startswith("title:"): title = line.split(":", 1)[1].strip().strip('"') break except Exception: pass found.append((agent_dir.name, protocol_file.stem, title)) if not found: if agent_name: click.echo(f"No protocols found for agent '{agent_name}'.") else: click.echo("No protocols found.") return click.echo("Available Protocols:") click.echo("=" * 40) current_agent = None for agent, slug, title in found: if agent != current_agent: click.echo(f"\n {agent}:") current_agent = agent click.echo(f" • {slug}: {title}") @protocols.command("show") @click.argument("agent_name") @click.argument("slug") def protocols_show(agent_name: str, slug: str): """Print a protocol runbook.""" registry = _get_registry() protocol_path = registry.agents_dir / "protocols" / agent_name / f"{slug}.md" if not protocol_path.exists(): click.echo(f"Protocol not found: {agent_name}/{slug}") click.echo(f" Expected: {protocol_path}") click.echo(f" Run: kaizen-agentic protocols list {agent_name}") return click.echo(protocol_path.read_text()) @cli.group() def schedule(): """Prepare and validate scheduled agent runs (.kaizen/schedule.yml, ADR-005). kaizen-agentic does not run cron schedules or invoke Claude. activity-core fires the cron and creates a task per (repo, agent); a coding-agent session runs `schedule prepare ` to assemble orientation, then executes the agent instructions. """ pass @schedule.command("validate") @click.option("--target", "-t", default=".", help="Project root (default: current)") def schedule_validate(target: str): """Validate .kaizen/schedule.yml against the ADR-005 schema.""" path = schedule_path(_project_root(target)) try: parsed = load_schedule(path) except ScheduleError as exc: click.echo(f"❌ {exc}", err=True) click.echo(" Run: kaizen-agentic schedule init", err=True) sys.exit(1) known_agents = _get_registry().agent_names() errors = validate_schedule(parsed, known_agents=known_agents) if errors: click.echo(f"❌ Schedule validation failed ({path}):") for error in errors: click.echo(f" • {error}") sys.exit(1) enabled = parsed.enabled_entries() click.echo(f"✅ Schedule valid: {path}") click.echo(f" {len(parsed.entries)} agent(s), {len(enabled)} enabled") @schedule.command("init") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--timezone", default="Europe/Berlin", show_default=True, help="Schedule timezone" ) @click.option("--force", is_flag=True, help="Overwrite an existing schedule.yml") @click.option( "--engagement", default=None, help="Customer engagement slug (bootstrap schedule for target repos)", ) @click.option( "--agents", default=None, help="Comma-separated agents for --engagement (default: coach,optimization)", ) @click.option( "--bootstrap-cadence", type=click.Choice(["hourly", "daily", "weekly"]), default="hourly", show_default=True, help="Cadence preset for --engagement (hourly uses daily enum + hourly cron)", ) def schedule_init( target: str, timezone: str, force: bool, engagement: Optional[str], agents: Optional[str], bootstrap_cadence: str, ): """Scaffold .kaizen/schedule.yml (weekly default or engagement bootstrap).""" if (agents or bootstrap_cadence != "hourly") and not engagement: click.echo( "Error: --agents and --bootstrap-cadence require --engagement", err=True, ) sys.exit(1) path = schedule_path(_project_root(target)) if path.exists() and not force: click.echo(f"Schedule already exists: {path}") click.echo(" Use --force to overwrite.") return if engagement: agent_list = ( [item.strip() for item in agents.split(",") if item.strip()] if agents else None ) known_agents = _get_registry().agent_names() if agent_list: unknown = [name for name in agent_list if name not in known_agents] if unknown: click.echo( f"Error: unknown agent(s) for engagement schedule: {', '.join(unknown)}", err=True, ) sys.exit(1) try: yaml_text = engagement_schedule_yaml( engagement, agents=agent_list, bootstrap_cadence=bootstrap_cadence, timezone=timezone, ) except ScheduleError as exc: click.echo(f"Error: {exc}", err=True) sys.exit(1) else: yaml_text = default_schedule_yaml(timezone=timezone) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(yaml_text, encoding="utf-8") click.echo(f"Initialized schedule: {path}") if engagement: click.echo( f" Engagement: {engagement} (bootstrap-cadence={bootstrap_cadence})" ) click.echo(" Validate with: kaizen-agentic schedule validate") @schedule.command("promote") @click.option( "--engagement-repo", "-e", required=True, type=click.Path(exists=True, file_okay=False, path_type=Path), help="Customer engagement repo (e.g. coulomb-loop)", ) @click.option( "--engagement", default="coulomb-loop", show_default=True, help="Engagement slug written into fleet schedule.yml headers", ) @click.option( "--to-phase", type=click.Choice(["stabilize", "operate"]), default=None, help="Target phase (default: next phase after current)", ) @click.option("--loop", default=None, help="Promote a single loop id only") @click.option("--dry-run", is_flag=True, help="Print planned actions without writing") @click.option("--skip-cadence", is_flag=True, help="Skip loops/*/cadence.yml updates") @click.option( "--skip-definitions", is_flag=True, help="Skip activity-definitions transforms" ) @click.option("--skip-fleet", is_flag=True, help="Skip .kaizen/schedule.yml on roster") @click.option("--skip-sync", is_flag=True, help="Skip activity-core definition sync") @click.option( "--fleet-only", is_flag=True, help="Only update fleet schedule.yml (+ sync unless --skip-sync)", ) @click.option( "--activity-core", type=click.Path(exists=True, file_okay=False, path_type=Path), default=None, help="activity-core repo root (or ACTIVITY_CORE_ROOT env)", ) def schedule_promote( engagement_repo: Path, engagement: str, to_phase: Optional[str], loop: Optional[str], dry_run: bool, skip_cadence: bool, skip_definitions: bool, skip_fleet: bool, skip_sync: bool, fleet_only: bool, activity_core: Optional[Path], ): """Atomically promote cadence across cadence.yml, definitions, fleet, and sync.""" if fleet_only: skip_cadence = True skip_definitions = True if to_phase is None: to_phase = "operate" result = promote_engagement( engagement_repo, engagement_slug=engagement, to_phase=to_phase, loop=loop, dry_run=dry_run, skip_cadence=skip_cadence, skip_definitions=skip_definitions, skip_fleet=skip_fleet, skip_sync=skip_sync, activity_core_root=activity_core, ) if dry_run: click.echo("Dry run — planned actions:") else: click.echo("Promotion complete — actions:") by_layer: dict[str, list[str]] = {} for action in result.actions: by_layer.setdefault(action.layer, []).append(action.description) for layer in ("cadence", "definitions", "fleet", "sync"): items = by_layer.get(layer) if items: click.echo(f"\n[{layer}]") for item in items: click.echo(f" • {item}") if result.errors: click.echo("\nWarnings / errors:", err=True) for err in result.errors: click.echo(f" ! {err}", err=True) if not result.actions: sys.exit(1) if not result.actions and not result.errors: click.echo("Nothing to do — layers already aligned.") @schedule.command("list") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option("--all", "show_all", is_flag=True, help="Include disabled entries") def schedule_list(target: str, show_all: bool): """Show enabled schedule entries from .kaizen/schedule.yml.""" path = schedule_path(_project_root(target)) try: parsed = load_schedule(path) except ScheduleError as exc: click.echo(f"No schedule found: {exc}") click.echo(" Run: kaizen-agentic schedule init") return entries = parsed.entries if show_all else parsed.enabled_entries() if not entries: click.echo("No enabled schedule entries (use --all to see disabled).") return click.echo(f"Scheduled agents ({path}):") if parsed.timezone: click.echo(f" Timezone: {parsed.timezone}") for entry in entries: flag = "✅" if entry.enabled else "⏸ " cron = f" cron={entry.cron}" if entry.cron else "" click.echo(f" {flag} {entry.agent}: {entry.cadence}{cron}") @schedule.command("prepare") @click.argument("agent_name") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--format", "output_format", type=click.Choice(["markdown", "json"]), default="markdown", show_default=True, help="Output format for the orientation bundle", ) def schedule_prepare(agent_name: str, target: str, output_format: str): """Assemble an orientation bundle for a scheduled agent run. Bundles the agent prompt, project memory, metrics summary, and repo pointers into a single payload. Works offline from local `.kaizen/` state; no State Hub required. Pass the output to a coding-agent session. """ bundle = _build_prepare_bundle(agent_name, _project_root(target)) if output_format == "json": click.echo(json.dumps(bundle, indent=2)) return click.echo(_render_prepare_markdown(bundle)) def _build_prepare_bundle(agent_name: str, project_root: Path) -> dict: """Collect the orientation bundle pieces for `schedule prepare`.""" registry = _get_registry() agent_path = registry.get_agent_path(agent_name) agent_prompt = agent_path.read_text(encoding="utf-8") if agent_path else None memory_path = project_root / ".kaizen" / "agents" / agent_name / "memory.md" memory = memory_path.read_text(encoding="utf-8") if memory_path.exists() else None metrics_store = MetricsStore(project_root, agent_name) metrics_summary = metrics_store.read_summary() if metrics_summary is None and metrics_store.executions_path.exists(): metrics_summary = metrics_store.write_summary() pointers = {} scope_path = project_root / "SCOPE.md" if scope_path.exists(): pointers["scope"] = str(scope_path) workplans_path = project_root / "workplans" if workplans_path.is_dir(): pointers["workplans"] = str(workplans_path) return { "agent": agent_name, "project": project_root.name, "generated": _today(), "agent_prompt": agent_prompt, "agent_prompt_found": agent_prompt is not None, "memory": memory, "metrics_summary": metrics_summary, "pointers": pointers, "session_close": [ f"kaizen-agentic metrics record {agent_name} --success " f"--time --quality <0-1>", f"Update memory: kaizen-agentic memory show {agent_name}", ], } def _render_prepare_markdown(bundle: dict) -> str: agent = bundle["agent"] lines = [ f"# Scheduled Run Orientation: {agent}", f"Project: {bundle['project']}", f"Generated: {bundle['generated']}", "", ] summary = bundle.get("metrics_summary") block = performance_summary_markdown(summary or {}) if block: lines.append(block) lines.append("## Agent Prompt") if bundle["agent_prompt_found"]: lines.append(bundle["agent_prompt"]) else: lines.append( f"(agent '{agent}' not found in registry — " f"run: kaizen-agentic list)" ) lines.append("") lines.append("## Project Memory") if bundle.get("memory"): lines.append(bundle["memory"]) else: lines.append(f"(none — run: kaizen-agentic memory init {agent})") lines.append("") pointers = bundle.get("pointers") or {} lines.append("## Repo Pointers") if pointers: for label, path in pointers.items(): lines.append(f"- {label}: {path}") else: lines.append("- (no SCOPE.md / workplans/ found)") lines.append("") lines.append("## Session Close") for cmd in bundle["session_close"]: lines.append(f"- `{cmd}`") return "\n".join(lines) @cli.command("create-agent") @click.argument("name") @click.option( "--category", "-c", type=click.Choice([c.value for c in AgentCategory]), help="Agent category (prompted if omitted)", ) @click.option("--description", "-d", help="One-line description (prompted if omitted)") @click.option( "--memory", type=click.Choice(["enabled", "disabled"]), default="enabled", show_default=True, help="Project memory support", ) @click.option("--model", help="Optional model hint (e.g. claude-opus-4-8)") @click.option( "--target", "-t", default=".", help="Project root containing agents/ (default: current)", ) @click.option("--force", is_flag=True, help="Overwrite an existing agent file") def create_agent( name: str, category: Optional[str], description: Optional[str], memory: str, model: Optional[str], target: str, force: bool, ): """Scaffold a new schema-valid agent definition (agents/agent-.md).""" if not category: category = click.prompt( "Category", type=click.Choice([c.value for c in AgentCategory]) ) if not description: description = click.prompt("One-line description") agents_dir = _project_root(target) / "agents" agents_dir.mkdir(parents=True, exist_ok=True) agent_path = agents_dir / f"agent-{name}.md" if agent_path.exists() and not force: click.echo(f"Agent already exists: {agent_path}") click.echo(" Use --force to overwrite.") sys.exit(1) frontmatter = [ "---", f"name: {name}", f"description: {description}", f"category: {category}", f"memory: {memory}", ] if model: frontmatter.append(f"model: {model}") frontmatter.append("---") title = name.replace("-", " ").title() body = f""" # {title} Agent ## Role ## When to Use ## Instructions ## Output """ agent_path.write_text("\n".join(frontmatter) + "\n" + body) # Validate the scaffold passes the frontmatter schema (T03). errors = ( AgentRegistry(agents_dir).validate_frontmatter_schema().get(agent_path.name, []) ) if errors: click.echo(f"⚠️ Created {agent_path} but it has schema issues:") for error in errors: click.echo(f" ❌ {error}") sys.exit(1) click.echo(f"✅ Created agent: {agent_path}") click.echo(" Edit the skeleton, then validate: kaizen-agentic validate") click.echo(" Before release: make agents-sync-package") @cli.group() def docs(): """Generate project documentation from agent metadata.""" pass @docs.command("generate") @click.option("--target", "-t", default=".", help="Project root (default: current)") @click.option( "--check", is_flag=True, help="Exit non-zero if CLAUDE.md would change (do not write)", ) def docs_generate(target: str, check: bool): """Refresh the '## Installed Agents' section of CLAUDE.md (idempotent).""" target_path = _project_root(target) # Resolve agents from the target project's own agents/ when present, so # `docs generate --target other/project` documents that project's agents # rather than the registry resolved from the current directory. local_agents = target_path / "agents" registry = AgentRegistry(local_agents) if local_agents.exists() else _get_registry() installer = AgentInstaller(registry) installed = installer.list_installed_agents(target_path) if not installed: click.echo("No agents installed in this project — nothing to document.") click.echo(" Run: kaizen-agentic install ") return agents = [a for a in (registry.get_agent(n) for n in installed) if a is not None] section = render_installed_agents_section(agents) claude_md = target_path / "CLAUDE.md" current = claude_md.read_text() if claude_md.exists() else "" updated = upsert_installed_agents_section(current, section) if check: if updated != current: click.echo(f"❌ CLAUDE.md is out of date: {claude_md}") click.echo(" Run: kaizen-agentic docs generate") sys.exit(1) click.echo(f"✅ CLAUDE.md is up to date ({len(agents)} agents)") return if updated == current: click.echo(f"CLAUDE.md already up to date ({len(agents)} agents)") return claude_md.write_text(updated) click.echo(f"Updated Installed Agents section ({len(agents)} agents): {claude_md}") def _project_root(target: str) -> Path: return Path(target).resolve() def _memory_path(target: str, agent_name: str) -> Path: return _project_root(target) / ".kaizen" / "agents" / agent_name / "memory.md" def _today() -> str: from datetime import date return date.today().isoformat() def _get_registry() -> AgentRegistry: """Get the agent registry.""" # Try to find agents directory current_dir = Path.cwd() # Check if we're in a kaizen-agentic project if (current_dir / "agents").exists(): agents_dir = current_dir / "agents" elif (current_dir / "src" / "kaizen_agentic").exists(): # We're in the kaizen-agentic repo itself agents_dir = current_dir / "agents" else: # Try to find installed package try: import kaizen_agentic package_dir = Path(kaizen_agentic.__file__).parent.parent.parent agents_dir = package_dir / "agents" if not agents_dir.exists(): # Try relative to package agents_dir = Path(kaizen_agentic.__file__).parent / "data" / "agents" except ImportError: click.echo("Error: kaizen-agentic package is not installed.", err=True) click.echo(" Fix: pip install -e . (from repo root)", err=True) click.echo(" Or: run from a project with an agents/ directory", err=True) sys.exit(1) if not agents_dir.exists(): click.echo(f"Error: agents directory not found: {agents_dir}", err=True) click.echo( " Fix: cd into a kaizen-agentic checkout or a project with agents/", err=True, ) click.echo( " Or: kaizen-agentic install