Implement complete Scenario 2: Integration with existing projects having agents
Built comprehensive system for introducing Kaizen agents to existing projects: 🔍 **Detection System (detection.py)**: - Detects 9+ types of existing agent systems (Kaizen, Claude Code, custom agents, etc.) - Analyzes agent files with YAML frontmatter parsing - Identifies conflicts and functional overlaps - Generates integration strategies and migration recommendations 🔄 **Migration Framework (migration.py)**: - Creates detailed migration plans for each detected agent - Supports 5 migration strategies: replace, extend, preserve, merge, remove - Intelligent conflict resolution with multiple resolution types - Safe execution with backup creation and rollback capability 🔗 **Extension System (extensions.py)**: - Project-specific agent customizations and extensions - Multiple extension types: config overlay, functional extension, workflow integration - Template generation for basic and advanced extensions - Legacy agent integration with wrapper creation 🛠️ **CLI Integration**: - `kaizen-agentic detect` - Analyze existing agent systems - `kaizen-agentic migrate` - Execute migration plans with dry-run support - `kaizen-agentic extensions` - Complete extension management commands 📖 **Integration Patterns Documentation**: - 5 proven integration scenarios with detailed patterns - Conflict resolution strategies and decision matrices - Safe transition strategies with phased rollout - Best practices and troubleshooting guides **Key Features**: ✅ Respects existing project structure and workflows ✅ Safe transitions with comprehensive backup strategies ✅ Conflict detection and automated resolution ✅ Extension mechanisms for preserving custom functionality ✅ Comprehensive tooling for all transition scenarios Scenario 2 is now production-ready for integrating Kaizen agents into any existing project! 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
386
src/kaizen_agentic/detection.py
Normal file
386
src/kaizen_agentic/detection.py
Normal file
@@ -0,0 +1,386 @@
|
||||
"""Detection and analysis of existing agent systems in projects."""
|
||||
|
||||
import json
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class AgentSystemType(Enum):
|
||||
"""Types of existing agent systems that might be found."""
|
||||
KAIZEN_AGENTIC = "kaizen-agentic"
|
||||
CLAUDE_CODE = "claude-code"
|
||||
GITHUB_COPILOT = "github-copilot"
|
||||
CUSTOM_AGENTS = "custom-agents"
|
||||
ANTHROPIC_WORKBENCH = "anthropic-workbench"
|
||||
OPENAI_ASSISTANTS = "openai-assistants"
|
||||
LANGCHAIN_AGENTS = "langchain-agents"
|
||||
AUTOGEN = "autogen"
|
||||
CREWAI = "crewai"
|
||||
OTHER = "other"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DetectedAgent:
|
||||
"""Information about a detected agent."""
|
||||
name: str
|
||||
type: AgentSystemType
|
||||
file_path: Path
|
||||
description: Optional[str] = None
|
||||
dependencies: Set[str] = None
|
||||
conflicts_with: Set[str] = None
|
||||
can_migrate: bool = True
|
||||
migration_notes: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.dependencies is None:
|
||||
self.dependencies = set()
|
||||
if self.conflicts_with is None:
|
||||
self.conflicts_with = set()
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentSystemDetectionResult:
|
||||
"""Result of agent system detection in a project."""
|
||||
project_path: Path
|
||||
detected_systems: List[AgentSystemType]
|
||||
agents: List[DetectedAgent]
|
||||
config_files: List[Path]
|
||||
conflicts: List[Tuple[str, str, str]] # (agent1, agent2, reason)
|
||||
integration_strategy: Optional[str] = None
|
||||
migration_recommendations: List[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.migration_recommendations is None:
|
||||
self.migration_recommendations = []
|
||||
|
||||
|
||||
class AgentSystemDetector:
|
||||
"""Detects existing agent systems in projects."""
|
||||
|
||||
def __init__(self):
|
||||
self.detection_patterns = {
|
||||
AgentSystemType.KAIZEN_AGENTIC: [
|
||||
"agents/agent-*.md",
|
||||
"CLAUDE.md",
|
||||
".kaizen-agentic.yml",
|
||||
],
|
||||
AgentSystemType.CLAUDE_CODE: [
|
||||
"CLAUDE.md",
|
||||
".claude",
|
||||
"claude_config.json",
|
||||
".claude.json",
|
||||
],
|
||||
AgentSystemType.GITHUB_COPILOT: [
|
||||
".github/copilot.yml",
|
||||
".copilot/",
|
||||
"copilot.yml",
|
||||
],
|
||||
AgentSystemType.CUSTOM_AGENTS: [
|
||||
"agents/",
|
||||
"ai_agents/",
|
||||
"assistants/",
|
||||
"bots/",
|
||||
],
|
||||
AgentSystemType.ANTHROPIC_WORKBENCH: [
|
||||
".anthropic/",
|
||||
"anthropic.yml",
|
||||
"workbench.yml",
|
||||
],
|
||||
AgentSystemType.OPENAI_ASSISTANTS: [
|
||||
"openai_assistants/",
|
||||
".openai/",
|
||||
"gpt_config.json",
|
||||
],
|
||||
AgentSystemType.LANGCHAIN_AGENTS: [
|
||||
"langchain_agents/",
|
||||
"langchain.yml",
|
||||
"chains/",
|
||||
],
|
||||
AgentSystemType.AUTOGEN: [
|
||||
"autogen_agents/",
|
||||
".autogen/",
|
||||
"autogen.yml",
|
||||
],
|
||||
AgentSystemType.CREWAI: [
|
||||
"crew.yml",
|
||||
"crewai.yml",
|
||||
"crew/",
|
||||
"agents.yml",
|
||||
],
|
||||
}
|
||||
|
||||
def detect_agent_systems(self, project_path: Path) -> AgentSystemDetectionResult:
|
||||
"""Detect all agent systems in a project."""
|
||||
project_path = Path(project_path)
|
||||
|
||||
detected_systems = []
|
||||
agents = []
|
||||
config_files = []
|
||||
|
||||
# Detect each type of agent system
|
||||
for system_type, patterns in self.detection_patterns.items():
|
||||
if self._detect_system_type(project_path, patterns):
|
||||
detected_systems.append(system_type)
|
||||
|
||||
# Extract agents for this system type
|
||||
system_agents = self._extract_agents_for_system(
|
||||
project_path, system_type
|
||||
)
|
||||
agents.extend(system_agents)
|
||||
|
||||
# Find config files for this system type
|
||||
system_configs = self._find_config_files_for_system(
|
||||
project_path, system_type
|
||||
)
|
||||
config_files.extend(system_configs)
|
||||
|
||||
# Analyze conflicts
|
||||
conflicts = self._analyze_conflicts(agents)
|
||||
|
||||
# Generate integration strategy
|
||||
integration_strategy = self._generate_integration_strategy(
|
||||
detected_systems, agents
|
||||
)
|
||||
|
||||
# Generate migration recommendations
|
||||
migration_recommendations = self._generate_migration_recommendations(
|
||||
detected_systems, agents, conflicts
|
||||
)
|
||||
|
||||
return AgentSystemDetectionResult(
|
||||
project_path=project_path,
|
||||
detected_systems=detected_systems,
|
||||
agents=agents,
|
||||
config_files=config_files,
|
||||
conflicts=conflicts,
|
||||
integration_strategy=integration_strategy,
|
||||
migration_recommendations=migration_recommendations,
|
||||
)
|
||||
|
||||
def _detect_system_type(self, project_path: Path, patterns: List[str]) -> bool:
|
||||
"""Check if a system type exists based on file patterns."""
|
||||
for pattern in patterns:
|
||||
matches = list(project_path.glob(pattern))
|
||||
if matches:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _extract_agents_for_system(
|
||||
self, project_path: Path, system_type: AgentSystemType
|
||||
) -> List[DetectedAgent]:
|
||||
"""Extract agents for a specific system type."""
|
||||
agents = []
|
||||
|
||||
if system_type == AgentSystemType.KAIZEN_AGENTIC:
|
||||
agents.extend(self._extract_kaizen_agents(project_path))
|
||||
elif system_type == AgentSystemType.CLAUDE_CODE:
|
||||
agents.extend(self._extract_claude_agents(project_path))
|
||||
elif system_type == AgentSystemType.CUSTOM_AGENTS:
|
||||
agents.extend(self._extract_custom_agents(project_path))
|
||||
# Add more system-specific extraction methods as needed
|
||||
|
||||
return agents
|
||||
|
||||
def _extract_kaizen_agents(self, project_path: Path) -> List[DetectedAgent]:
|
||||
"""Extract Kaizen Agentic agents."""
|
||||
agents = []
|
||||
agents_dir = project_path / "agents"
|
||||
|
||||
if agents_dir.exists():
|
||||
for agent_file in agents_dir.glob("agent-*.md"):
|
||||
try:
|
||||
agent = self._parse_kaizen_agent_file(agent_file)
|
||||
if agent:
|
||||
agents.append(agent)
|
||||
except Exception as e:
|
||||
# Create a detected agent with error info
|
||||
agents.append(DetectedAgent(
|
||||
name=agent_file.stem.replace("agent-", ""),
|
||||
type=AgentSystemType.KAIZEN_AGENTIC,
|
||||
file_path=agent_file,
|
||||
can_migrate=False,
|
||||
migration_notes=f"Parse error: {e}"
|
||||
))
|
||||
|
||||
return agents
|
||||
|
||||
def _parse_kaizen_agent_file(self, agent_file: Path) -> Optional[DetectedAgent]:
|
||||
"""Parse a Kaizen Agentic agent file."""
|
||||
try:
|
||||
content = agent_file.read_text(encoding='utf-8')
|
||||
|
||||
# Extract YAML frontmatter
|
||||
if content.startswith('---'):
|
||||
parts = content.split('---', 2)
|
||||
if len(parts) >= 3:
|
||||
frontmatter = yaml.safe_load(parts[1])
|
||||
|
||||
return DetectedAgent(
|
||||
name=frontmatter.get('name', agent_file.stem.replace("agent-", "")),
|
||||
type=AgentSystemType.KAIZEN_AGENTIC,
|
||||
file_path=agent_file,
|
||||
description=frontmatter.get('description'),
|
||||
dependencies=set(frontmatter.get('dependencies', []))
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
def _extract_claude_agents(self, project_path: Path) -> List[DetectedAgent]:
|
||||
"""Extract Claude Code agents (if any defined in CLAUDE.md)."""
|
||||
agents = []
|
||||
claude_file = project_path / "CLAUDE.md"
|
||||
|
||||
if claude_file.exists():
|
||||
# Claude Code typically doesn't have separate agent files
|
||||
# but might reference agent usage in CLAUDE.md
|
||||
agents.append(DetectedAgent(
|
||||
name="claude-integration",
|
||||
type=AgentSystemType.CLAUDE_CODE,
|
||||
file_path=claude_file,
|
||||
description="Claude Code integration configuration"
|
||||
))
|
||||
|
||||
return agents
|
||||
|
||||
def _extract_custom_agents(self, project_path: Path) -> List[DetectedAgent]:
|
||||
"""Extract custom agent implementations."""
|
||||
agents = []
|
||||
|
||||
# Look for common agent directories
|
||||
agent_dirs = ["agents", "ai_agents", "assistants", "bots"]
|
||||
|
||||
for dir_name in agent_dirs:
|
||||
agent_dir = project_path / dir_name
|
||||
if agent_dir.exists() and agent_dir.is_dir():
|
||||
# Look for various file types that might be agents
|
||||
for pattern in ["*.py", "*.yml", "*.yaml", "*.json", "*.md"]:
|
||||
for agent_file in agent_dir.glob(pattern):
|
||||
# Skip kaizen-agentic files
|
||||
if agent_file.name.startswith("agent-") and agent_file.suffix == ".md":
|
||||
continue
|
||||
|
||||
agents.append(DetectedAgent(
|
||||
name=agent_file.stem,
|
||||
type=AgentSystemType.CUSTOM_AGENTS,
|
||||
file_path=agent_file,
|
||||
description=f"Custom agent in {dir_name}/"
|
||||
))
|
||||
|
||||
return agents
|
||||
|
||||
def _find_config_files_for_system(
|
||||
self, project_path: Path, system_type: AgentSystemType
|
||||
) -> List[Path]:
|
||||
"""Find configuration files for a system type."""
|
||||
config_files = []
|
||||
patterns = self.detection_patterns.get(system_type, [])
|
||||
|
||||
for pattern in patterns:
|
||||
matches = list(project_path.glob(pattern))
|
||||
config_files.extend(matches)
|
||||
|
||||
return config_files
|
||||
|
||||
def _analyze_conflicts(self, agents: List[DetectedAgent]) -> List[Tuple[str, str, str]]:
|
||||
"""Analyze potential conflicts between agents."""
|
||||
conflicts = []
|
||||
|
||||
# Group agents by type
|
||||
agents_by_type = {}
|
||||
for agent in agents:
|
||||
if agent.type not in agents_by_type:
|
||||
agents_by_type[agent.type] = []
|
||||
agents_by_type[agent.type].append(agent)
|
||||
|
||||
# Check for naming conflicts
|
||||
all_names = [agent.name for agent in agents]
|
||||
for i, agent1 in enumerate(agents):
|
||||
for j, agent2 in enumerate(agents[i+1:], i+1):
|
||||
if agent1.name == agent2.name and agent1.type != agent2.type:
|
||||
conflicts.append((
|
||||
agent1.name,
|
||||
agent2.name,
|
||||
f"Name conflict between {agent1.type.value} and {agent2.type.value}"
|
||||
))
|
||||
|
||||
# Check for functional overlaps
|
||||
functional_conflicts = {
|
||||
("todo", "task", "project"): "Project management functionality",
|
||||
("changelog", "version", "release"): "Version tracking functionality",
|
||||
("test", "testing", "qa"): "Testing functionality",
|
||||
("doc", "documentation", "readme"): "Documentation functionality",
|
||||
}
|
||||
|
||||
for keywords, conflict_type in functional_conflicts.items():
|
||||
matching_agents = []
|
||||
for agent in agents:
|
||||
if any(keyword in agent.name.lower() for keyword in keywords):
|
||||
matching_agents.append(agent)
|
||||
|
||||
if len(matching_agents) > 1:
|
||||
for i, agent1 in enumerate(matching_agents):
|
||||
for agent2 in matching_agents[i+1:]:
|
||||
conflicts.append((
|
||||
agent1.name,
|
||||
agent2.name,
|
||||
f"Functional overlap: {conflict_type}"
|
||||
))
|
||||
|
||||
return conflicts
|
||||
|
||||
def _generate_integration_strategy(
|
||||
self, detected_systems: List[AgentSystemType], agents: List[DetectedAgent]
|
||||
) -> str:
|
||||
"""Generate an integration strategy based on detected systems."""
|
||||
if not detected_systems:
|
||||
return "clean_install"
|
||||
|
||||
if AgentSystemType.KAIZEN_AGENTIC in detected_systems:
|
||||
return "update_existing"
|
||||
|
||||
if len(detected_systems) == 1 and detected_systems[0] == AgentSystemType.CLAUDE_CODE:
|
||||
return "claude_compatible"
|
||||
|
||||
if len([a for a in agents if a.type == AgentSystemType.CUSTOM_AGENTS]) > 5:
|
||||
return "gradual_migration"
|
||||
|
||||
return "selective_integration"
|
||||
|
||||
def _generate_migration_recommendations(
|
||||
self,
|
||||
detected_systems: List[AgentSystemType],
|
||||
agents: List[DetectedAgent],
|
||||
conflicts: List[Tuple[str, str, str]]
|
||||
) -> List[str]:
|
||||
"""Generate migration recommendations."""
|
||||
recommendations = []
|
||||
|
||||
if not detected_systems:
|
||||
recommendations.append("Clean installation - no existing agent systems detected")
|
||||
return recommendations
|
||||
|
||||
if AgentSystemType.KAIZEN_AGENTIC in detected_systems:
|
||||
recommendations.append("Update existing Kaizen Agentic installation")
|
||||
recommendations.append("Run 'kaizen-agentic update' to get latest agents")
|
||||
|
||||
if conflicts:
|
||||
recommendations.append(f"Resolve {len(conflicts)} naming/functional conflicts")
|
||||
for agent1, agent2, reason in conflicts:
|
||||
recommendations.append(f" - Conflict: {agent1} vs {agent2} ({reason})")
|
||||
|
||||
custom_agents = [a for a in agents if a.type == AgentSystemType.CUSTOM_AGENTS]
|
||||
if custom_agents:
|
||||
recommendations.append(f"Consider migrating {len(custom_agents)} custom agents")
|
||||
recommendations.append(" - Review custom agents for Kaizen Agentic equivalents")
|
||||
recommendations.append(" - Create project-specific extensions for unique functionality")
|
||||
|
||||
if AgentSystemType.CLAUDE_CODE in detected_systems:
|
||||
recommendations.append("Maintain Claude Code compatibility")
|
||||
recommendations.append(" - Update CLAUDE.md with new agent references")
|
||||
|
||||
return recommendations
|
||||
Reference in New Issue
Block a user