Files
kaizen-agentic/src/kaizen_agentic/cli.py
Bernd Worsch 53dfd55916 feat(protocols): add protocols artifact convention, sys-medic protocol + CLI (WP-0002 T17-T24)
- ADR-003: protocols artifact convention (location, structure, lifecycle)
- agents/protocols/README.md: directory-level index and usage guide
- agents/protocols/sys-medic/k3s-node-health-assessment.md: full structured
  k3s node health assessment protocol (8 steps: OS baseline, process hygiene,
  memory, CPU, disk, network, k3s node state, runtime services)
- agent-sys-medic.md: add memory: enabled frontmatter, session-start/close
  protocols, node-profile memory template extensions, protocol reference in
  Default Task
- cli.py: add protocols command group (list, show); extend memory init to hint
  protocol commands for agents that have protocols

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 23:48:09 +00:00

1050 lines
37 KiB
Python

"""Command-line interface for Kaizen Agentic agent management."""
import sys
import subprocess
import contextlib
import io
import click
from pathlib import Path
from typing import List, Optional
from .registry import AgentRegistry, AgentCategory
from .installer import AgentInstaller, ProjectInitializer, InstallationConfig
def safe_cli_wrapper():
    """
    Run the CLI and hide a known-misleading error from ``install``/``update``.

    BACKGROUND
    ==========
    ``install`` and ``update`` can report
    "Error: Got unexpected extra argument (agent-name)" even though the
    command has already started doing its work.  This wrapper buffers the
    output of those two commands, drops that one message, and turns the exit
    status into success when the buffered stdout shows that the command was
    running ("Installing agents to:" / "Updating all installed agents:").

    NOTE(review): this was originally documented as a Click 8.x library
    issue.  In this module the ``list`` command function rebinds the
    module-level name ``list``, so a call such as ``list(agents)`` inside a
    command body invokes the Click ``list`` command with the agent names as
    its arguments, which would produce exactly this message.  Confirm, make
    sure no command body calls ``list(...)``, then remove this wrapper and
    point the entry point back at ``kaizen_agentic.cli:cli``.

    BEHAVIOUR
    =========
    - Output is captured only for ``install`` and ``update``.  Every other
      command writes straight to the terminal, so interactive prompts (for
      example the confirmation of ``memory clear``) stay visible and output
      is not delayed until the command has finished.
    - Usage errors are reported as "Error: <message>" with exit status 2.
    - Any other exception is reported as "Error: <message>" with exit
      status 1, after any buffered output has been shown.

    TESTING
    =======
    According to the original author's notes this workaround is covered by
    tests in test_cli_error_handling.py.
    """
    spurious_marker = "Got unexpected extra argument"
    success_indicators = ("Installing agents to:", "Updating all installed agents:")

    stderr_capture = io.StringIO()
    stdout_capture = io.StringIO()

    # Only these commands are known to emit the misleading message.
    affected_commands = len(sys.argv) >= 2 and sys.argv[1] in ("install", "update")

    def ran_successfully(text):
        """Return True when the buffered stdout shows the command was running."""
        return any(indicator in text for indicator in success_indicators)

    try:
        with contextlib.ExitStack() as stack:
            # Redirecting stdout hides Click prompts, so only buffer output
            # for the commands that actually need filtering.
            if affected_commands:
                stack.enter_context(contextlib.redirect_stderr(stderr_capture))
                stack.enter_context(contextlib.redirect_stdout(stdout_capture))
            # standalone_mode=False makes Click raise instead of printing
            # usage errors and exiting on its own.
            cli(standalone_mode=False)
    except click.UsageError as e:
        captured_stdout = stdout_capture.getvalue()
        if (
            affected_commands
            and spurious_marker in str(e)
            and ran_successfully(captured_stdout)
        ):
            # The command was actually executing: show its real output.
            print(captured_stdout, end="")
            sys.exit(0)
        # Legitimate usage error.
        print(f"Error: {e}")
        sys.exit(2)
    except SystemExit as e:
        captured_stdout = stdout_capture.getvalue()
        captured_stderr = stderr_capture.getvalue()
        spurious = affected_commands and spurious_marker in captured_stderr
        print(captured_stdout, end="")
        if not spurious:
            # Never drop genuine diagnostics, whatever the exit status.
            print(captured_stderr, end="", file=sys.stderr)
        elif e.code != 0 and ran_successfully(captured_stdout):
            # Override the error exit when the command visibly did its work.
            sys.exit(0)
        sys.exit(e.code)
    except Exception as e:
        # Show what the command printed before it failed, then the error.
        print(stdout_capture.getvalue(), end="")
        print(f"Error: {e}")
        sys.exit(1)

    # Normal return: flush buffered output, minus the spurious message.
    print(stdout_capture.getvalue(), end="")
    stderr_content = stderr_capture.getvalue()
    if stderr_content and not (affected_commands and spurious_marker in stderr_content):
        print(stderr_content, end="", file=sys.stderr)
