#!/usr/bin/env python3
"""
Requirements Engineering and Incremental Development Planning Toolkit

Practical tools to prevent interface compatibility issues and mock object mismatches.
Designed to solve the problems encountered during Issue #59 development.
"""

import argparse
import ast
import importlib
import inspect
import json
import sys
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Type


@dataclass
class DomainModelInfo:
    """Information about a domain model."""

    name: str
    module: str
    attributes: Dict[str, str]
    methods: List[str]
    base_classes: List[str]
    is_abstract: bool
    file_path: str


@dataclass
class InterfaceInfo:
    """Information about an interface or abstract base class."""

    name: str
    module: str
    abstract_methods: List[str]
    methods: List[str]
    base_classes: List[str]
    file_path: str


@dataclass
class MockValidationResult:
    """Result of mock validation."""

    is_valid: bool
    missing_attributes: List[str]
    extra_attributes: List[str]
    type_mismatches: Dict[str, str]
    errors: List[str]


class DomainModelAnalyzer:
    """Analyzes existing domain models to understand structure.

    Classes are discovered statically by parsing source files with ``ast``;
    no project code is imported or executed.
    """

    def __init__(self, project_root: Path):
        self.project_root = project_root
        # Both registries are keyed by bare class name, so a later class with
        # the same name overwrites an earlier one.
        self.domain_models: Dict[str, DomainModelInfo] = {}
        self.interfaces: Dict[str, InterfaceInfo] = {}

    def analyze_project(self) -> Dict[str, Any]:
        """Analyze entire project for domain models and interfaces.

        Returns:
            A dict with the ``domain_models`` and ``interfaces`` registries,
            an ISO ``analysis_timestamp`` and a ``summary`` of counts.
        """
        self._scan_domain_models()
        self._scan_interfaces()

        abstract_count = sum(
            1 for info in self.interfaces.values() if info.abstract_methods
        )
        return {
            "domain_models": self.domain_models,
            "interfaces": self.interfaces,
            "analysis_timestamp": datetime.now().isoformat(),
            "summary": {
                "total_domain_models": len(self.domain_models),
                "total_interfaces": len(self.interfaces),
                "abstract_classes": abstract_count,
            },
        }

    def _scan_domain_models(self):
        """Scan for domain model classes in ``domain/*/models.py``."""
        for model_file in self.project_root.glob("domain/*/models.py"):
            self._analyze_python_file(model_file, is_domain_model=True)

    def _scan_interfaces(self):
        """Scan for interface and abstract base classes."""
        # Look in infrastructure, domain, and markitect modules
        search_patterns = [
            "infrastructure/**/interfaces.py",
            "infrastructure/**/base.py",
            "domain/**/repositories.py",
            "markitect/**/base.py",
        ]
        for pattern in search_patterns:
            for file_path in self.project_root.glob(pattern):
                self._analyze_python_file(file_path, is_domain_model=False)

    def _analyze_python_file(self, file_path: Path, is_domain_model: bool = False):
        """Analyze a Python file for classes and interfaces.

        Every class in the file (including nested ones, because the tree is
        walked) is registered as a domain model or as an interface depending
        on ``is_domain_model``. Analysis is best-effort: a file that cannot be
        read or parsed is reported and skipped.
        """
        if is_domain_model:
            process = self._process_domain_model
        else:
            process = self._process_interface

        try:
            # Passing raw bytes lets the parser default to UTF-8 and honour a
            # coding cookie instead of relying on the platform locale.
            tree = ast.parse(file_path.read_bytes(), filename=str(file_path))
            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    process(node, file_path)
        except (OSError, SyntaxError, ValueError, RecursionError) as exc:
            # ValueError also covers decode errors and a file_path that is not
            # located under project_root.
            print(f"Error analyzing {file_path}: {exc}")

    def _module_path(self, file_path: Path) -> str:
        """Return the dotted module path of ``file_path`` relative to the root.

        Built from path components, so only the file suffix is dropped and the
        result does not depend on the platform's path separator.
        """
        relative = file_path.relative_to(self.project_root)
        return ".".join(relative.with_suffix("").parts)

    @staticmethod
    def _base_class_names(class_node: ast.ClassDef) -> List[str]:
        """Return plain (``Base``) and dotted (``abc.ABC``) base class names."""
        return [
            ast.unparse(base)
            for base in class_node.bases
            if isinstance(base, (ast.Name, ast.Attribute))
        ]

    @staticmethod
    def _is_abstractmethod(decorator: ast.expr) -> bool:
        """Tell whether a decorator is ``abstractmethod`` or ``x.abstractmethod``."""
        if isinstance(decorator, ast.Name):
            return decorator.id == 'abstractmethod'
        if isinstance(decorator, ast.Attribute):
            return decorator.attr == 'abstractmethod'
        return False

    def _process_domain_model(self, class_node: ast.ClassDef, file_path: Path):
        """Process a domain model class and register it in ``domain_models``."""
        attributes: Dict[str, str] = {}
        methods: List[str] = []

        for item in class_node.body:
            if isinstance(item, ast.AnnAssign) and isinstance(item.target, ast.Name):
                # Type annotated attribute; the annotation is stored as source text.
                attributes[item.target.id] = ast.unparse(item.annotation)
            elif isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                methods.append(item.name)

        base_classes = self._base_class_names(class_node)
        self.domain_models[class_node.name] = DomainModelInfo(
            name=class_node.name,
            module=self._module_path(file_path),
            attributes=attributes,
            methods=methods,
            base_classes=base_classes,
            is_abstract=any('ABC' in base for base in base_classes),
            file_path=str(file_path),
        )

    def _process_interface(self, class_node: ast.ClassDef, file_path: Path):
        """Process an interface or abstract base class and register it."""
        methods: List[str] = []
        abstract_methods: List[str] = []

        for item in class_node.body:
            if not isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            methods.append(item.name)
            if any(self._is_abstractmethod(dec) for dec in item.decorator_list):
                abstract_methods.append(item.name)

        self.interfaces[class_node.name] = InterfaceInfo(
            name=class_node.name,
            module=self._module_path(file_path),
            abstract_methods=abstract_methods,
            methods=methods,
            base_classes=self._base_class_names(class_node),
            file_path=str(file_path),
        )


