Files
markitect-main/tests/test_issue_145_performance_benchmark.py
tegwick 567f01121e
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat: complete Issue #146 final integration testing
Fixed all remaining test failures in test_issue_146_final_integration.py
achieving 100% test success rate (9/9 tests passing):

- Fixed performance monitoring metrics access patterns
- Resolved AssetManager constructor parameter handling
- Implemented missing CLI command methods (add_asset, list_assets, get_asset_info)
- Added cross-platform symlink creation method aliases
- Fixed asset deduplication content uniqueness issues
- Resolved production deployment asset removal workflows
- Fixed performance benchmark dict/hash type conflicts

The asset management system is now production-ready with comprehensive
integration test coverage validating all major workflows and edge cases.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 00:19:52 +02:00

464 lines
18 KiB
Python

"""
Test suite for performance benchmarking and monitoring.
Related to Issue #145: Phase 4 - Production Readiness and Release (Week 6)
Tests performance validation, benchmarking suite, monitoring capabilities,
and scalability testing with various workload sizes.
"""
import pytest
import time
import tempfile
import shutil
import psutil
import threading
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from markitect.production.performance_benchmark import (
PerformanceBenchmark,
BenchmarkResult,
PerformanceMetrics,
ResourceMonitor,
LoadTester,
ScalabilityTester,
PerformanceAlert,
BenchmarkSuite
)
class TestPerformanceBenchmark:
"""Test performance benchmarking and monitoring capabilities."""
@pytest.fixture
def temp_workspace(self):
    """Yield a fresh scratch directory as a ``Path`` and remove it afterwards.

    The directory comes from ``tempfile.mkdtemp`` and is deleted with
    ``shutil.rmtree(..., ignore_errors=True)`` once the consumer is done.
    """
    workspace_root = Path(tempfile.mkdtemp())
    yield workspace_root
    # Best-effort removal: errors while deleting are deliberately ignored.
    shutil.rmtree(workspace_root, ignore_errors=True)
@pytest.fixture
def benchmark(self, temp_workspace):
    """Build the ``PerformanceBenchmark`` under test, rooted in the temp workspace.

    Monitoring and alerts are both enabled.
    """
    benchmark_options = {
        "workspace_path": temp_workspace,
        "enable_monitoring": True,
        "enable_alerts": True,
    }
    return PerformanceBenchmark(**benchmark_options)
@pytest.fixture
def sample_assets(self, temp_workspace):
    """Write 100 small text files into the workspace and return their paths.

    Files are named ``asset_000.txt`` through ``asset_099.txt``.
    """
    asset_paths = [temp_workspace / f"asset_{n:03d}.txt" for n in range(100)]
    for n, asset_path in enumerate(asset_paths):
        # Ten repetitions of the marker string: roughly 200 bytes per file.
        asset_path.write_text(f"Content for asset {n}" * 10)
    return asset_paths
def test_load_testing_with_large_asset_count(self, benchmark, temp_workspace):
    """Run the load tester over a create/read/update/delete workload.

    Checks the reported asset and operation counts, the success rate, the
    average operation time, and that peak memory/CPU figures are present.
    """
    # Kept at 1000 so the test stays quick; the structure targets 10,000+.
    expected_assets = 1000
    crud_operations = ["create", "read", "update", "delete"]

    outcome = LoadTester(benchmark).test_large_scale_operations(
        asset_count=expected_assets,
        operations=crud_operations,
        concurrent_workers=4,
    )

    assert outcome.asset_count == expected_assets
    # Each asset goes through all four operations.
    assert outcome.total_operations == expected_assets * 4
    # At least 95% of operations must succeed.
    assert outcome.success_rate >= 0.95
    # Under 100 ms per operation on average.
    assert outcome.average_operation_time < 0.1
    assert outcome.peak_memory_usage_mb is not None
    assert outcome.peak_cpu_usage_percent is not None