@click.group()
@click.version_option()
def cli():
    """Kaizen Agentic - AI agent development framework."""
    # Root command group. Sub-commands and sub-groups in this module attach
    # themselves through the @cli.command() / @cli.group() decorators.
@cli.command()
@click.option(
    "--category",
    type=click.Choice([c.value for c in AgentCategory]),
    help="Filter by category",
)
@click.option("--verbose", "-v", is_flag=True, help="Show detailed information")
def list(category: Optional[str], verbose: bool):
    """List available agents."""
    # NOTE: this definition rebinds the module-level name ``list`` to a Click
    # command object. Code elsewhere in this module must not call
    # ``list(...)`` expecting the builtin; it would run this command instead.
    registry = _get_registry()

    # Verbose listing without a filter: one section per category, then stop.
    if verbose and not category:
        for group, members in registry.get_categories().items():
            click.echo(
                f"\n{group.value.replace('-', ' ').title()} ({len(members)} agents):"
            )
            click.echo("=" * 50)
            for member in members:
                click.echo(f"{member.name}: {member.description}")
        return

    if category:
        agents = registry.list_agents(AgentCategory(category))
        heading = f"\n{category.replace('-', ' ').title()} Agents:"
    else:
        agents = registry.list_agents()
        heading = f"\nAvailable Agents ({len(agents)} total):"
    click.echo(heading)
    click.echo("=" * 40)

    for entry in agents:
        if not verbose:
            click.echo(f"{entry.name}: {entry.description}")
            continue
        click.echo(f"\n{entry.name}")
        click.echo(f" Description: {entry.description}")
        click.echo(f" Category: {entry.category.value}")
        if entry.dependencies:
            click.echo(f" Dependencies: {', '.join(entry.dependencies)}")
@cli.command()
@click.argument("agents", nargs=-1, required=True)
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.option("--no-backup", is_flag=True, help="Skip creating backup")
@click.option("--no-docs", is_flag=True, help="Skip updating documentation")
def install(agents: List[str], target: str, no_backup: bool, no_docs: bool):
    """
    Install agents into a project.
    NOTE: This command is affected by a Click library issue that causes spurious
    "Got unexpected extra argument" messages. This is handled by safe_cli_wrapper().
    See safe_cli_wrapper() docstring for details and removal timeline.
    """
    # The docstring above is the user-visible --help text and is kept verbatim.
    #
    # Click passes ``agents`` as a tuple. It is copied with ``[*agents]``
    # rather than ``list(agents)``: in this module the name ``list`` refers to
    # the ``list`` CLI command, so ``list(agents)`` would run that command
    # with the agent names as arguments and abort with
    # "Got unexpected extra argument (...)" before anything is installed.
    try:
        registry = _get_registry()
        installer = AgentInstaller(registry)
        target_path = Path(target).resolve()
        config = InstallationConfig(
            target_dir=target_path,
            claude_config_path=target_path / "CLAUDE.md",
            makefile_path=target_path / "Makefile",
            update_docs=not no_docs,
            create_backup=not no_backup,
        )
        click.echo(f"Installing agents to: {target_path}")
        # Resolve dependencies with fallback
        try:
            resolved = registry.resolve_dependencies([*agents])
            if len(resolved) > len(agents):
                additional = [a for a in resolved if a not in agents]
                click.echo(f"Including dependencies: {', '.join(additional)}")
        except Exception:
            # Deliberate best effort: fall back to exactly what was requested
            # if dependency resolution fails.
            resolved = [*agents]
        results = installer.install_agents(resolved, config)
        # Display results
        success_count = 0
        for agent_name, status in results.items():
            if status == "INSTALLED":
                click.echo(f"{agent_name}")
                success_count += 1
            else:
                click.echo(f"{agent_name}: {status}")
        click.echo(f"\nInstalled {success_count}/{len(results)} agents successfully")
        # Explicit successful exit (SystemExit is not caught by the
        # ``except Exception`` below).
        sys.exit(0)
    except Exception as e:
        click.echo(f"Installation failed: {e}")
        sys.exit(1)
