Files
kaizen-agentic/src/kaizen_agentic/detection.py
tegwick d68310793b Fix linting violations for v1.0.0 release preparation
- Apply black formatting to all Python files
- Fix various flake8 violations in agent system code
- Clean up imports and whitespace issues

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 20:44:58 +02:00

418 lines
15 KiB
Python

"""Detection and analysis of existing agent systems in projects."""
import yaml
from pathlib import Path
from typing import List, Optional, Set, Tuple
from dataclasses import dataclass
from enum import Enum
class AgentSystemType(Enum):
    """Kinds of agent systems that project detection can report.

    Every member's value is the lowercase, hyphenated identifier string of
    the system it stands for.
    """

    # Kaizen Agentic itself and Claude Code.
    KAIZEN_AGENTIC = "kaizen-agentic"
    CLAUDE_CODE = "claude-code"

    # Other assistants, vendor tooling and agent frameworks.
    GITHUB_COPILOT = "github-copilot"
    CUSTOM_AGENTS = "custom-agents"
    ANTHROPIC_WORKBENCH = "anthropic-workbench"
    OPENAI_ASSISTANTS = "openai-assistants"
    LANGCHAIN_AGENTS = "langchain-agents"
    AUTOGEN = "autogen"
    CREWAI = "crewai"

    # Catch-all member for anything not listed above.
    OTHER = "other"
@dataclass
class DetectedAgent:
    """Information about a detected agent.

    ``dependencies`` and ``conflicts_with`` accept ``None`` (the default) as
    a "not provided" sentinel; ``__post_init__`` replaces ``None`` with a
    fresh empty set, so after construction both attributes are always sets
    and no set object is shared between instances.
    """

    name: str
    type: AgentSystemType
    file_path: Path
    description: Optional[str] = None
    # Optional at construction time only; normalised to a set in __post_init__.
    dependencies: Optional[Set[str]] = None
    conflicts_with: Optional[Set[str]] = None
    can_migrate: bool = True
    migration_notes: Optional[str] = None

    def __post_init__(self) -> None:
        """Replace ``None`` set fields with new, instance-private empty sets."""
        if self.dependencies is None:
            self.dependencies = set()
        if self.conflicts_with is None:
            self.conflicts_with = set()
@dataclass
class AgentSystemDetectionResult:
    """Result of agent system detection in a project.

    ``migration_recommendations`` accepts ``None`` (the default) as a
    "not provided" sentinel; ``__post_init__`` replaces it with a fresh empty
    list, so after construction the attribute is always a list.
    """

    project_path: Path
    detected_systems: List[AgentSystemType]
    agents: List[DetectedAgent]
    config_files: List[Path]
    conflicts: List[Tuple[str, str, str]]  # (agent1, agent2, reason)
    integration_strategy: Optional[str] = None
    # Optional at construction time only; normalised to a list in __post_init__.
    migration_recommendations: Optional[List[str]] = None

    def __post_init__(self) -> None:
        """Replace a ``None`` recommendation list with a new empty list."""
        if self.migration_recommendations is None:
            self.migration_recommendations = []
