"""Migration and conflict resolution tools for existing agent systems.""" import json import shutil import yaml from pathlib import Path from typing import Dict, List, Optional, Set, Tuple from dataclasses import dataclass from enum import Enum from .detection import AgentSystemDetector, DetectedAgent, AgentSystemType class MigrationStrategy(Enum): """Strategies for migrating existing agents.""" REPLACE = "replace" # Replace with Kaizen equivalent EXTEND = "extend" # Extend Kaizen agent with custom functionality PRESERVE = "preserve" # Keep custom agent alongside Kaizen agents MERGE = "merge" # Merge functionality into existing Kaizen agent REMOVE = "remove" # Remove conflicting agent class ConflictResolution(Enum): """Ways to resolve conflicts between agents.""" RENAME = "rename" # Rename one of the conflicting agents NAMESPACE = "namespace" # Put agents in different namespaces MERGE_FUNCTIONALITY = "merge" # Combine functionality CHOOSE_KAIZEN = "choose_kaizen" # Keep Kaizen agent, remove custom CHOOSE_CUSTOM = "choose_custom" # Keep custom agent, skip Kaizen MANUAL_REVIEW = "manual" # Require manual resolution @dataclass class MigrationPlan: """Plan for migrating an existing agent.""" source_agent: DetectedAgent strategy: MigrationStrategy target_agent: Optional[str] = None # Kaizen agent name if applicable backup_path: Optional[Path] = None custom_config: Optional[Dict] = None migration_notes: List[str] = None def __post_init__(self): if self.migration_notes is None: self.migration_notes = [] @dataclass class ConflictResolutionPlan: """Plan for resolving a conflict between agents.""" agent1: str agent2: str conflict_type: str resolution: ConflictResolution action_details: Dict = None def __post_init__(self): if self.action_details is None: self.action_details = {} @dataclass class IntegrationPlan: """Complete plan for integrating Kaizen agents into an existing project.""" project_path: Path migration_plans: List[MigrationPlan] conflict_resolutions: List[ConflictResolutionPlan] backup_directory: Path integration_order: List[str] # Order to perform migrations post_migration_tasks: List[str] class AgentMigrationPlanner: """Creates migration plans for existing agent systems.""" def __init__(self): self.kaizen_equivalents = { # Map common custom agent names to Kaizen equivalents "todo": "keepaTodofile", "todo_manager": "keepaTodofile", "todo-manager": "keepaTodofile", "task_manager": "keepaTodofile", "changelog": "keepaChangelog", "changelog_bot": "keepaChangelog", "changelog-bot": "keepaChangelog", "version_manager": "keepaChangelog", "code_review": "code-refactoring", "code_reviewer": "code-refactoring", "code-reviewer": "code-refactoring", "refactor": "code-refactoring", "test_helper": "testing-efficiency", "test_runner": "testing-efficiency", "test-runner": "testing-efficiency", "testing_helper": "testing-efficiency", "tdd": "tdd-workflow", "tdd_helper": "tdd-workflow", "setup": "setupRepository", "setup_repo": "setupRepository", "repo_setup": "setupRepository", "project_setup": "setupRepository", "docs": "claude-documentation", "documentation": "claude-documentation", "doc_generator": "claude-documentation", } self.functional_categories = { "project_management": ["keepaTodofile", "project-management", "priority-evaluation"], "testing": ["testing-efficiency", "test-maintenance", "tdd-workflow"], "documentation": ["claude-documentation", "keepaContributingfile"], "code_quality": ["code-refactoring", "datamodel-optimization", "optimization"], "infrastructure": ["setupRepository", "tooling-optimization"], "version_control": ["keepaChangelog"], } def create_integration_plan(self, project_path: Path) -> IntegrationPlan: """Create a complete integration plan for a project.""" detector = AgentSystemDetector() detection_result = detector.detect_agent_systems(project_path) # Create backup directory backup_dir = project_path / f".kaizen-migration-backup-{int(Path().stat().st_mtime)}" # Create migration plans for each detected agent migration_plans = [] for agent in detection_result.agents: if agent.type != AgentSystemType.KAIZEN_AGENTIC: plan = self._create_migration_plan(agent, project_path, backup_dir) migration_plans.append(plan) # Create conflict resolution plans conflict_resolutions = [] for agent1, agent2, reason in detection_result.conflicts: resolution = self._create_conflict_resolution(agent1, agent2, reason) conflict_resolutions.append(resolution) # Determine integration order integration_order = self._determine_integration_order(migration_plans) # Generate post-migration tasks post_migration_tasks = self._generate_post_migration_tasks(detection_result) return IntegrationPlan( project_path=project_path, migration_plans=migration_plans, conflict_resolutions=conflict_resolutions, backup_directory=backup_dir, integration_order=integration_order, post_migration_tasks=post_migration_tasks ) def _create_migration_plan( self, agent: DetectedAgent, project_path: Path, backup_dir: Path ) -> MigrationPlan: """Create a migration plan for a single agent.""" # Determine if there's a Kaizen equivalent agent_name_lower = agent.name.lower().replace("-", "_").replace(" ", "_") target_agent = self.kaizen_equivalents.get(agent_name_lower) # Determine migration strategy if target_agent: if agent.type == AgentSystemType.CUSTOM_AGENTS: # Check if custom agent has unique functionality if self._has_unique_functionality(agent): strategy = MigrationStrategy.EXTEND else: strategy = MigrationStrategy.REPLACE else: strategy = MigrationStrategy.REPLACE else: # No direct equivalent - preserve or extend if self._is_essential_custom_agent(agent): strategy = MigrationStrategy.PRESERVE else: # Try to find a category match category_match = self._find_category_match(agent) if category_match: strategy = MigrationStrategy.EXTEND target_agent = category_match else: strategy = MigrationStrategy.PRESERVE # Create backup path backup_path = backup_dir / agent.file_path.name # Generate migration notes notes = self._generate_migration_notes(agent, strategy, target_agent) return MigrationPlan( source_agent=agent, strategy=strategy, target_agent=target_agent, backup_path=backup_path, migration_notes=notes ) def _has_unique_functionality(self, agent: DetectedAgent) -> bool: """Check if a custom agent has functionality not available in Kaizen agents.""" # This would analyze the agent's implementation # For now, assume custom Python agents have unique functionality return agent.file_path.suffix == ".py" def _is_essential_custom_agent(self, agent: DetectedAgent) -> bool: """Check if a custom agent is essential to the project.""" # Heuristics for essential agents essential_patterns = [ "deploy", "ci", "cd", "build", "release", "secret", "auth", "database", "api", "server", "client", "integration" ] agent_name_lower = agent.name.lower() return any(pattern in agent_name_lower for pattern in essential_patterns) def _find_category_match(self, agent: DetectedAgent) -> Optional[str]: """Find a Kaizen agent in the same functional category.""" agent_name_lower = agent.name.lower() for category, kaizen_agents in self.functional_categories.items(): # Check if agent name suggests this category if any(cat_word in agent_name_lower for cat_word in category.split("_")): # Return the primary agent for this category return kaizen_agents[0] return None def _create_conflict_resolution( self, agent1: str, agent2: str, reason: str ) -> ConflictResolutionPlan: """Create a conflict resolution plan.""" if "Name conflict" in reason: resolution = ConflictResolution.RENAME action_details = { "rename_agent": agent2, # Rename the second agent "new_name": f"{agent2}_custom" } elif "Functional overlap" in reason: # Check if one is a Kaizen agent if agent1 in self.kaizen_equivalents.values(): resolution = ConflictResolution.CHOOSE_KAIZEN action_details = {"keep": agent1, "remove": agent2} elif agent2 in self.kaizen_equivalents.values(): resolution = ConflictResolution.CHOOSE_KAIZEN action_details = {"keep": agent2, "remove": agent1} else: resolution = ConflictResolution.MERGE_FUNCTIONALITY action_details = {"primary": agent1, "merge_into": agent2} else: resolution = ConflictResolution.MANUAL_REVIEW action_details = {"reason": reason} return ConflictResolutionPlan( agent1=agent1, agent2=agent2, conflict_type=reason, resolution=resolution, action_details=action_details ) def _determine_integration_order(self, migration_plans: List[MigrationPlan]) -> List[str]: """Determine the order to perform migrations.""" # Order by dependency and risk order = [] # 1. Infrastructure agents first infra_agents = [p for p in migration_plans if self._is_infrastructure_agent(p)] order.extend([p.source_agent.name for p in infra_agents]) # 2. Core functionality agents core_agents = [p for p in migration_plans if self._is_core_agent(p) and p not in infra_agents] order.extend([p.source_agent.name for p in core_agents]) # 3. Optional/enhancement agents last remaining = [p for p in migration_plans if p.source_agent.name not in order] order.extend([p.source_agent.name for p in remaining]) return order def _is_infrastructure_agent(self, plan: MigrationPlan) -> bool: """Check if agent is infrastructure-related.""" infra_keywords = ["setup", "config", "infrastructure", "deploy", "build"] agent_name = plan.source_agent.name.lower() return any(keyword in agent_name for keyword in infra_keywords) def _is_core_agent(self, plan: MigrationPlan) -> bool: """Check if agent is core functionality.""" core_keywords = ["todo", "changelog", "test", "code", "review"] agent_name = plan.source_agent.name.lower() return any(keyword in agent_name for keyword in core_keywords) def _generate_migration_notes( self, agent: DetectedAgent, strategy: MigrationStrategy, target_agent: Optional[str] ) -> List[str]: """Generate helpful notes for the migration.""" notes = [] if strategy == MigrationStrategy.REPLACE: notes.append(f"Replace with Kaizen agent: {target_agent}") notes.append("Custom functionality will be lost - review before migrating") elif strategy == MigrationStrategy.EXTEND: notes.append(f"Extend Kaizen agent: {target_agent}") notes.append("Custom functionality can be preserved as extensions") elif strategy == MigrationStrategy.PRESERVE: notes.append("Keep custom agent alongside Kaizen agents") notes.append("Ensure no naming conflicts with new agents") elif strategy == MigrationStrategy.MERGE: notes.append(f"Merge functionality into: {target_agent}") notes.append("Manual review required for feature integration") elif strategy == MigrationStrategy.REMOVE: notes.append("Remove conflicting agent") notes.append("Functionality available in Kaizen agents") # Add file-specific notes if agent.file_path.suffix == ".py": notes.append("Python implementation - may have custom logic") elif agent.file_path.suffix in [".yml", ".yaml"]: notes.append("YAML configuration - check for custom settings") elif agent.file_path.suffix == ".json": notes.append("JSON configuration - verify data format compatibility") return notes def _generate_post_migration_tasks(self, detection_result) -> List[str]: """Generate post-migration tasks.""" tasks = [] if AgentSystemType.CLAUDE_CODE in detection_result.detected_systems: tasks.append("Update CLAUDE.md with new agent references") if detection_result.config_files: tasks.append("Verify all configuration files are updated") tasks.extend([ "Run 'kaizen-agentic validate' to verify installation", "Test all agent functionality", "Update project documentation", "Train team on new agent workflows", "Archive or remove backup files after verification" ]) return tasks class AgentMigrator: """Executes migration plans.""" def __init__(self): self.planner = AgentMigrationPlanner() def execute_migration(self, plan: IntegrationPlan, dry_run: bool = True) -> Dict[str, str]: """Execute a migration plan.""" results = {} if not dry_run: # Create backup directory plan.backup_directory.mkdir(exist_ok=True) # Execute migrations in order for agent_name in plan.integration_order: migration_plan = next( (p for p in plan.migration_plans if p.source_agent.name == agent_name), None ) if migration_plan: result = self._execute_single_migration(migration_plan, dry_run) results[agent_name] = result # Resolve conflicts for conflict_plan in plan.conflict_resolutions: result = self._resolve_conflict(conflict_plan, dry_run) conflict_key = f"{conflict_plan.agent1}_vs_{conflict_plan.agent2}" results[conflict_key] = result return results def _execute_single_migration(self, plan: MigrationPlan, dry_run: bool) -> str: """Execute migration for a single agent.""" if dry_run: return f"DRY_RUN: Would {plan.strategy.value} {plan.source_agent.name}" try: if plan.strategy == MigrationStrategy.REPLACE: return self._replace_agent(plan) elif plan.strategy == MigrationStrategy.EXTEND: return self._extend_agent(plan) elif plan.strategy == MigrationStrategy.PRESERVE: return self._preserve_agent(plan) elif plan.strategy == MigrationStrategy.MERGE: return self._merge_agent(plan) elif plan.strategy == MigrationStrategy.REMOVE: return self._remove_agent(plan) else: return f"UNKNOWN_STRATEGY: {plan.strategy}" except Exception as e: return f"ERROR: {e}" def _replace_agent(self, plan: MigrationPlan) -> str: """Replace custom agent with Kaizen equivalent.""" # Backup the original if plan.backup_path: shutil.copy2(plan.source_agent.file_path, plan.backup_path) # The actual Kaizen agent will be installed separately # For now, just mark for replacement return f"REPLACED: {plan.source_agent.name} -> {plan.target_agent}" def _extend_agent(self, plan: MigrationPlan) -> str: """Extend Kaizen agent with custom functionality.""" # Create extension configuration extension_config = { "base_agent": plan.target_agent, "custom_source": str(plan.source_agent.file_path), "extension_type": "functional_overlay" } extension_path = plan.source_agent.file_path.parent / f"{plan.source_agent.name}_extension.json" with open(extension_path, 'w') as f: json.dump(extension_config, f, indent=2) return f"EXTENDED: {plan.target_agent} with {plan.source_agent.name}" def _preserve_agent(self, plan: MigrationPlan) -> str: """Preserve custom agent alongside Kaizen agents.""" # Rename if necessary to avoid conflicts if plan.source_agent.name in ["todo", "changelog", "test"]: new_name = f"{plan.source_agent.name}_custom" new_path = plan.source_agent.file_path.parent / f"{new_name}{plan.source_agent.file_path.suffix}" shutil.move(plan.source_agent.file_path, new_path) return f"PRESERVED: {plan.source_agent.name} -> {new_name}" return f"PRESERVED: {plan.source_agent.name}" def _merge_agent(self, plan: MigrationPlan) -> str: """Merge agent functionality.""" # This would require more sophisticated analysis return f"MERGE_PLANNED: {plan.source_agent.name} -> {plan.target_agent}" def _remove_agent(self, plan: MigrationPlan) -> str: """Remove conflicting agent.""" if plan.backup_path: shutil.move(plan.source_agent.file_path, plan.backup_path) else: plan.source_agent.file_path.unlink() return f"REMOVED: {plan.source_agent.name}" def _resolve_conflict(self, plan: ConflictResolutionPlan, dry_run: bool) -> str: """Resolve a conflict between agents.""" if dry_run: return f"DRY_RUN: Would resolve {plan.resolution.value} conflict" if plan.resolution == ConflictResolution.RENAME: # Implement renaming logic return f"RENAMED: {plan.action_details}" elif plan.resolution == ConflictResolution.CHOOSE_KAIZEN: return f"RESOLVED: Chose Kaizen agent {plan.action_details['keep']}" else: return f"RESOLUTION_PLANNED: {plan.resolution.value}"