@cli.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.argument("agents", nargs=-1)
def update(target: str, agents: List[str]):
    """
    Update installed agents.
    NOTE: This command is affected by a Click library issue that causes spurious
    "Got unexpected extra argument" messages. This is handled by safe_cli_wrapper().
    See safe_cli_wrapper() docstring for details and removal timeline.
    """
    # The docstring above is the user-visible --help text and is kept verbatim.
    registry = _get_registry()
    installer = AgentInstaller(registry)
    target_path = Path(target).resolve()
    if not agents:
        # No names given: update everything installed in the target project.
        agents = installer.list_installed_agents(target_path)
        if not agents:
            click.echo("No agents installed in this project")
            return
        click.echo(f"Updating all installed agents: {', '.join(agents)}")
    else:
        click.echo(f"Updating specific agents: {', '.join(agents)}")
    # ``[*agents]`` instead of ``list(agents)``: in this module the name
    # ``list`` is the ``list`` CLI command, so calling it here would run that
    # command and fail with "Got unexpected extra argument (...)".
    results = installer.update_agents(target_path, [*agents])
    # Display results
    success_count = 0
    for agent_name, status in results.items():
        if status == "INSTALLED":
            click.echo(f"{agent_name}")
            success_count += 1
        else:
            click.echo(f"{agent_name}: {status}")
    click.echo(f"\nUpdated {success_count}/{len(results)} agents successfully")
@cli.command()
@click.argument("agents", nargs=-1, required=True)
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
def remove(agents: List[str], target: str):
    """Remove agents from a project."""
    registry = _get_registry()
    installer = AgentInstaller(registry)
    target_path = Path(target).resolve()
    click.echo(f"Removing agents from: {target_path}")
    # ``[*agents]`` instead of ``list(agents)``: in this module the name
    # ``list`` is the ``list`` CLI command, so calling it here would run that
    # command with the agent names as arguments and exit with a usage error
    # before anything is removed.
    results = installer.remove_agents([*agents], target_path)
    # Display results
    for agent_name, status in results.items():
        if status == "REMOVED":
            click.echo(f"{agent_name}")
        elif status == "NOT_FOUND":
            click.echo(f" ⚠️ {agent_name}: Not installed")
        else:
            click.echo(f"{agent_name}: {status}")
@cli.command()
@click.argument("project_name")
@click.option(
    "--template",
    "-t",
    default="python-basic",
    help="Project template (python-basic, python-web, python-cli, python-data)",
)
@click.option("--agents", "-a", help="Comma-separated list of agents to install")
@click.option(
    "--parent-dir", default=".", help="Parent directory for project (default: current)"
)
def init(project_name: str, template: str, agents: Optional[str], parent_dir: str):
    """Initialize a new project with agents."""
    registry = _get_registry()
    initializer = ProjectInitializer(registry)
    project_path = Path(parent_dir) / project_name
    if project_path.exists():
        click.echo(f"Error: Directory {project_path} already exists")
        sys.exit(1)
    # Parse agent list; blank entries (e.g. from a trailing comma) are dropped
    # so that no empty agent name is passed on.
    agent_list = None
    if agents:
        agent_list = [name.strip() for name in agents.split(",") if name.strip()]
    click.echo(f"Initializing project: {project_name}")
    click.echo(f"Template: {template}")
    # Reject unknown templates and show the valid ones
    templates = registry.get_agent_templates()
    if template not in templates:
        click.echo(f"Error: Unknown template '{template}'")
        click.echo(f"Available templates: {', '.join(templates.keys())}")
        sys.exit(1)
    if not agent_list:
        # No usable --agents value: use the template's own agent set.
        agent_list = templates[template]
        click.echo(f"Using template agents: {', '.join(agent_list)}")
    else:
        click.echo(f"Using custom agents: {', '.join(agent_list)}")
    results = initializer.init_project(project_path, template, agent_list, project_name)
    # Display results
    success_count = sum(1 for status in results.values() if status == "INSTALLED")
    click.echo(f"\nProject initialized with {success_count}/{len(results)} agents")
    click.echo("\nNext steps:")
    # Use the full project path so the hint is also right with --parent-dir.
    click.echo(f" cd {project_path}")
    click.echo(" make setup-complete # Set up development environment")
    click.echo(" make test # Run tests")