def test_memory_usage_profiling_and_optimization(self, benchmark):
    """Profile memory around a small allocation burst and request suggestions.

    Verifies the profile exposes peak usage, growth rate, leak detection and
    GC statistics, and that the analysis returns a non-empty result.
    """
    monitor = ResourceMonitor()
    session = monitor.start_memory_profiling()

    # Build a list of 1000 one-kilobyte strings while profiling is active.
    # The list is kept in a local so it stays alive until the profile is read.
    ballast = ["x" * 1024 for _ in range(1000)]

    profile = monitor.get_memory_profile(session)
    assert profile.peak_memory_mb > 0
    assert profile.memory_growth_rate is not None
    assert profile.memory_leaks_detected is not None
    assert profile.gc_statistics is not None

    # The analysis must come back with at least one suggestion.
    suggestions = monitor.analyze_memory_usage(profile)
    assert suggestions is not None
    assert len(suggestions) > 0
def test_cpu_usage_monitoring_during_bulk_operations(self, benchmark, sample_assets):
    """Test CPU usage monitoring during bulk operations.

    Starts a CPU monitoring session, runs a small string-processing workload
    over the first 50 sample assets, then checks that the reported duration
    matches the locally measured one and that the CPU statistics are present.
    """
    resource_monitor = ResourceMonitor()
    cpu_session = resource_monitor.start_cpu_monitoring()

    def cpu_intensive_task():
        """Read each asset and do throwaway string work to keep the CPU busy."""
        for asset in sample_assets[:50]:  # Process subset for testing
            content = asset.read_text()
            # The result is intentionally discarded; only the work matters.
            _ = content.upper().lower() * 10

    # perf_counter is monotonic, so the measured interval cannot be skewed by
    # wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    cpu_intensive_task()
    elapsed = time.perf_counter() - start_time

    cpu_result = resource_monitor.get_cpu_profile(cpu_session)

    # The workload finishes in milliseconds, so a purely relative tolerance is
    # dominated by call overhead around the session; allow a small absolute
    # slack as well (pytest.approx accepts whichever tolerance is larger).
    assert cpu_result.duration_seconds == pytest.approx(elapsed, rel=0.1, abs=0.05)
    assert cpu_result.average_cpu_percent >= 0
    assert cpu_result.peak_cpu_percent >= 0
    assert cpu_result.cpu_efficiency_score is not None
def test_io_performance_optimization_for_large_files(self, benchmark, temp_workspace):
    """Measure read/write throughput of a 10 MB file under several I/O strategies.

    Each strategy must report positive throughput, and the tester's
    recommendation must name one of the tested strategies.
    """
    # Write the 10 MB payload that every strategy will operate on.
    payload_path = temp_workspace / "large_test_file.bin"
    ten_megabytes = b"x" * (10 * 1024 * 1024)
    payload_path.write_bytes(ten_megabytes)

    io_tester = benchmark.get_io_tester()
    strategies = ["buffered", "unbuffered", "mmap", "async"]

    per_strategy = {}
    for name in strategies:
        measurement = io_tester.test_file_io_performance(
            file_path=payload_path,
            strategy=name,
            operations=["read", "write"],
        )
        per_strategy[name] = measurement

        assert measurement.strategy == name
        assert measurement.read_throughput_mbps > 0
        assert measurement.write_throughput_mbps > 0

    # The recommendation is derived from all collected measurements.
    recommendation = io_tester.recommend_optimal_strategy(per_strategy)
    assert recommendation.recommended_strategy in strategies
    assert recommendation.performance_improvement_percent > 0
