refactor: organize chaos test runner into tools directory

Move chaos_test_runner.py to tools/ directory for better project organization
and update all Makefile targets to reference the new location. This improves
the project structure by keeping specialized tools separate from main code.

Changes:
- Move chaos_test_runner.py to tools/chaos_test_runner.py
- Update Makefile chaos-* targets to use tools/ path
- Maintain all existing functionality and CLI interface

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-03 10:25:33 +02:00
parent 818d8346ad
commit 77db9f6231
2 changed files with 4 additions and 4 deletions

748
tools/chaos_test_runner.py Executable file
View File

@@ -0,0 +1,748 @@
#!/usr/bin/env python3
"""
Architectural Layer Independence Test Runner with Chaos Engineering
This module implements a sophisticated chaos engineering system that validates
architectural layer independence by injecting controlled failures and monitoring
the impact on dependent and independent layers.
The system systematically tests that:
1. Failures in lower layers propagate only to dependent upper layers
2. Independent layers remain unaffected by failures in unrelated layers
3. The dependency matrix matches the intended architectural design
Usage:
python chaos_test_runner.py [options]
Commands:
validate-independence Run full architectural independence validation
inject-layer-failure Inject failure into specific layer
dependency-matrix Show architectural dependency matrix
chaos-report Generate comprehensive chaos test report
"""
import os
import sys
import json
import time
import pytest
import tempfile
import traceback
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Set, Optional, Tuple, Any
from dataclasses import dataclass, asdict
from contextlib import contextmanager
from unittest.mock import patch, MagicMock
import logging
@dataclass
class ArchitecturalLayer:
"""Represents an architectural layer with its properties."""
name: str
level: int
description: str
test_pattern: str
dependencies: List[str] # Layers this layer depends on
modules: List[str] # Python modules in this layer
@dataclass
class ChaosInjectionResult:
"""Results from a chaos injection test."""
target_layer: str
injection_type: str
start_time: datetime
end_time: datetime
affected_layers: List[str]
expected_affected: List[str]
dependency_violations: List[str]
test_results: Dict[str, Any]
success: bool
error_message: Optional[str] = None
@dataclass
class DependencyViolation:
"""Represents a detected architectural dependency violation."""
violating_layer: str
affected_layer: str
violation_type: str
expected_independence: bool
actual_impact: bool
severity: str
description: str
class ArchitecturalChaosEngine:
"""
Chaos engineering engine for validating architectural layer independence.
This engine systematically injects failures into each architectural layer
and monitors the impact on other layers to validate the dependency matrix.
"""
def __init__(self, project_root: Optional[Path] = None):
self.project_root = project_root or Path(__file__).parent
self.test_dir = self.project_root / "tests"
self.results_dir = self.project_root / "chaos_results"
self.results_dir.mkdir(exist_ok=True)
# Set up logging
self.logger = self._setup_logging()
# Define architectural layers with their dependencies
self.layers = self._define_architectural_layers()
self.dependency_matrix = self._build_dependency_matrix()
# Chaos injection mechanisms
self.injection_strategies = {
'import_failure': self._inject_import_failure,
'function_failure': self._inject_function_failure,
'class_failure': self._inject_class_failure,
'module_unavailable': self._inject_module_unavailable,
'database_failure': self._inject_database_failure,
'network_failure': self._inject_network_failure,
'filesystem_failure': self._inject_filesystem_failure
}
def _setup_logging(self) -> logging.Logger:
"""Set up logging for chaos engineering operations."""
logger = logging.getLogger('chaos_engine')
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
def _define_architectural_layers(self) -> Dict[str, ArchitecturalLayer]:
"""Define the architectural layers and their relationships."""
return {
'L1_Presentation': ArchitecturalLayer(
name='L1_Presentation',
level=1,
description='CLI Interface and User Interaction',
test_pattern='test_l1_*.py',
dependencies=['L2_Application'],
modules=['cli', 'markitect.cli']
),
'L2_Application': ArchitecturalLayer(
name='L2_Application',
level=2,
description='Feature Workflows and Use Cases',
test_pattern='test_l2_*.py',
dependencies=['L3_Domain', 'L4_Service'],
modules=['application', 'tddai', 'markitect.issues']
),
'L3_Domain': ArchitecturalLayer(
name='L3_Domain',
level=3,
description='Business Logic and Domain Models',
test_pattern='test_l3_*.py',
dependencies=['L4_Service'],
modules=['domain', 'markitect.schema_generator', 'markitect.metaschema']
),
'L4_Service': ArchitecturalLayer(
name='L4_Service',
level=4,
description='Application Services and Orchestration',
test_pattern='test_l4_*.py',
dependencies=['L5_Infrastructure'],
modules=['services', 'markitect.ast_service', 'markitect.document_manager']
),
'L5_Infrastructure': ArchitecturalLayer(
name='L5_Infrastructure',
level=5,
description='Technical Infrastructure',
test_pattern='test_l5_*.py',
dependencies=['L6_Integration', 'L7_Foundation'],
modules=['infrastructure', 'markitect.cache_service', 'markitect.ast_cache']
),
'L6_Integration': ArchitecturalLayer(
name='L6_Integration',
level=6,
description='External API and System Integration',
test_pattern='test_l6_*.py',
dependencies=['L7_Foundation'],
modules=['gitea', 'markitect.issues.plugins']
),
'L7_Foundation': ArchitecturalLayer(
name='L7_Foundation',
level=7,
description='Core Components and Utilities',
test_pattern='test_l7_*.py',
dependencies=[], # Foundation depends on nothing
modules=['markitect.database', 'markitect.parser', 'markitect.frontmatter']
)
}
def _build_dependency_matrix(self) -> Dict[str, Set[str]]:
"""Build the complete dependency matrix including transitive dependencies."""
matrix = {}
for layer_name, layer in self.layers.items():
# Start with direct dependencies
all_deps = set(layer.dependencies)
# Add transitive dependencies
to_check = list(layer.dependencies)
while to_check:
dep = to_check.pop(0)
if dep in self.layers:
for transitive_dep in self.layers[dep].dependencies:
if transitive_dep not in all_deps:
all_deps.add(transitive_dep)
to_check.append(transitive_dep)
matrix[layer_name] = all_deps
return matrix
def show_dependency_matrix(self):
"""Display the architectural dependency matrix."""
print("🏗️ Architectural Layer Dependency Matrix")
print("=" * 50)
for layer_name in sorted(self.layers.keys()):
layer = self.layers[layer_name]
deps = self.dependency_matrix[layer_name]
print(f"\n{layer.name} (L{layer.level})")
print(f" Description: {layer.description}")
print(f" Direct Dependencies: {layer.dependencies}")
print(f" All Dependencies: {sorted(deps)}")
print(f" Test Pattern: {layer.test_pattern}")
@contextmanager
def _chaos_injection_context(self, layer_name: str, injection_type: str):
"""Context manager for safe chaos injection with cleanup."""
self.logger.info(f"🔥 Starting chaos injection: {injection_type} on {layer_name}")
# Store original state for restoration
original_modules = sys.modules.copy()
import builtins
original_import = builtins.__import__
try:
yield
except Exception as e:
self.logger.error(f"❌ Chaos injection failed: {e}")
raise
finally:
# Restore original state
builtins.__import__ = original_import
self._restore_system_state(original_modules, {})
self.logger.info(f"✅ Chaos injection cleanup completed for {layer_name}")
def _restore_system_state(self, original_modules: Dict, original_builtins: Dict):
"""Restore system state after chaos injection."""
# Restore modules
modules_to_remove = set(sys.modules.keys()) - set(original_modules.keys())
for module in modules_to_remove:
if module in sys.modules:
del sys.modules[module]
# Restore modified modules
for module_name, module in original_modules.items():
sys.modules[module_name] = module
def _inject_import_failure(self, layer_name: str, **kwargs):
"""Inject import failure for modules in the specified layer."""
layer = self.layers[layer_name]
failed_modules = []
# Store original import function
import builtins
original_import = builtins.__import__
def patched_import(name, *args, **kwargs):
# Check if this import should fail
if any(name.startswith(mod) for mod in layer.modules):
raise ImportError(f"Chaos injection: {name} module failure")
return original_import(name, *args, **kwargs)
# Apply the patch
builtins.__import__ = patched_import
failed_modules.extend(layer.modules)
return {'failed_modules': failed_modules}
def _inject_function_failure(self, layer_name: str, **kwargs):
"""Inject function-level failures in the specified layer."""
layer = self.layers[layer_name]
patched_functions = []
# This would patch specific functions based on the layer
# Implementation would depend on the specific layer's key functions
return {'patched_functions': patched_functions}
def _inject_class_failure(self, layer_name: str, **kwargs):
"""Inject class-level failures in the specified layer."""
layer = self.layers[layer_name]
patched_classes = []
# This would patch specific classes based on the layer
return {'patched_classes': patched_classes}
def _inject_module_unavailable(self, layer_name: str, **kwargs):
"""Make entire modules unavailable for the specified layer."""
layer = self.layers[layer_name]
for module_name in layer.modules:
if module_name in sys.modules:
# Temporarily remove the module
del sys.modules[module_name]
return {'removed_modules': layer.modules}
def _inject_database_failure(self, layer_name: str, **kwargs):
"""Inject database failures for infrastructure layer."""
if layer_name != 'L5_Infrastructure':
return {'message': 'Database failure only applicable to Infrastructure layer'}
# Patch database operations to fail
patches = []
return {'database_patches': patches}
def _inject_network_failure(self, layer_name: str, **kwargs):
"""Inject network failures for integration layer."""
if layer_name != 'L6_Integration':
return {'message': 'Network failure only applicable to Integration layer'}
# Patch network operations to fail
patches = []
return {'network_patches': patches}
def _inject_filesystem_failure(self, layer_name: str, **kwargs):
"""Inject filesystem failures."""
patches = []
return {'filesystem_patches': patches}
def run_layer_tests(self, layer_name: str) -> Dict[str, Any]:
"""Run tests for a specific layer and return results."""
layer = self.layers[layer_name]
test_files = list(self.test_dir.glob(layer.test_pattern))
if not test_files:
return {
'success': False,
'error': f'No test files found for pattern {layer.test_pattern}',
'test_count': 0,
'failures': 0
}
# Run pytest for the layer
cmd = ['python', '-m', 'pytest'] + [str(f) for f in test_files] + [
'--tb=short', '--quiet', '--disable-warnings'
]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
cwd=self.project_root,
timeout=120 # 2 minute timeout per layer
)
# Parse pytest output for test counts
output = result.stdout + result.stderr
test_count = self._extract_test_count(output)
failures = self._extract_failure_count(output)
return {
'success': result.returncode == 0,
'test_count': test_count,
'failures': failures,
'output': output[:1000], # Truncate for storage
'return_code': result.returncode
}
except subprocess.TimeoutExpired:
return {
'success': False,
'error': 'Test execution timeout',
'test_count': 0,
'failures': 1
}
except Exception as e:
return {
'success': False,
'error': str(e),
'test_count': 0,
'failures': 1
}
def _extract_test_count(self, output: str) -> int:
"""Extract total test count from pytest output."""
import re
patterns = [
r'(\d+) passed',
r'collected (\d+) items',
r'(\d+) failed',
]
for pattern in patterns:
match = re.search(pattern, output)
if match:
return int(match.group(1))
return 0
def _extract_failure_count(self, output: str) -> int:
"""Extract failure count from pytest output."""
import re
patterns = [
r'(\d+) failed',
r'FAILED.*::.*(\d+)',
]
for pattern in patterns:
match = re.search(pattern, output)
if match:
return int(match.group(1))
return 0
def inject_chaos_and_test(self, target_layer: str, injection_type: str) -> ChaosInjectionResult:
"""Inject chaos into a layer and run all tests to measure impact."""
start_time = datetime.now()
try:
with self._chaos_injection_context(target_layer, injection_type):
# Perform the chaos injection
injection_result = self.injection_strategies[injection_type](target_layer)
# Run tests on all layers to see impact
test_results = {}
affected_layers = []
for layer_name in self.layers.keys():
self.logger.info(f"🧪 Testing {layer_name} under chaos conditions")
layer_result = self.run_layer_tests(layer_name)
test_results[layer_name] = layer_result
# Consider layer affected if tests fail
if not layer_result['success']:
affected_layers.append(layer_name)
# Determine expected affected layers
expected_affected = self._calculate_expected_impact(target_layer)
# Detect violations
violations = self._detect_dependency_violations(
target_layer, affected_layers, expected_affected
)
end_time = datetime.now()
return ChaosInjectionResult(
target_layer=target_layer,
injection_type=injection_type,
start_time=start_time,
end_time=end_time,
affected_layers=affected_layers,
expected_affected=expected_affected,
dependency_violations=[v.violating_layer for v in violations],
test_results=test_results,
success=len(violations) == 0
)
except Exception as e:
end_time = datetime.now()
return ChaosInjectionResult(
target_layer=target_layer,
injection_type=injection_type,
start_time=start_time,
end_time=end_time,
affected_layers=[],
expected_affected=[],
dependency_violations=[],
test_results={},
success=False,
error_message=str(e)
)
def _calculate_expected_impact(self, target_layer: str) -> List[str]:
"""Calculate which layers should be affected by failure in target layer."""
expected_affected = [target_layer] # Target layer should always be affected
# Find all layers that depend on the target layer
for layer_name, dependencies in self.dependency_matrix.items():
if target_layer in dependencies:
expected_affected.append(layer_name)
return expected_affected
def _detect_dependency_violations(self, target_layer: str,
actual_affected: List[str],
expected_affected: List[str]) -> List[DependencyViolation]:
"""Detect violations of architectural dependencies."""
violations = []
# Check for unexpected impacts (layers that shouldn't be affected but were)
for layer in actual_affected:
if layer not in expected_affected:
violations.append(DependencyViolation(
violating_layer=layer,
affected_layer=target_layer,
violation_type='unexpected_dependency',
expected_independence=True,
actual_impact=True,
severity='HIGH',
description=f'{layer} was affected by {target_layer} failure but should be independent'
))
# Check for missing impacts (layers that should be affected but weren't)
for layer in expected_affected:
if layer not in actual_affected and layer != target_layer:
violations.append(DependencyViolation(
violating_layer=layer,
affected_layer=target_layer,
violation_type='missing_dependency',
expected_independence=False,
actual_impact=False,
severity='MEDIUM',
description=f'{layer} should be affected by {target_layer} failure but was not'
))
return violations
def validate_architectural_independence(self) -> Dict[str, Any]:
"""Run comprehensive architectural independence validation."""
self.logger.info("🚀 Starting comprehensive architectural independence validation")
validation_results = {
'start_time': datetime.now(),
'layer_results': {},
'violations': [],
'summary': {
'total_injections': 0,
'successful_injections': 0,
'total_violations': 0,
'layers_tested': len(self.layers)
}
}
# Test each layer with different injection types
injection_types = ['import_failure', 'module_unavailable']
for layer_name in self.layers.keys():
layer_results = {}
for injection_type in injection_types:
self.logger.info(f"🔥 Testing {layer_name} with {injection_type}")
result = self.inject_chaos_and_test(layer_name, injection_type)
layer_results[injection_type] = result
validation_results['summary']['total_injections'] += 1
if result.success:
validation_results['summary']['successful_injections'] += 1
validation_results['summary']['total_violations'] += len(result.dependency_violations)
validation_results['layer_results'][layer_name] = layer_results
validation_results['end_time'] = datetime.now()
validation_results['duration'] = (
validation_results['end_time'] - validation_results['start_time']
).total_seconds()
# Save results
self._save_validation_results(validation_results)
return validation_results
def _save_validation_results(self, results: Dict[str, Any]):
"""Save validation results to file."""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"chaos_validation_{timestamp}.json"
filepath = self.results_dir / filename
# Convert datetime objects to strings for JSON serialization
serializable_results = self._make_json_serializable(results)
with open(filepath, 'w') as f:
json.dump(serializable_results, f, indent=2)
self.logger.info(f"📄 Results saved to {filepath}")
def _make_json_serializable(self, obj):
"""Convert objects to JSON-serializable format."""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, dict):
return {k: self._make_json_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._make_json_serializable(item) for item in obj]
elif hasattr(obj, '__dict__'):
return self._make_json_serializable(asdict(obj))
else:
return obj
def generate_chaos_report(self, results_file: Optional[str] = None) -> str:
"""Generate a comprehensive chaos engineering report."""
if results_file:
with open(results_file, 'r') as f:
results = json.load(f)
else:
# Use the most recent results file
result_files = sorted(self.results_dir.glob("chaos_validation_*.json"))
if not result_files:
return "No chaos validation results found"
with open(result_files[-1], 'r') as f:
results = json.load(f)
report = self._build_text_report(results)
# Save report
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = self.results_dir / f"chaos_report_{timestamp}.md"
with open(report_file, 'w') as f:
f.write(report)
return report
def _build_text_report(self, results: Dict[str, Any]) -> str:
"""Build a formatted text report from results."""
summary = results['summary']
report = f"""# Architectural Independence Chaos Engineering Report
## Executive Summary
**Validation Date**: {results['start_time']}
**Duration**: {results.get('duration', 0):.2f} seconds
**Layers Tested**: {summary['layers_tested']}
**Total Chaos Injections**: {summary['total_injections']}
**Successful Injections**: {summary['successful_injections']}
**Total Violations Detected**: {summary['total_violations']}
**Overall Health**: {'✅ PASS' if summary['total_violations'] == 0 else '❌ VIOLATIONS DETECTED'}
## Architectural Layer Overview
"""
for layer_name, layer in self.layers.items():
dependencies = ', '.join(layer.dependencies) if layer.dependencies else 'None'
report += f"- **{layer.name}**: {layer.description} (Dependencies: {dependencies})\n"
report += "\n## Dependency Matrix\n\n"
for layer_name, deps in self.dependency_matrix.items():
deps_str = ', '.join(sorted(deps)) if deps else 'None'
report += f"- **{layer_name}**: {deps_str}\n"
report += "\n## Chaos Injection Results\n\n"
layer_results = results.get('layer_results', {})
for layer_name, injections in layer_results.items():
report += f"### {layer_name}\n\n"
for injection_type, result in injections.items():
status = '✅ PASS' if result['success'] else '❌ FAIL'
violations = len(result['dependency_violations'])
report += f"**{injection_type}**: {status}\n"
report += f"- Affected Layers: {', '.join(result['affected_layers'])}\n"
report += f"- Expected Affected: {', '.join(result['expected_affected'])}\n"
report += f"- Violations: {violations}\n"
if result.get('error_message'):
report += f"- Error: {result['error_message']}\n"
report += "\n"
report += "\n## Recommendations\n\n"
if summary['total_violations'] == 0:
report += "✅ **Excellent**: All architectural boundaries are properly maintained.\n"
report += "✅ **No violations detected**: The system demonstrates proper layer independence.\n"
else:
report += "❌ **Action Required**: Architectural violations detected that need attention.\n"
report += "🔧 **Priority**: Review and refactor components with dependency violations.\n"
report += "📊 **Monitor**: Run chaos tests regularly to prevent regression.\n"
return report
def main():
"""Main entry point for the chaos test runner."""
import argparse
parser = argparse.ArgumentParser(
description="Architectural Layer Independence Test Runner with Chaos Engineering"
)
parser.add_argument('command', choices=[
'validate-independence',
'inject-layer-failure',
'dependency-matrix',
'chaos-report'
], help='Command to execute')
parser.add_argument('--layer', type=str, help='Target layer for injection')
parser.add_argument('--injection-type', type=str, default='import_failure',
choices=['import_failure', 'module_unavailable', 'function_failure'],
help='Type of chaos injection')
parser.add_argument('--results-file', type=str, help='Results file for report generation')
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
args = parser.parse_args()
# Set up logging level
if args.verbose:
logging.getLogger('chaos_engine').setLevel(logging.DEBUG)
engine = ArchitecturalChaosEngine()
if args.command == 'dependency-matrix':
engine.show_dependency_matrix()
elif args.command == 'inject-layer-failure':
if not args.layer:
print("❌ Error: --layer is required for inject-layer-failure")
sys.exit(1)
print(f"🔥 Injecting {args.injection_type} into {args.layer}")
result = engine.inject_chaos_and_test(args.layer, args.injection_type)
print(f"\n📊 Results:")
print(f"Success: {result.success}")
print(f"Affected Layers: {result.affected_layers}")
print(f"Expected Affected: {result.expected_affected}")
print(f"Violations: {result.dependency_violations}")
elif args.command == 'validate-independence':
print("🚀 Starting comprehensive architectural independence validation")
results = engine.validate_architectural_independence()
summary = results['summary']
print(f"\n📊 Validation Complete!")
print(f"Total Injections: {summary['total_injections']}")
print(f"Successful: {summary['successful_injections']}")
print(f"Violations: {summary['total_violations']}")
if summary['total_violations'] == 0:
print("✅ All architectural boundaries properly maintained!")
else:
print("❌ Architectural violations detected - review results")
elif args.command == 'chaos-report':
print("📄 Generating chaos engineering report")
report = engine.generate_chaos_report(args.results_file)
print(report)
if __name__ == "__main__":
main()