@cli.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
def validate(target: str):
    """Validate agents in a project."""
    # NOTE(review): the command only prints its findings; there is no
    # non-zero exit in this function when errors are reported.
    registry = _get_registry()
    installer = AgentInstaller(registry)
    project_dir = Path(target).resolve()

    def report(problems, failure_heading, success_line):
        # Print a per-agent error listing, or the success line when empty.
        if not problems:
            click.echo(success_line)
            return
        click.echo(failure_heading)
        for agent_name, messages in problems.items():
            click.echo(f" {agent_name}:")
            for message in messages:
                click.echo(f"{message}")

    # Registry-level checks
    click.echo("Validating agent registry...")
    report(
        registry.validate_agents(),
        "Registry validation errors:",
        " ✅ Registry validation passed",
    )

    # Checks on what is installed in the target project
    click.echo(f"\nValidating installed agents in: {project_dir}")
    report(
        installer.validate_installation(project_dir),
        "Installation validation errors:",
        " ✅ Installation validation passed",
    )

    # Summary of installed agents
    installed = installer.list_installed_agents(project_dir)
    if not installed:
        click.echo("\nNo agents installed in this project")
        return
    click.echo(f"\nInstalled agents ({len(installed)}):")
    for name in installed:
        click.echo(f"{name}")
@cli.command()
def templates():
    """List available project templates."""
    available = _get_registry().get_agent_templates()
    click.echo("Available Project Templates:")
    click.echo("=" * 40)
    # One entry per template: its name, then the agents it bundles.
    for name, bundled in available.items():
        agent_names = ", ".join(bundled)
        click.echo(f"\n{name}:")
        click.echo(f" Agents ({len(bundled)}): {agent_names}")
@cli.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
def status(target: str):
    """Show status of agents in a project."""
    registry = _get_registry()
    installer = AgentInstaller(registry)
    project_dir = Path(target).resolve()

    click.echo(f"Project: {project_dir.name}")
    click.echo(f"Path: {project_dir}")
    click.echo("=" * 50)

    # Without an agents directory there is nothing to report.
    if not (project_dir / "agents").exists():
        click.echo("❌ No agents directory found")
        click.echo("\nRun 'kaizen-agentic init' to initialize a new project")
        click.echo("or 'kaizen-agentic install <agents>' to add agents")
        return

    installed = installer.list_installed_agents(project_dir)
    if not installed:
        click.echo("❌ No agents installed")
    else:
        click.echo(f"✅ Agents installed ({len(installed)}):")
        # Group by category; agents the registry does not know go under
        # "unknown".
        by_category = {}
        for name in installed:
            known = registry.get_agent(name)
            bucket = known.category.value if known else "unknown"
            by_category.setdefault(bucket, []).append(name)
        for bucket, names in by_category.items():
            click.echo(f"\n {bucket.replace('-', ' ').title()}:")
            for name in names:
                click.echo(f"{name}")

    # Presence check for the usual project configuration files.
    click.echo("\nConfiguration files:")
    for config_name in ("CLAUDE.md", "Makefile", "pyproject.toml", ".gitignore"):
        # Both branches print the same text, as in the original.
        if (project_dir / config_name).exists():
            click.echo(f"{config_name}")
        else:
            click.echo(f"{config_name}")
@cli.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.option("--detailed", "-d", is_flag=True, help="Show detailed analysis")
def detect(target: str, detailed: bool):
    """Detect existing agent systems in a project."""
    from .detection import AgentSystemDetector

    project_dir = Path(target).resolve()
    if not project_dir.exists():
        click.echo(f"Error: Directory not found: {project_dir}")
        sys.exit(1)

    click.echo(f"Detecting agent systems in: {project_dir}")
    click.echo("=" * 50)
    findings = AgentSystemDetector().detect_agent_systems(project_dir)

    # Detected systems
    systems = findings.detected_systems
    if not systems:
        click.echo("\n🔍 No existing agent systems detected")
    else:
        click.echo(f"\n🔍 Detected Agent Systems ({len(systems)}):")
        for system in systems:
            click.echo(f"{system.value}")

    # Detected agents, with migration hints
    if findings.agents:
        click.echo(f"\n🤖 Detected Agents ({len(findings.agents)}):")
        for found in findings.agents:
            marker = "" if found.can_migrate else "⚠️"
            click.echo(f" {marker} {found.name} ({found.type.value})")
            if detailed and found.description:
                click.echo(f" {found.description}")
            if not found.can_migrate and found.migration_notes:
                click.echo(f" Note: {found.migration_notes}")

    # Configuration files, shown relative to the project directory
    if findings.config_files:
        click.echo(f"\n📄 Configuration Files ({len(findings.config_files)}):")
        for path in findings.config_files:
            click.echo(f"{path.relative_to(project_dir)}")

    # Conflicts
    if findings.conflicts:
        click.echo(f"\n⚠️ Potential Conflicts ({len(findings.conflicts)}):")
        for first, second, why in findings.conflicts:
            click.echo(f"{first} vs {second}: {why}")

    # Integration strategy
    if findings.integration_strategy:
        click.echo(
            f"\n💡 Recommended Integration Strategy: {findings.integration_strategy}"
        )

    # Migration recommendations; entries that start with a space get an
    # extra leading space.
    if findings.migration_recommendations:
        click.echo("\n📋 Migration Recommendations:")
        for line in findings.migration_recommendations:
            prefix = " " if line.startswith(" ") else ""
            click.echo(f"{prefix}{line}")

    if not systems:
        click.echo("\n✨ This project is ready for Kaizen Agentic installation!")
        click.echo(" Run: kaizen-agentic install <agent-names>")