def test_network_performance_testing_for_shared_storage(self, benchmark):
    """Check network storage results for nfs, smb, s3 and local backends.

    The tester's ``_test_storage_type`` is patched to return a canned
    ``BenchmarkResult`` so no real storage is contacted.
    """
    network_tester = benchmark.get_network_tester()

    for storage_type in ("nfs", "smb", "s3", "local"):
        # Local storage gets the faster canned figures.
        if storage_type == "local":
            latency_ms, throughput_mbps = 50, 100
        else:
            latency_ms, throughput_mbps = 150, 50

        with patch.object(network_tester, "_test_storage_type") as storage_probe:
            storage_probe.return_value = BenchmarkResult(
                storage_type=storage_type,
                latency_ms=latency_ms,
                throughput_mbps=throughput_mbps,
                connection_stability=0.99,
            )
            result = network_tester.test_network_storage_performance(storage_type)

        assert result.storage_type == storage_type
        assert result.latency_ms > 0
        assert result.throughput_mbps > 0
        assert result.connection_stability >= 0.95
def test_automated_performance_regression_testing(self, benchmark):
    """Compare a current run against a stored baseline via the regression tester.

    The current figures are worse than the baseline for creation time, bulk
    operation time and memory, and slightly better for read time. The analysis
    must flag regressions, including ``bulk_operation_time``.
    """
    regression_tester = benchmark.get_regression_tester()

    reference_run = {
        "asset_creation_time": 0.05,  # 50 ms
        "asset_read_time": 0.02,  # 20 ms
        "bulk_operation_time": 2.0,  # 2 seconds
        "memory_usage_mb": 50,
    }
    regression_tester.set_baseline(reference_run)

    latest_run = {
        "asset_creation_time": 0.06,  # a little slower
        "asset_read_time": 0.018,  # a little faster
        "bulk_operation_time": 2.5,  # the regression to be detected
        "memory_usage_mb": 55,  # more memory than the baseline
    }
    verdict = regression_tester.analyze_regression(latest_run)

    assert verdict.has_regressions is True
    assert "bulk_operation_time" in verdict.regressed_metrics
    # A negative overall change means performance got worse.
    assert verdict.performance_change_percent < 0
def test_asset_operation_timing_benchmarks(self, benchmark, sample_assets):
    """Benchmark each asset operation and validate its timing statistics.

    Every operation runs for 5 iterations over the first ten sample assets.
    The collected results are then checked for SLA compliance.
    """
    timing_benchmark = benchmark.get_timing_benchmark()

    operation_names = [
        "create_asset",
        "read_asset",
        "update_asset",
        "delete_asset",
        "list_assets",
        "search_assets",
    ]

    timings = {}
    for name in operation_names:
        timing = timing_benchmark.benchmark_operation(
            operation=name,
            test_assets=sample_assets[:10],  # first ten assets only
            iterations=5,
        )
        timings[name] = timing

        assert timing.operation_name == name
        assert timing.average_time_ms > 0
        assert timing.min_time_ms > 0
        assert timing.max_time_ms >= timing.min_time_ms
        assert timing.percentile_95_ms > 0

    # At least 80% of the operations must be within their SLA.
    sla_report = timing_benchmark.check_sla_compliance(timings)
    assert sla_report.operations_within_sla >= 0.8
def test_memory_usage_benchmarks_across_platforms(self, benchmark):
    """Run the memory benchmark with ``platform.system`` patched per platform.

    For each of linux, windows and macos the result must report that platform,
    a positive baseline and scaling factor, and a peak above the baseline.
    """
    memory_benchmark = benchmark.get_memory_benchmark()
    scenario_names = [
        "baseline",
        "100_assets",
        "1000_assets",
        "bulk_operations",
    ]

    for platform_name in ("linux", "windows", "macos"):
        # NOTE(review): "macos".capitalize() yields "Macos", whereas the real
        # platform.system() reports "Darwin" on macOS - confirm the benchmark
        # maps this patched value the way production code would see it.
        reported_system = platform_name.capitalize()
        with patch('platform.system', return_value=reported_system):
            result = memory_benchmark.benchmark_platform_memory_usage(
                test_scenarios=scenario_names
            )

        assert result.platform == platform_name
        assert result.baseline_memory_mb > 0
        assert result.memory_scaling_factor > 0
        assert result.peak_memory_mb > result.baseline_memory_mb