class MockValidator:
    """Validates mock objects against real domain models."""

    def __init__(self, domain_analyzer: DomainModelAnalyzer):
        self.domain_analyzer = domain_analyzer

    def validate_mock_against_model(self, mock_obj: Any, model_name: str) -> MockValidationResult:
        """Validate a mock object against a domain model.

        Returns an invalid result carrying a single error message when
        ``model_name`` is not among the analyzer's known domain models.
        """
        model_info = self.domain_analyzer.domain_models.get(model_name)
        if model_info is None:
            return MockValidationResult(
                is_valid=False,
                missing_attributes=[],
                extra_attributes=[],
                type_mismatches={},
                errors=[f"Domain model '{model_name}' not found"],
            )
        return self._validate_mock_attributes(mock_obj, model_info)

    def _validate_mock_attributes(self, mock_obj: Any, model_info: DomainModelInfo) -> MockValidationResult:
        """Validate mock object attributes against model.

        Only public names (no leading underscore) are compared on both sides.
        A missing annotated attribute makes the mock invalid; a missing method
        is reported but does not.
        """
        mock_attrs = {attr for attr in dir(mock_obj) if not attr.startswith('_')}
        # Private model members are excluded as well: the mock side never
        # contains them, so they would otherwise always be reported missing.
        model_fields = {
            name for name in model_info.attributes if not name.startswith('_')
        }
        model_methods = {
            name for name in model_info.methods if not name.startswith('_')
        }
        model_attrs = model_fields | model_methods

        missing_attrs = model_attrs - mock_attrs
        extra_attrs = mock_attrs - model_attrs

        # Check for critical missing attributes (non-methods)
        critical_missing = sorted(missing_attrs & model_fields)

        errors = []
        if critical_missing:
            errors.append(f"Missing critical attributes: {critical_missing}")

        # Sorted so that results are deterministic across runs.
        return MockValidationResult(
            is_valid=not critical_missing,
            missing_attributes=sorted(missing_attrs),
            extra_attributes=sorted(extra_attrs),
            type_mismatches={},  # Would need runtime inspection
            errors=errors,
        )