@cli.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.option(
    "--dry-run", "-n", is_flag=True, help="Show what would be done without executing"
)
@click.option(
    "--auto-resolve", "-a", is_flag=True, help="Automatically resolve simple conflicts"
)
def migrate(target: str, dry_run: bool, auto_resolve: bool):
    """Create migration plan for integrating Kaizen agents into existing project."""
    # The plan is printed and then EXECUTED unless --dry-run is given.
    # NOTE(review): ``auto_resolve`` is accepted but not used in this function.
    from .migration import AgentMigrationPlanner, AgentMigrator

    # Marker shown in front of each plan, keyed by strategy value.
    strategy_emoji = {
        "replace": "🔄",
        "extend": "🔗",
        "preserve": "💾",
        "merge": "🔀",
        "remove": "🗑️",
    }

    target_path = Path(target).resolve()
    if not target_path.exists():
        click.echo(f"Error: Directory not found: {target_path}")
        sys.exit(1)
    click.echo(f"Creating migration plan for: {target_path}")
    click.echo("=" * 50)
    planner = AgentMigrationPlanner()
    integration_plan = planner.create_integration_plan(target_path)
    if (
        not integration_plan.migration_plans
        and not integration_plan.conflict_resolutions
    ):
        click.echo("✨ No migration needed - project is ready for Kaizen agents!")
        click.echo(" Run: kaizen-agentic install <agent-names>")
        return
    # Show migration plans
    if integration_plan.migration_plans:
        click.echo(f"\n🔄 Migration Plans ({len(integration_plan.migration_plans)}):")
        for plan in integration_plan.migration_plans:
            emoji = strategy_emoji.get(plan.strategy.value, "")
            click.echo(
                f" {emoji} {plan.source_agent.name} ({plan.source_agent.type.value})"
            )
            click.echo(f" Strategy: {plan.strategy.value}")
            if plan.target_agent:
                click.echo(f" Target: {plan.target_agent}")
            for note in plan.migration_notes:
                click.echo(f" 📝 {note}")
    # Show conflict resolutions
    if integration_plan.conflict_resolutions:
        click.echo(
            f"\n⚠️ Conflict Resolutions ({len(integration_plan.conflict_resolutions)}):"
        )
        for resolution in integration_plan.conflict_resolutions:
            click.echo(f"{resolution.agent1} vs {resolution.agent2}")
            click.echo(f" Resolution: {resolution.resolution.value}")
            if resolution.action_details:
                for key, value in resolution.action_details.items():
                    click.echo(f" {key}: {value}")
    # Show integration order
    if integration_plan.integration_order:
        click.echo("\n📋 Integration Order:")
        for i, agent_name in enumerate(integration_plan.integration_order, 1):
            click.echo(f" {i}. {agent_name}")
    # Show post-migration tasks
    if integration_plan.post_migration_tasks:
        click.echo("\n✅ Post-Migration Tasks:")
        for task in integration_plan.post_migration_tasks:
            click.echo(f"{task}")
    # Execute migration unless this is a dry run
    if not dry_run:
        click.echo("\n🚀 Executing migration...")
        migrator = AgentMigrator()
        results = migrator.execute_migration(integration_plan, dry_run=False)
        click.echo("\n📊 Migration Results:")
        for agent, result in results.items():
            status_emoji = "" if "ERROR" not in result else ""
            click.echo(f" {status_emoji} {agent}: {result}")
        click.echo(f"\n💾 Backup created at: {integration_plan.backup_directory}")
    else:
        # There is no --no-dry-run option: executing means omitting --dry-run.
        click.echo(
            "\n🔍 This was a dry run. Re-run without --dry-run to execute the migration."
        )
        click.echo(
            f" Backup would be created at: {integration_plan.backup_directory}"
        )
@cli.group()
def extensions():
    """Manage agent extensions."""
    # Command group only: the extension sub-commands below register on it.