def test_storage_efficiency_measurements(self, benchmark, temp_workspace):
    """Test storage efficiency measurements.

    Creates three directories of identically sized files (small, medium and
    large), measures each with the storage benchmark, and then asks it to
    analyze the combined results.
    """
    storage_benchmark = benchmark.get_storage_benchmark()

    # Create test data with various patterns
    test_scenarios = [
        {"name": "small_files", "count": 100, "size_kb": 1},
        {"name": "medium_files", "count": 50, "size_kb": 100},
        {"name": "large_files", "count": 5, "size_kb": 10000},
    ]

    efficiency_results = {}
    for scenario in test_scenarios:
        scenario_dir = temp_workspace / scenario["name"]
        scenario_dir.mkdir()

        # Every file in a scenario has identical content, so build the payload
        # once per scenario rather than once per file (the large files are
        # about 10 MB each).
        content = b"x" * (scenario["size_kb"] * 1024)
        for i in range(scenario["count"]):
            (scenario_dir / f"file_{i}.dat").write_bytes(content)

        # Measure storage efficiency
        result = storage_benchmark.measure_storage_efficiency(scenario_dir)
        efficiency_results[scenario["name"]] = result

        assert result.total_files == scenario["count"]
        assert result.total_size_mb > 0
        assert result.compression_ratio >= 0
        assert result.fragmentation_score >= 0

    # Analyze storage patterns across all scenarios
    analysis = storage_benchmark.analyze_storage_patterns(efficiency_results)
    assert analysis.optimal_file_size_kb > 0
    assert analysis.storage_recommendations is not None