class AgentSystemDetector:
    """Detects existing agent systems in projects.

    Detection is file-system based: every system type maps to a list of glob
    patterns (relative to the project root) and counts as present when at
    least one pattern matches. For each detected system the detector collects
    agents and matching files, then looks for conflicts between agents and
    derives an integration strategy plus migration recommendations.
    """

    # Filename prefix of Kaizen Agentic agent definitions (agents/agent-*.md).
    _KAIZEN_AGENT_PREFIX = "agent-"

    # Keyword groups used to flag agents whose names suggest the same purpose.
    # Matching is a case-insensitive substring test on the agent name.
    _FUNCTIONAL_CONFLICTS = (
        (("todo", "task", "project"), "Project management functionality"),
        (("changelog", "version", "release"), "Version tracking functionality"),
        (("test", "testing", "qa"), "Testing functionality"),
        (("doc", "documentation", "readme"), "Documentation functionality"),
    )

    def __init__(self):
        """Set up the glob patterns that identify each agent system type."""
        # NOTE(review): "CLAUDE.md" is listed for both KAIZEN_AGENTIC and
        # CLAUDE_CODE, so a project containing only CLAUDE.md is reported as
        # both systems. Likewise "agents/" marks CUSTOM_AGENTS even when the
        # directory only holds Kaizen agent files. Confirm this is intended.
        self.detection_patterns = {
            AgentSystemType.KAIZEN_AGENTIC: [
                "agents/agent-*.md",
                "CLAUDE.md",
                ".kaizen-agentic.yml",
            ],
            AgentSystemType.CLAUDE_CODE: [
                "CLAUDE.md",
                ".claude",
                "claude_config.json",
                ".claude.json",
            ],
            AgentSystemType.GITHUB_COPILOT: [
                ".github/copilot.yml",
                ".copilot/",
                "copilot.yml",
            ],
            AgentSystemType.CUSTOM_AGENTS: [
                "agents/",
                "ai_agents/",
                "assistants/",
                "bots/",
            ],
            AgentSystemType.ANTHROPIC_WORKBENCH: [
                ".anthropic/",
                "anthropic.yml",
                "workbench.yml",
            ],
            AgentSystemType.OPENAI_ASSISTANTS: [
                "openai_assistants/",
                ".openai/",
                "gpt_config.json",
            ],
            AgentSystemType.LANGCHAIN_AGENTS: [
                "langchain_agents/",
                "langchain.yml",
                "chains/",
            ],
            AgentSystemType.AUTOGEN: [
                "autogen_agents/",
                ".autogen/",
                "autogen.yml",
            ],
            AgentSystemType.CREWAI: [
                "crew.yml",
                "crewai.yml",
                "crew/",
                "agents.yml",
            ],
        }

    def detect_agent_systems(self, project_path: Path) -> AgentSystemDetectionResult:
        """Detect all agent systems in a project.

        Args:
            project_path: Root directory of the project to inspect. Anything
                accepted by ``Path()`` works.

        Returns:
            The detected system types (in ``detection_patterns`` order), the
            agents extracted for them, the files matching their patterns
            (each path listed once), the conflicts found between agents, the
            chosen integration strategy and the migration recommendations.
        """
        project_path = Path(project_path)
        detected_systems = []
        agents = []
        config_files = []
        # Several system types share patterns (e.g. CLAUDE.md), so the same
        # file can match more than once; report each path only once.
        seen_configs = set()

        for system_type, patterns in self.detection_patterns.items():
            if not self._detect_system_type(project_path, patterns):
                continue
            detected_systems.append(system_type)
            agents.extend(self._extract_agents_for_system(project_path, system_type))
            for config_file in self._find_config_files_for_system(
                project_path, system_type
            ):
                if config_file not in seen_configs:
                    seen_configs.add(config_file)
                    config_files.append(config_file)

        conflicts = self._analyze_conflicts(agents)
        integration_strategy = self._generate_integration_strategy(
            detected_systems, agents
        )
        migration_recommendations = self._generate_migration_recommendations(
            detected_systems, agents, conflicts
        )

        return AgentSystemDetectionResult(
            project_path=project_path,
            detected_systems=detected_systems,
            agents=agents,
            config_files=config_files,
            conflicts=conflicts,
            integration_strategy=integration_strategy,
            migration_recommendations=migration_recommendations,
        )

    def _detect_system_type(self, project_path: Path, patterns: List[str]) -> bool:
        """Return True if any of ``patterns`` matches something under ``project_path``.

        Each glob is consumed only up to its first match instead of being
        materialised into a list.
        """
        return any(
            next(project_path.glob(pattern), None) is not None
            for pattern in patterns
        )

    def _extract_agents_for_system(
        self, project_path: Path, system_type: AgentSystemType
    ) -> List[DetectedAgent]:
        """Extract agents for a specific system type.

        Only Kaizen Agentic, Claude Code and custom agents have an extractor;
        every other system type yields an empty list.
        """
        extractors = {
            AgentSystemType.KAIZEN_AGENTIC: self._extract_kaizen_agents,
            AgentSystemType.CLAUDE_CODE: self._extract_claude_agents,
            AgentSystemType.CUSTOM_AGENTS: self._extract_custom_agents,
        }
        extractor = extractors.get(system_type)
        if extractor is None:
            return []
        return list(extractor(project_path))

    @classmethod
    def _kaizen_agent_name(cls, agent_file: Path) -> str:
        """Derive an agent name from its file name by dropping the leading ``agent-``.

        Only the prefix is removed, so ``agent-my-agent-helper.md`` yields
        ``my-agent-helper`` rather than losing the inner ``agent-`` as well.
        """
        stem = agent_file.stem
        prefix = cls._KAIZEN_AGENT_PREFIX
        return stem[len(prefix):] if stem.startswith(prefix) else stem

    @staticmethod
    def _normalize_dependencies(raw) -> Set[str]:
        """Turn a frontmatter ``dependencies`` value into a set of strings.

        ``None`` becomes an empty set and a single string becomes a
        one-element set (instead of a set of its characters); any other
        iterable is converted item by item. A non-iterable value raises
        ``TypeError``.
        """
        if raw is None:
            return set()
        if isinstance(raw, str):
            return {raw}
        return {str(item) for item in raw}

    def _extract_kaizen_agents(self, project_path: Path) -> List[DetectedAgent]:
        """Extract Kaizen Agentic agents from ``<project>/agents/agent-*.md``.

        Files without YAML frontmatter are skipped. Files that cannot be read
        or parsed are still reported, as agents with ``can_migrate=False`` and
        the error text in ``migration_notes``. Files are processed in sorted
        order so the result is deterministic.
        """
        agents = []
        agents_dir = project_path / "agents"
        if not agents_dir.is_dir():
            return agents

        for agent_file in sorted(agents_dir.glob("agent-*.md")):
            try:
                agent = self._parse_kaizen_agent_file(agent_file)
            except Exception as e:
                # Deliberately broad: one unreadable or malformed file must
                # not abort detection, but it is surfaced to the caller.
                agents.append(
                    DetectedAgent(
                        name=self._kaizen_agent_name(agent_file),
                        type=AgentSystemType.KAIZEN_AGENTIC,
                        file_path=agent_file,
                        can_migrate=False,
                        migration_notes=f"Parse error: {e}",
                    )
                )
            else:
                if agent:
                    agents.append(agent)
        return agents

    def _parse_kaizen_agent_file(self, agent_file: Path) -> Optional[DetectedAgent]:
        """Parse a Kaizen Agentic agent file.

        Args:
            agent_file: Markdown file expected to start with a ``---``
                delimited YAML frontmatter block.

        Returns:
            The parsed agent, or ``None`` when the file has no complete
            frontmatter block.

        Raises:
            OSError, UnicodeDecodeError: The file cannot be read as UTF-8.
            ValueError: The frontmatter is not a YAML mapping.
            TypeError: ``dependencies`` is neither a string nor iterable.
            Exception: Whatever ``yaml.safe_load`` raises for invalid YAML.

        Errors propagate on purpose so that ``_extract_kaizen_agents`` can
        report the file as unparseable instead of silently dropping it.
        """
        content = agent_file.read_text(encoding="utf-8")
        if not content.startswith("---"):
            return None

        # "---<frontmatter>---<body>" -> ["", frontmatter, body]
        parts = content.split("---", 2)
        if len(parts) < 3:
            return None

        frontmatter = yaml.safe_load(parts[1])
        if not isinstance(frontmatter, dict):
            raise ValueError(
                f"frontmatter must be a mapping, got {type(frontmatter).__name__}"
            )

        # A null, empty or non-string name would later break name-based
        # conflict analysis, so fall back to the file-derived name.
        raw_name = frontmatter.get("name")
        name = str(raw_name).strip() if raw_name is not None else ""
        if not name:
            name = self._kaizen_agent_name(agent_file)

        return DetectedAgent(
            name=name,
            type=AgentSystemType.KAIZEN_AGENTIC,
            file_path=agent_file,
            description=frontmatter.get("description"),
            dependencies=self._normalize_dependencies(
                frontmatter.get("dependencies")
            ),
        )

    def _extract_claude_agents(self, project_path: Path) -> List[DetectedAgent]:
        """Extract Claude Code agents (if any defined in CLAUDE.md).

        The content of CLAUDE.md is not inspected: its mere presence produces
        a single placeholder agent named ``claude-integration``.
        """
        claude_file = project_path / "CLAUDE.md"
        if not claude_file.exists():
            return []
        return [
            DetectedAgent(
                name="claude-integration",
                type=AgentSystemType.CLAUDE_CODE,
                file_path=claude_file,
                description="Claude Code integration configuration",
            )
        ]

    def _extract_custom_agents(self, project_path: Path) -> List[DetectedAgent]:
        """Extract custom agent implementations.

        Every ``.py``, ``.yml``, ``.yaml``, ``.json`` or ``.md`` entry directly
        inside ``agents/``, ``ai_agents/``, ``assistants/`` or ``bots/`` is
        reported as one custom agent named after the file stem. Kaizen
        Agentic files (``agent-*.md``) are excluded.
        """
        agents = []
        for dir_name in ("agents", "ai_agents", "assistants", "bots"):
            agent_dir = project_path / dir_name
            if not agent_dir.is_dir():
                continue
            for pattern in ("*.py", "*.yml", "*.yaml", "*.json", "*.md"):
                # Sorted so the result does not depend on directory order.
                for agent_file in sorted(agent_dir.glob(pattern)):
                    is_kaizen_file = (
                        agent_file.name.startswith(self._KAIZEN_AGENT_PREFIX)
                        and agent_file.suffix == ".md"
                    )
                    if is_kaizen_file:
                        continue
                    agents.append(
                        DetectedAgent(
                            name=agent_file.stem,
                            type=AgentSystemType.CUSTOM_AGENTS,
                            file_path=agent_file,
                            description=f"Custom agent in {dir_name}/",
                        )
                    )
        return agents

    def _find_config_files_for_system(
        self, project_path: Path, system_type: AgentSystemType
    ) -> List[Path]:
        """Find configuration files for a system type.

        Returns every path matching one of the system's detection patterns,
        grouped by pattern order and sorted within each pattern. Unknown
        system types yield an empty list.
        """
        config_files = []
        for pattern in self.detection_patterns.get(system_type, []):
            config_files.extend(sorted(project_path.glob(pattern)))
        return config_files

    def _analyze_conflicts(
        self, agents: List[DetectedAgent]
    ) -> List[Tuple[str, str, str]]:
        """Analyze potential conflicts between agents.

        Two kinds of conflict are reported as ``(name1, name2, reason)``:

        * naming conflicts: two agents of different types share a name;
        * functional overlaps: two or more agent names contain a keyword from
          the same functional group (case-insensitive substring match).

        The same pair can appear more than once with different reasons.
        """
        conflicts = []

        # Naming conflicts: compare every unordered pair exactly once.
        for i, agent1 in enumerate(agents):
            for agent2 in agents[i + 1:]:
                if agent1.name == agent2.name and agent1.type != agent2.type:
                    conflicts.append(
                        (
                            agent1.name,
                            agent2.name,
                            f"Name conflict between {agent1.type.value} and {agent2.type.value}",
                        )
                    )

        # Functional overlaps: all pairs among agents hitting the same group.
        for keywords, conflict_type in self._FUNCTIONAL_CONFLICTS:
            matching_agents = [
                agent
                for agent in agents
                if any(keyword in agent.name.lower() for keyword in keywords)
            ]
            for i, agent1 in enumerate(matching_agents):
                for agent2 in matching_agents[i + 1:]:
                    conflicts.append(
                        (
                            agent1.name,
                            agent2.name,
                            f"Functional overlap: {conflict_type}",
                        )
                    )
        return conflicts

    def _generate_integration_strategy(
        self, detected_systems: List[AgentSystemType], agents: List[DetectedAgent]
    ) -> str:
        """Generate an integration strategy based on detected systems.

        Returns the first rule that applies:

        * ``"clean_install"``: nothing detected;
        * ``"update_existing"``: Kaizen Agentic is already present;
        * ``"claude_compatible"``: Claude Code is the only detected system;
        * ``"gradual_migration"``: more than five custom agents were found;
        * ``"selective_integration"``: everything else.
        """
        if not detected_systems:
            return "clean_install"
        if AgentSystemType.KAIZEN_AGENTIC in detected_systems:
            return "update_existing"
        if detected_systems == [AgentSystemType.CLAUDE_CODE]:
            return "claude_compatible"

        custom_count = sum(
            1 for agent in agents if agent.type == AgentSystemType.CUSTOM_AGENTS
        )
        if custom_count > 5:
            return "gradual_migration"
        return "selective_integration"

    def _generate_migration_recommendations(
        self,
        detected_systems: List[AgentSystemType],
        agents: List[DetectedAgent],
        conflicts: List[Tuple[str, str, str]],
    ) -> List[str]:
        """Generate migration recommendations.

        Returns human-readable lines. With no detected systems this is a
        single "clean installation" line; otherwise sections are added for an
        existing Kaizen Agentic installation, for conflicts (one line per
        conflict), for custom agents and for Claude Code, in that order.
        """
        if not detected_systems:
            return ["Clean installation - no existing agent systems detected"]

        recommendations = []

        if AgentSystemType.KAIZEN_AGENTIC in detected_systems:
            recommendations.append("Update existing Kaizen Agentic installation")
            recommendations.append("Run 'kaizen-agentic update' to get latest agents")

        if conflicts:
            recommendations.append(
                f"Resolve {len(conflicts)} naming/functional conflicts"
            )
            recommendations.extend(
                f" - Conflict: {agent1} vs {agent2} ({reason})"
                for agent1, agent2, reason in conflicts
            )

        custom_agents = [a for a in agents if a.type == AgentSystemType.CUSTOM_AGENTS]
        if custom_agents:
            recommendations.append(
                f"Consider migrating {len(custom_agents)} custom agents"
            )
            recommendations.append(
                " - Review custom agents for Kaizen Agentic equivalents"
            )
            recommendations.append(
                " - Create project-specific extensions for unique functionality"
            )

        if AgentSystemType.CLAUDE_CODE in detected_systems:
            recommendations.append("Maintain Claude Code compatibility")
            recommendations.append(" - Update CLAUDE.md with new agent references")

        return recommendations