@extensions.command()
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.option("--base-agent", "-b", help="Filter by base agent")
def list_extensions(target: str, base_agent: Optional[str]):
    """List installed extensions."""
    from .extensions import ExtensionManager

    project_dir = Path(target).resolve()
    found = ExtensionManager(project_dir).list_extensions(base_agent)

    # Nothing to show: say whether a filter was in effect.
    if not found:
        message = (
            f"No extensions found for agent: {base_agent}"
            if base_agent
            else "No extensions installed in this project"
        )
        click.echo(message)
        return

    click.echo(f"Extensions in {project_dir}:")
    click.echo("=" * 40)
    for extension in found:
        marker = "" if extension.enabled else ""
        click.echo(f"\n{marker} {extension.name} (extends {extension.base_agent})")
        click.echo(f" Type: {extension.extension_type.value}")
        click.echo(f" Description: {extension.description}")
        click.echo(f" Version: {extension.version}")
        if extension.custom_commands:
            command_names = ", ".join(extension.custom_commands.keys())
            click.echo(f" Custom commands: {command_names}")
@extensions.command()
@click.argument("name")
@click.argument("base_agent")
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.option("--description", "-d", help="Extension description")
@click.option("--template", default="basic", help="Template type (basic, advanced)")
def create(
    name: str, base_agent: str, target: str, description: Optional[str], template: str
):
    """Create a new agent extension."""
    from .extensions import ExtensionManager, ExtensionType, create_extension_template

    project_dir = Path(target).resolve()
    manager = ExtensionManager(project_dir)

    # Render the template text, then store it under
    # .kaizen/extensions/<name>/template.md in the target project.
    rendered = create_extension_template(name, base_agent, project_dir, template)
    extension_dir = project_dir / ".kaizen" / "extensions" / name
    extension_dir.mkdir(parents=True, exist_ok=True)
    template_file = extension_dir / "template.md"
    template_file.write_text(rendered)

    # Register the extension; fall back to a generic description.
    if not description:
        description = f"Custom extension for {base_agent}"
    manager.create_extension(
        name=name,
        base_agent=base_agent,
        extension_type=ExtensionType.FUNCTIONAL_EXTENSION,
        description=description,
    )

    click.echo(f"✅ Created extension: {name}")
    click.echo(f" Base agent: {base_agent}")
    click.echo(f" Template saved to: {template_file}")
    click.echo(
        f" Edit the configuration and run: kaizen-agentic extensions enable {name}"
    )
@extensions.command()
@click.argument("name")
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
def enable(name: str, target: str):
    """Enable an extension."""
    from .extensions import ExtensionManager

    project_dir = Path(target).resolve()
    # enable_extension() returning a falsy value is reported as "not found".
    if not ExtensionManager(project_dir).enable_extension(name):
        click.echo(f"❌ Extension not found: {name}")
        return
    click.echo(f"✅ Enabled extension: {name}")
@extensions.command()
@click.argument("name")
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
def disable(name: str, target: str):
    """Disable an extension."""
    from .extensions import ExtensionManager

    project_dir = Path(target).resolve()
    # disable_extension() returning a falsy value is reported as "not found".
    if not ExtensionManager(project_dir).disable_extension(name):
        click.echo(f"❌ Extension not found: {name}")
        return
    click.echo(f"❌ Disabled extension: {name}")
@extensions.command()
@click.argument("name")
@click.option("--target", "-t", default=".", help="Target directory (default: current)")
@click.confirmation_option(prompt="Are you sure you want to remove this extension?")
def remove(name: str, target: str):
    """Remove an extension."""
    # Registered on the ``extensions`` group. Defining it also rebinds the
    # module-level name ``remove``; the earlier top-level command stays
    # registered on ``cli``.
    from .extensions import ExtensionManager

    project_dir = Path(target).resolve()
    # remove_extension() returning a falsy value is reported as "not found".
    if not ExtensionManager(project_dir).remove_extension(name):
        click.echo(f"❌ Extension not found: {name}")
        return
    click.echo(f"🗑️ Removed extension: {name}")
@cli.group()
def memory():
    """Manage project-scoped agent memory (.kaizen/agents/<name>/memory.md)."""
    # Command group only: the memory sub-commands below register on it.
@memory.command("show")
@click.argument("agent_name")
@click.option("--target", "-t", default=".", help="Project root (default: current)")
def memory_show(agent_name: str, target: str):
    """Print agent memory for the current project."""
    location = _memory_path(target, agent_name)
    if location.exists():
        click.echo(location.read_text())
        return
    # No memory file yet: say where it was expected and how to create it.
    click.echo(f"No memory found for agent '{agent_name}'.")
    click.echo(f" Expected: {location}")
    click.echo(f" Run: kaizen-agentic memory init {agent_name}")