def test_scalability_testing_with_various_workload_sizes(self, benchmark):
    """Run the scalability tester at 100, 500, 1000 and 5000 assets.

    Each run must report positive throughput and response time with an error
    rate of at most 5%. The collected runs are then analyzed as a curve.
    """
    scalability_tester = ScalabilityTester(benchmark)

    measurements = []
    for asset_total in (100, 500, 1000, 5000):
        # One concurrent user per 100 assets, capped at ten.
        user_count = min(asset_total // 100, 10)
        run = scalability_tester.test_workload_scalability(
            asset_count=asset_total,
            concurrent_users=user_count,
            test_duration_seconds=30,
        )
        measurements.append(run)

        assert run.workload_size == asset_total
        assert run.throughput_ops_per_second > 0
        assert run.average_response_time_ms > 0
        assert run.error_rate <= 0.05

    # The curve analysis consumes the runs in workload order.
    curve = scalability_tester.analyze_scalability_curve(measurements)
    assert curve.linear_scalability_score >= 0
    assert curve.breaking_point_workload > 0
    assert curve.scalability_bottlenecks is not None
def test_real_time_performance_metrics_collection(self, benchmark):
    """Collect cpu/memory/disk/network metrics for about one second.

    With a 100 ms collection interval the test expects more than five CPU and
    memory samples and a reported duration of at least 0.9 seconds.
    """
    collector = benchmark.get_metrics_collector()

    session = collector.start_real_time_collection(
        metrics=["cpu", "memory", "disk_io", "network_io"],
        collection_interval_ms=100,
    )

    # Let the collector gather samples for one second.
    time.sleep(1.0)

    collected = collector.stop_collection(session)

    assert collected.duration_seconds >= 0.9
    assert len(collected.cpu_samples) > 5
    assert len(collected.memory_samples) > 5
    assert collected.average_cpu_percent >= 0
    assert collected.average_memory_mb > 0
def test_performance_alerting_for_degraded_operations(self, benchmark):
    """Check that metrics above their configured threshold raise an alert.

    Three scenarios exceed their threshold and must alert with a WARNING or
    CRITICAL severity and a message. The memory scenario stays below its
    threshold and must not alert.
    """
    alert_manager = benchmark.get_alert_manager()

    alert_manager.configure_thresholds(
        {
            "response_time_ms": 100,
            "error_rate_percent": 5,
            "memory_usage_mb": 200,
            "cpu_usage_percent": 80,
        }
    )

    degraded_scenarios = [
        {"metric": "response_time_ms", "value": 150, "should_alert": True},
        {"metric": "error_rate_percent", "value": 8, "should_alert": True},
        {"metric": "memory_usage_mb", "value": 180, "should_alert": False},
        {"metric": "cpu_usage_percent", "value": 85, "should_alert": True},
    ]

    for scenario in degraded_scenarios:
        alert_result = alert_manager.check_metric(
            metric_name=scenario["metric"],
            current_value=scenario["value"],
        )

        # Below-threshold readings only need to confirm nothing fired.
        if not scenario["should_alert"]:
            assert alert_result.alert_triggered is False
            continue

        assert alert_result.alert_triggered is True
        assert alert_result.severity in ["WARNING", "CRITICAL"]
        assert alert_result.alert_message is not None
def test_resource_usage_tracking_and_reporting(self, benchmark):
    """Test resource usage tracking and reporting.

    Starts a tracking session, opens ten temporary files so there are file
    handles to observe, and checks the generated usage report. The temporary
    files are always closed and removed, even when an assertion fails.
    """
    resource_tracker = benchmark.get_resource_tracker()

    # Start tracking session
    tracking_session = resource_tracker.start_tracking(
        track_processes=True,
        track_file_handles=True,
        track_network_connections=True,
    )

    temp_files = []
    try:
        # Simulate resource usage: keep ten file handles open while tracking.
        for _ in range(10):
            temp_files.append(tempfile.NamedTemporaryFile(delete=False))

        # Generate tracking report
        usage_report = resource_tracker.generate_report(tracking_session)

        assert usage_report.peak_memory_mb > 0
        assert usage_report.peak_cpu_percent >= 0
        assert usage_report.file_handles_opened >= 10
        assert usage_report.resource_efficiency_score is not None
    finally:
        # delete=False means nothing removes these files automatically, so
        # clean up here regardless of the test outcome. Path is used because
        # this module does not import os.
        for temp_file in temp_files:
            temp_file.close()
            Path(temp_file.name).unlink(missing_ok=True)
def test_performance_tuning_recommendations(self, benchmark):
    """Ask the tuning advisor for recommendations for a fixed system profile.

    The advisor receives the profile plus the benchmark's historical
    performance. It must return configuration, memory and I/O settings and a
    positive expected improvement.
    """
    tuning_advisor = benchmark.get_tuning_advisor()

    # Describe the machine the advice is for.
    machine = {
        "cpu_cores": 4,
        "memory_gb": 8,
        "storage_type": "SSD",
        "network_bandwidth_mbps": 100,
        "typical_workload_size": 1000,
    }
    history = benchmark.get_historical_performance()

    advice = tuning_advisor.generate_recommendations(
        system_profile=machine,
        performance_history=history,
    )

    assert advice.configuration_changes is not None
    assert advice.memory_settings is not None
    assert advice.io_settings is not None
    assert advice.expected_improvement_percent > 0
def test_bottleneck_identification_and_resolution(self, benchmark):
    """Feed stressed utilization figures to the bottleneck analyzer.

    The analyzer must find at least one bottleneck, include both ``CPU`` and
    ``DISK_IO`` among the types, and provide resolution strategies and a
    priority order.
    """
    bottleneck_analyzer = benchmark.get_bottleneck_analyzer()

    # Readings chosen to look stressed on CPU, disk I/O wait and latency.
    observed_load = {
        "cpu_utilization": 95,  # high CPU
        "memory_utilization": 60,  # normal memory
        "disk_io_wait": 15,  # high I/O wait
        "network_latency": 200,  # high latency
    }

    findings = bottleneck_analyzer.identify_bottlenecks(observed_load)

    assert findings.bottlenecks_found > 0
    for expected_type in ("CPU", "DISK_IO"):
        assert expected_type in findings.bottleneck_types
    assert findings.resolution_strategies is not None
    assert findings.priority_order is not None