class InterfaceCompatibilityChecker:
    """Checks interface compatibility between layers."""

    def __init__(self, domain_analyzer: DomainModelAnalyzer):
        self.domain_analyzer = domain_analyzer

    def check_plugin_interface_compatibility(self, plugin_interface: str, existing_repo: str) -> Dict[str, Any]:
        """Check if a plugin interface is compatible with existing repository.

        The repository is compatible when it defines every abstract method of
        the plugin interface. If either name is unknown, a dict with
        ``compatible`` set to False and an ``error`` message is returned.
        """
        interfaces = self.domain_analyzer.interfaces
        if plugin_interface not in interfaces:
            return {"compatible": False, "error": f"Plugin interface '{plugin_interface}' not found"}
        if existing_repo not in interfaces:
            return {"compatible": False, "error": f"Existing repository '{existing_repo}' not found"}

        # Compare abstract methods
        plugin_methods = set(interfaces[plugin_interface].abstract_methods)
        repo_methods = set(interfaces[existing_repo].methods)
        missing_methods = plugin_methods - repo_methods

        if plugin_methods:
            score = len(plugin_methods & repo_methods) / len(plugin_methods)
        else:
            # Nothing is required, so the repository trivially satisfies the
            # contract; keeps the score consistent with "compatible".
            score = 1.0

        return {
            "compatible": not missing_methods,
            "plugin_methods": sorted(plugin_methods),
            "repo_methods": sorted(repo_methods),
            "missing_methods": sorted(missing_methods),
            "compatibility_score": score,
        }