@memory.command("init")
@click.argument("agent_name")
@click.option("--target", "-t", default=".", help="Project root (default: current)")
def memory_init(agent_name: str, target: str):
    """Scaffold an empty memory file for an agent."""
    destination = _memory_path(target, agent_name)
    # An existing memory file is never overwritten.
    if destination.exists():
        click.echo(f"Memory file already exists: {destination}")
        return

    destination.parent.mkdir(parents=True, exist_ok=True)
    project_name = Path(target).resolve().name
    # Front matter followed by the empty section skeleton.
    content = f"""---
agent: {agent_name}
project: {project_name}
last_updated: {_today()}
session_count: 0
---
## Project Context
<!-- What this agent knows about the project it works in -->
## Accumulated Findings
<!-- Patterns, recurring issues, key decisions encountered -->
## What Worked
<!-- Approaches that produced good results in this project -->
## Watch Points
<!-- Recurring risks, traps, or areas requiring extra care -->
## Open Threads
<!-- Things noticed but not yet acted on -->
## Session Log
<!-- One-line entry per session: date · summary · outcome -->
"""
    destination.write_text(content)
    click.echo(f"Initialized memory for '{agent_name}': {destination}")

    # If the registry ships protocols for this agent, show how to view them.
    agent_protocols = _get_registry().agents_dir / "protocols" / agent_name
    if not agent_protocols.exists():
        return
    slugs = []
    for candidate in sorted(agent_protocols.glob("*.md")):
        if candidate.name != "README.md":
            slugs.append(candidate.stem)
    if not slugs:
        return
    click.echo(f" Protocols available for '{agent_name}':")
    for slug in slugs:
        click.echo(f" kaizen-agentic protocols show {agent_name} {slug}")
@memory.command("brief")
@click.argument("agent_name")
@click.option("--target", "-t", default=".", help="Project root (default: current)")
@click.option("--raw", is_flag=True, help="Dump raw memory files without synthesis header")
def memory_brief(agent_name: str, target: str, raw: bool):
    """Print a coach-synthesised orientation for an agent.
    Reads all agent memories in the project and formats an orientation brief
    for the specified agent, following the coach agent (agents/agent-coach.md)
    output format. Pass to a Claude session with the coach agent loaded for
    full LLM synthesis.
    """
    # The docstring above is the user-visible --help text and is kept verbatim.
    project_root = Path(target).resolve()
    kaizen_dir = project_root / ".kaizen" / "agents"
    project_name = project_root.name
    # Collect all agent memories: the requested agent's own file and
    # everybody else's, keyed by agent directory name.
    own_memory: Optional[str] = None
    other_memories: dict = {}
    if kaizen_dir.exists():
        for agent_dir in sorted(kaizen_dir.iterdir()):
            if not agent_dir.is_dir():
                continue
            mf = agent_dir / "memory.md"
            if not mf.exists():
                continue
            if agent_dir.name == agent_name:
                own_memory = mf.read_text()
            else:
                other_memories[agent_dir.name] = mf.read_text()
    if raw:
        # Raw mode: dump the files with a separator line and nothing else.
        if own_memory:
            click.echo(f"=== {agent_name} ===\n{own_memory}")
        for name, content in other_memories.items():
            click.echo(f"=== {name} ===\n{content}")
        return
    today = _today()
    # ``[*other_memories]`` instead of ``list(other_memories.keys())``: in this
    # module the name ``list`` is the ``list`` CLI command, so calling it here
    # would run that command instead of building a list.
    sources = ([agent_name] if own_memory else []) + [*other_memories]
    click.echo(f"## Orientation Brief for: {agent_name}")
    click.echo(f"Project: {project_name}")
    click.echo(f"Generated: {today}")
    click.echo(f"Sources: {', '.join(sources) if sources else 'none'}")
    click.echo()
    if not sources:
        click.echo("No agent memory files found in this project.")
        click.echo(f" Run: kaizen-agentic memory init {agent_name}")
        click.echo(" Then load the coach agent (agents/agent-coach.md) for synthesis.")
        return
    # Own memory section
    if own_memory:
        click.echo("### Your Memory")
        click.echo(own_memory)
    else:
        click.echo(f"### Your Memory\n(none — run: kaizen-agentic memory init {agent_name})\n")
    # Cross-agent context
    if other_memories:
        click.echo("### Context From Other Agents")
        click.echo("(Load coach agent for full synthesis. Raw content below.)\n")
        for name, content in other_memories.items():
            click.echo(f"--- {name} ---")
            click.echo(content)
    else:
        click.echo("### Context From Other Agents\nNo other agent memories found in this project.\n")
    click.echo("---")
    click.echo("Tip: Load agents/agent-coach.md in your Claude session and pass this output")
    click.echo(" for a full cross-agent synthesis and orientation brief.")
@memory.command("clear")
@click.argument("agent_name")
@click.option("--target", "-t", default=".", help="Project root (default: current)")
@click.confirmation_option(prompt="This will permanently delete the agent memory. Continue?")
def memory_clear(agent_name: str, target: str):
    """Wipe agent memory for the current project."""
    memory_file = _memory_path(target, agent_name)
    if memory_file.exists():
        memory_file.unlink()
        click.echo(f"Cleared memory for '{agent_name}': {memory_file}")
        # Drop the agent's directory too once nothing is left in it.
        agent_dir = memory_file.parent
        leftovers = any(agent_dir.iterdir())
        if not leftovers:
            agent_dir.rmdir()
    else:
        click.echo(f"No memory found for agent '{agent_name}' — nothing to clear.")
@cli.group()
def protocols():
    """Browse agent protocol runbooks (agents/protocols/<agent>/<slug>.md)."""
    # Command group only: the protocol sub-commands below register on it.
@protocols.command("list")
@click.argument("agent_name", required=False)
def protocols_list(agent_name: Optional[str]):
    """List available protocols, optionally filtered by agent."""
    protocols_root = _get_registry().agents_dir / "protocols"
    if not protocols_root.exists():
        click.echo("No protocols directory found.")
        return

    # Either the single requested agent directory or every entry, sorted.
    if agent_name:
        candidates = [protocols_root / agent_name]
    else:
        candidates = sorted(protocols_root.iterdir())

    entries = []
    for directory in candidates:
        if directory.name == "__pycache__" or not directory.is_dir():
            continue
        for path in sorted(directory.glob("*.md")):
            if path.name == "README.md":
                continue
            # Default title comes from the file name; the first line that
            # starts with "title:" overrides it.
            title = path.stem.replace("-", " ").title()
            try:
                title = next(
                    (
                        line.split(":", 1)[1].strip().strip('"')
                        for line in path.read_text().splitlines()
                        if line.startswith("title:")
                    ),
                    title,
                )
            except Exception:
                # Best effort: keep the file-name based title.
                pass
            entries.append((directory.name, path.stem, title))

    if not entries:
        if agent_name:
            click.echo(f"No protocols found for agent '{agent_name}'.")
        else:
            click.echo("No protocols found.")
        return

    click.echo("Available Protocols:")
    click.echo("=" * 40)
    previous_owner = None
    for owner, slug, title in entries:
        # Print the agent heading once per run of entries for that agent.
        if owner != previous_owner:
            click.echo(f"\n {owner}:")
            previous_owner = owner
        click.echo(f"{slug}: {title}")
@protocols.command("show")
@click.argument("agent_name")
@click.argument("slug")
def protocols_show(agent_name: str, slug: str):
    """Print a protocol runbook."""
    runbook = _get_registry().agents_dir / "protocols" / agent_name / f"{slug}.md"
    if runbook.exists():
        click.echo(runbook.read_text())
        return
    # Unknown protocol: say where it was expected and how to list the rest.
    click.echo(f"Protocol not found: {agent_name}/{slug}")
    click.echo(f" Expected: {runbook}")
    click.echo(f" Run: kaizen-agentic protocols list {agent_name}")
def _memory_path(target: str, agent_name: str) -> Path:
return Path(target).resolve() / ".kaizen" / "agents" / agent_name / "memory.md"
def _today() -> str:
from datetime import date
return date.today().isoformat()
def _get_registry() -> AgentRegistry:
    """Get the agent registry."""
    # Lookup order for the agents directory:
    #   1. ./agents when the working directory has an "agents" folder or a
    #      "src/kaizen_agentic" folder (the kaizen-agentic repo itself),
    #   2. "agents" three levels above the installed package's __init__ file,
    #   3. "data/agents" inside the installed package.
    # Prints an error and exits with status 1 when none of them exists.
    cwd = Path.cwd()
    local_agents = cwd / "agents"

    if local_agents.exists() or (cwd / "src" / "kaizen_agentic").exists():
        agents_dir = local_agents
    else:
        try:
            import kaizen_agentic
        except ImportError:
            click.echo("Error: Could not find agents directory")
            click.echo(
                "Make sure you're in a kaizen-agentic project or have the package installed"
            )
            sys.exit(1)
        package_file = Path(kaizen_agentic.__file__)
        agents_dir = package_file.parents[2] / "agents"
        if not agents_dir.exists():
            # Fall back to data shipped inside the package.
            agents_dir = package_file.parent / "data" / "agents"

    if agents_dir.exists():
        return AgentRegistry(agents_dir)
    click.echo(f"Error: Agents directory not found: {agents_dir}")
    sys.exit(1)
# Running this module directly as a script invokes the Click root group.
if __name__ == "__main__":
    cli()