class RequirementsEngineeringAgent:
    """Main agent for requirements engineering and incremental development planning."""

    def __init__(self, project_root: Path):
        self.project_root = project_root
        self.domain_analyzer = DomainModelAnalyzer(project_root)
        self.mock_validator = MockValidator(self.domain_analyzer)
        self.compatibility_checker = InterfaceCompatibilityChecker(self.domain_analyzer)
        self.analysis_cache = {}

    def analyze_project_foundations(self) -> Dict[str, Any]:
        """Analyze project foundations before starting new development.

        Runs the domain analyzer, stores its result in ``analysis_cache`` and
        returns it together with derived recommendations and risks.
        """
        print("🔍 Analyzing project foundations...")

        analysis = self.domain_analyzer.analyze_project()
        self.analysis_cache = analysis

        return {
            "foundation_analysis": analysis,
            "recommendations": self._generate_foundation_recommendations(analysis),
            "risks": self._identify_foundation_risks(analysis),
        }

    def plan_interface_evolution(self, interface_name: str, planned_changes: Dict[str, Any]) -> Dict[str, Any]:
        """Plan interface evolution with backward compatibility.

        Returns a dict with only an ``error`` key when the interface is not
        known to the analyzer (i.e. no analysis has registered it).
        """
        print(f"📋 Planning evolution for interface: {interface_name}")

        interface_info = self.domain_analyzer.interfaces.get(interface_name)
        if interface_info is None:
            return {"error": f"Interface '{interface_name}' not found"}

        # Analyze impact of planned changes
        impact_analysis = self._analyze_change_impact(interface_info, planned_changes)

        return {
            "interface": interface_name,
            "current_state": interface_info,
            "planned_changes": planned_changes,
            "impact_analysis": impact_analysis,
            "migration_plan": self._generate_migration_plan(interface_info, planned_changes),
            "compatibility_strategy": self._plan_compatibility_strategy(planned_changes),
        }

    def validate_test_mocks(self, test_file: Path) -> Dict[str, Any]:
        """Validate mock objects in test files against actual domain models.

        Currently a placeholder: the file is not read, and a fixed set of
        manual-check recommendations is returned.
        """
        print(f"🧪 Validating mocks in: {test_file}")

        # This would require AST parsing of test files to extract mock usage
        # For now, return structure for manual implementation
        return {
            "test_file": str(test_file),
            "validation_status": "manual_check_required",
            "recommendations": [
                "Use Mock(spec=ActualClass) for all domain model mocks",
                "Verify all mock attributes match actual domain model attributes",
                "Use actual enums instead of string representations",
                "Include all required attributes with proper types",
            ],
        }

    def generate_development_checklist(self, feature_name: str, requirements: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate a comprehensive development checklist.

        The checklist is a fixed template; ``requirements`` is accepted but
        not used yet, and ``feature_name`` only appears in the progress line.
        """
        print(f"📝 Generating development checklist for: {feature_name}")

        checklist = [
            {
                "phase": "Foundation Analysis",
                "tasks": [
                    "Analyze existing domain models",
                    "Map current interface contracts",
                    "Understand dependency graph",
                    "Verify architectural foundations",
                ],
            },
            {
                "phase": "Interface Contract Definition",
                "tasks": [
                    "Define new interface contracts",
                    "Verify compatibility with existing interfaces",
                    "Plan interface evolution strategy",
                    "Document contract specifications",
                ],
            },
            {
                "phase": "Test Architecture Design",
                "tasks": [
                    "Design test structure matching application architecture",
                    "Create spec-compliant mock objects",
                    "Plan integration test strategy",
                    "Define validation checkpoints",
                ],
            },
            {
                "phase": "Incremental Implementation",
                "tasks": [
                    "Implement base interfaces",
                    "Add concrete implementations",
                    "Validate at each checkpoint",
                    "Maintain backward compatibility",
                ],
            },
            {
                "phase": "Integration Validation",
                "tasks": [
                    "Test interface compatibility",
                    "Validate mock object alignment",
                    "Run integration tests",
                    "Verify end-to-end workflows",
                ],
            },
        ]

        return checklist

    def _generate_foundation_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on foundation analysis."""
        summary = analysis["summary"]
        recommendations = []

        if summary["total_domain_models"] == 0:
            recommendations.append("⚠️ No domain models found - start with domain model design")

        if summary["total_interfaces"] == 0:
            recommendations.append("⚠️ No interfaces found - define interface contracts first")

        if summary["abstract_classes"] == 0:
            recommendations.append("💡 Consider adding abstract base classes for plugin system")

        # These two are always included, whatever the analysis found.
        recommendations.append("✅ Run mock validation before writing tests")
        recommendations.append("✅ Verify interface compatibility before implementation")

        return recommendations

    def _identify_foundation_risks(self, analysis: Dict[str, Any]) -> List[str]:
        """Identify risks in current foundation."""
        summary = analysis["summary"]
        model_count = summary["total_domain_models"]
        risks = []

        if model_count > 20:
            risks.append("⚠️ Large number of domain models - complexity risk")

        # Fewer than 0.3 interfaces per domain model is flagged as coupling risk.
        if summary["total_interfaces"] < model_count * 0.3:
            risks.append("⚠️ Few interfaces relative to domain models - tight coupling risk")

        return risks

    def _analyze_change_impact(self, interface_info: InterfaceInfo, changes: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze impact of planned interface changes.

        Only ``breaking_changes`` is taken from ``changes``; the remaining
        values are fixed placeholders.
        """
        return {
            "breaking_changes": changes.get("breaking_changes", []),
            "affected_implementations": [],  # Would need to scan for implementations
            "migration_complexity": "medium",  # Would analyze based on change type
            "estimated_effort": "2-4 hours",  # Would calculate based on impact
        }

    def _generate_migration_plan(self, interface_info: InterfaceInfo, changes: Dict[str, Any]) -> List[str]:
        """Generate step-by-step migration plan (a fixed template for now)."""
        return [
            "1. Create new interface with backward compatibility",
            "2. Update existing implementations incrementally",
            "3. Add deprecation warnings to old interface",
            "4. Update tests to use new interface",
            "5. Remove deprecated interface after transition period",
        ]

    def _plan_compatibility_strategy(self, changes: Dict[str, Any]) -> Dict[str, Any]:
        """Plan backward compatibility strategy (a fixed template for now)."""
        return {
            "strategy": "adapter_pattern",
            "transition_period": "2 development cycles",
            "deprecation_warnings": True,
            "migration_guide": "Will be generated",
        }


def _json_default(value: Any) -> Any:
    """Serialize values ``json`` cannot handle natively.

    Dataclass instances become nested objects so the CLI output stays
    machine-readable; anything else falls back to its string form.
    """
    if is_dataclass(value) and not isinstance(value, type):
        return asdict(value)
    return str(value)


def main():
    """CLI interface for the requirements engineering agent.

    Prints the result of the selected command as JSON. The help text is shown
    when a command is given without the option it needs.
    """
    parser = argparse.ArgumentParser(description="Requirements Engineering and Development Planning Agent")
    parser.add_argument("command", choices=["analyze", "validate-mocks", "plan-interface", "checklist"])
    parser.add_argument("--project-root", default=".", help="Project root directory")
    parser.add_argument("--interface", help="Interface name for planning")
    parser.add_argument("--feature", help="Feature name for checklist")
    parser.add_argument("--test-file", help="Test file to validate")

    args = parser.parse_args()

    agent = RequirementsEngineeringAgent(Path(args.project_root))

    if args.command == "analyze":
        result = agent.analyze_project_foundations()
    elif args.command == "validate-mocks" and args.test_file:
        result = agent.validate_test_mocks(Path(args.test_file))
    elif args.command == "plan-interface" and args.interface:
        # Example change set; the CLI does not accept planned changes yet.
        changes = {"new_methods": ["example_method"], "breaking_changes": []}
        result = agent.plan_interface_evolution(args.interface, changes)
    elif args.command == "checklist" and args.feature:
        result = agent.generate_development_checklist(args.feature, {})
    else:
        parser.print_help()
        return

    print(json.dumps(result, indent=2, default=_json_default))


if __name__ == "__main__":
    main()