"""
Performance tests for domain operations.

Demonstrates:
- Domain operation performance benchmarks
- Memory usage monitoring
- Bulk operation testing
- Performance regression detection
"""

import pytest
import time
import gc
from typing import List

from domain.issues.models import Issue, Label, IssueState
from domain.issues.services import IssueStatusService, IssueValidationService
from domain.projects.models import Project, Milestone, ProjectState
from domain.projects.services import ProjectManagementService

try:
    from .utils.test_builders import IssueBuilder, LabelBuilder, MilestoneBuilder, ProjectBuilder
    from .utils.assertions import assert_performance_within_bounds, assert_memory_usage_within_bounds
except ImportError:
    # Fallback for missing test utilities.
    # NOTE(review): these stand-ins only provide ``build()``; they define none
    # of the fluent ``with_*`` methods the tests below call, so builder-based
    # tests raise AttributeError when the fallback is active. The assertion
    # stand-ins are no-ops, so timing/memory bounds are not enforced either.
    class IssueBuilder:
        def __init__(self, *args, **kwargs):
            pass

        def build(self):
            return {}

    class LabelBuilder:
        def __init__(self, *args, **kwargs):
            pass

        def build(self):
            return {}

    class MilestoneBuilder:
        def __init__(self, *args, **kwargs):
            pass

        def build(self):
            return {}

    class ProjectBuilder:
        def __init__(self, *args, **kwargs):
            pass

        def build(self):
            return {}

    def assert_performance_within_bounds(*args, **kwargs):
        pass

    def assert_memory_usage_within_bounds(*args, **kwargs):
        pass


def _ops_per_second(operation_count: int, elapsed_seconds: float) -> float:
    """Return throughput for reporting, or infinity when no time was recorded.

    A coarse clock can report an elapsed time of exactly zero for a fast run;
    dividing by it would fail the test with ZeroDivisionError inside a
    purely informational ``print``.
    """
    if elapsed_seconds > 0:
        return operation_count / elapsed_seconds
    return float("inf")


class TestDomainPerformance:
    """Performance tests for domain operations."""

    def test_issue_creation_performance(self, performance_timer):
        """Test performance of creating many issues."""
        # Arrange
        issue_count = 1000

        # Act
        performance_timer.start()
        issues = []
        for i in range(issue_count):
            issue = (IssueBuilder()
                     .with_number(i + 1)
                     .with_title(f"Performance Test Issue {i + 1}")
                     .as_bug()
                     .with_priority("medium")
                     .build())
            issues.append(issue)
        performance_timer.stop()

        # Assert
        elapsed = performance_timer.elapsed
        assert len(issues) == issue_count
        assert_performance_within_bounds(elapsed, 1.0, f"creating {issue_count} issues")

        rate = _ops_per_second(issue_count, elapsed)
        print(f"Created {issue_count} issues in {elapsed:.3f}s ({rate:.0f} issues/sec)")

    def test_label_categorization_performance(self, performance_timer):
        """Test performance of label categorization operations."""
        # Arrange (construction is deliberately outside the timed section)
        issues = [
            (IssueBuilder()
             .with_number(i + 1)
             .with_title(f"Issue {i + 1}")
             .with_labels(
                 "bug", "priority:high", "status:in-progress",
                 "frontend", "needs-testing", "documentation"
             )
             .build())
            for i in range(500)
        ]

        # Act
        performance_timer.start()
        categorized_results = []
        for issue in issues:
            categories = issue.categorize_labels()
            categorized_results.append(categories)
        performance_timer.stop()

        # Assert
        assert len(categorized_results) == 500
        assert_performance_within_bounds(performance_timer.elapsed, 0.5, "categorizing labels for 500 issues")

        # Verify categorization correctness
        for categories in categorized_results:
            assert "bug" in categories.type_labels
            assert "priority:high" in categories.priority_labels
            assert "status:in-progress" in categories.state_labels
            assert "frontend" in categories.other_labels

    def test_issue_status_service_performance(self, performance_timer):
        """Test performance of issue status service operations."""
        # Arrange
        service = IssueStatusService()
        project_info = {"kanban_columns": ["Todo", "In Progress", "Review", "Done"]}

        issues = []
        for i in range(1000):
            # Vary priority and status so more than one label value is exercised.
            priority = "high" if i % 3 == 0 else "medium"
            status = "in-progress" if i % 2 == 0 else "new"
            labels = ["bug", f"priority:{priority}", f"status:{status}"]
            issue = (IssueBuilder()
                     .with_number(i + 1)
                     .with_title(f"Status Test Issue {i + 1}")
                     .with_labels(*labels)
                     .build())
            issues.append(issue)

        # Act
        performance_timer.start()
        results = []
        for issue in issues:
            kanban_column = service.determine_kanban_column(issue, project_info)
            priority_info = service.extract_priority_info(issue)
            results.append((kanban_column, priority_info))
        performance_timer.stop()

        # Assert
        assert len(results) == 1000
        assert_performance_within_bounds(performance_timer.elapsed, 0.8, "processing 1000 issues through status service")

        # Verify correctness
        in_progress_count = sum(1 for kanban, _ in results if kanban == "In Progress")
        todo_count = sum(1 for kanban, _ in results if kanban == "Todo")
        assert in_progress_count > 0
        assert todo_count > 0

    def test_project_progress_calculation_performance(self, performance_timer):
        """Test performance of project progress calculations."""
        # Arrange
        projects = []
        for i in range(100):
            # 20 milestones per project
            milestones = [
                (MilestoneBuilder()
                 .with_id(j + 1)
                 .with_title(f"Milestone {j + 1}")
                 .with_issue_counts(
                     open_issues=10 - (j % 8),
                     closed_issues=j % 12
                 )
                 .build())
                for j in range(20)
            ]
            project = (ProjectBuilder()
                       .with_name(f"Performance Project {i + 1}")
                       .with_milestones(*milestones)
                       .build())
            projects.append(project)

        # Act
        performance_timer.start()
        progress_results = []
        for project in projects:
            overall_progress = project.calculate_overall_progress()
            active_milestones = project.get_active_milestones()
            completed_milestones = project.get_completed_milestones()
            total_issues = project.get_total_issues()
            progress_results.append({
                "overall_progress": overall_progress,
                "active_count": len(active_milestones),
                "completed_count": len(completed_milestones),
                "total_issues": total_issues
            })
        performance_timer.stop()

        # Assert
        assert len(progress_results) == 100
        assert_performance_within_bounds(performance_timer.elapsed, 0.5, "calculating progress for 100 projects with 20 milestones each")

        # Verify calculations are reasonable
        for result in progress_results:
            assert 0 <= result["overall_progress"] <= 100
            assert result["total_issues"] > 0

    def test_bulk_issue_validation_performance(self, performance_timer):
        """Test performance of bulk issue validation."""
        # Arrange
        validation_service = IssueValidationService()
        issue_data_list = []
        for i in range(2000):
            issue_data = {
                # Every 10th title is empty.
                "title": f"Validation Test Issue {i + 1}" if i % 10 != 0 else "",
                # Every 5th label set carries two conflicting priorities.
                "labels": ["bug", "priority:medium"] if i % 5 != 0 else ["bug", "priority:high", "priority:low"]
            }
            issue_data_list.append(issue_data)

        # Act
        performance_timer.start()
        validation_results = []
        for issue_data in issue_data_list:
            try:
                validation_service.validate_issue_creation(issue_data["title"], issue_data["labels"])
                validation_results.append(True)
            except Exception:
                # The concrete exception type raised by the service is not
                # visible from this module, so any failure counts as "invalid".
                validation_results.append(False)
        performance_timer.stop()

        # Assert
        assert len(validation_results) == 2000
        assert_performance_within_bounds(performance_timer.elapsed, 1.0, "validating 2000 issues")

        # Verify validation correctness
        valid_count = sum(1 for result in validation_results if result)
        invalid_count = sum(1 for result in validation_results if not result)

        # Every index divisible by 10 is also divisible by 5, so the empty-title
        # items are a subset of the conflicting-priority items: 20% (400) of the
        # inputs are malformed and 80% (1600) are well-formed. The bounds are
        # kept loose because the service's exact rules are not visible here.
        assert 1200 <= valid_count <= 1600
        assert 400 <= invalid_count <= 800

    @pytest.mark.slow
    def test_memory_usage_with_large_datasets(self, performance_timer):
        """Test memory usage with large datasets."""
        import os

        # Skips the test (rather than erroring) when psutil is not installed.
        psutil = pytest.importorskip("psutil", reason="psutil not available for memory testing")

        # Collect garbage left over from earlier tests before taking the
        # baseline; otherwise it can be freed mid-test and make the measured
        # increase zero or negative.
        gc.collect()

        # Measure initial memory
        process = psutil.Process(os.getpid())
        initial_memory_mb = process.memory_info().rss / (1024 * 1024)

        # Create large dataset
        performance_timer.start()
        large_issues = []
        for i in range(10000):
            issue = (IssueBuilder()
                     .with_number(i + 1)
                     .with_title(f"Large Dataset Issue {i + 1}")
                     .with_labels("bug", "priority:medium", "status:new", "backend", "database")
                     .build())
            large_issues.append(issue)

        # Perform operations on dataset
        for issue in large_issues:
            categories = issue.categorize_labels()
            # Simulate some processing
            _ = len(categories.type_labels) + len(categories.priority_labels)
        performance_timer.stop()

        # Measure final memory
        final_memory_mb = process.memory_info().rss / (1024 * 1024)
        memory_increase_mb = final_memory_mb - initial_memory_mb

        # Force garbage collection and measure again
        gc.collect()
        gc_memory_mb = process.memory_info().rss / (1024 * 1024)
        gc_reduction_mb = final_memory_mb - gc_memory_mb

        # Assert
        assert len(large_issues) == 10000
        assert_performance_within_bounds(performance_timer.elapsed, 5.0, "processing 10,000 issues")
        assert_memory_usage_within_bounds(memory_increase_mb, 50.0, "creating and processing 10,000 issues")

        print(f"Memory usage: Initial={initial_memory_mb:.2f}MB, Final={final_memory_mb:.2f}MB, "
              f"Increase={memory_increase_mb:.2f}MB, GC Reduction={gc_reduction_mb:.2f}MB")

        # Memory should be reasonable for the dataset size
        assert memory_increase_mb > 0  # Should use some memory
        assert gc_reduction_mb >= 0  # GC should not increase memory

    @pytest.mark.performance
    def test_concurrent_domain_operations_simulation(self, performance_timer):
        """Simulate concurrent domain operations for performance testing.

        The "concurrency" is simulated: operations run sequentially in a
        nested loop, not on threads or processes.
        """
        # Arrange
        project_service = ProjectManagementService()
        projects = []

        # Create test projects
        for i in range(10):
            milestones = [
                MilestoneBuilder().with_id(j + 1).with_title(f"M{j + 1}")
                .with_issue_counts(5, 3).build()
                for j in range(5)
            ]
            project = (ProjectBuilder()
                       .with_name(f"Concurrent Project {i + 1}")
                       .with_milestones(*milestones)
                       .build())
            projects.append(project)

        # Act - Simulate concurrent operations
        performance_timer.start()
        results = []

        # Simulate multiple "users" performing operations
        for iteration in range(100):
            for project in projects:
                # Simulate various operations
                health_status = project_service.determine_project_health(project)
                progress = project.calculate_overall_progress()
                active_milestones = project.get_active_milestones()
                results.append({
                    "iteration": iteration,
                    "project_name": project.name,
                    "health_status": health_status,
                    "progress": progress,
                    "active_milestones": len(active_milestones)
                })
        performance_timer.stop()

        # Assert
        expected_operations = 100 * 10  # 100 iterations * 10 projects
        elapsed = performance_timer.elapsed
        assert len(results) == expected_operations
        assert_performance_within_bounds(elapsed, 2.0, f"simulating {expected_operations} concurrent operations")

        # Verify result consistency
        valid_health_statuses = {"Excellent", "Good", "Fair", "At Risk", "Stalled", "Needs Attention", "Inactive"}
        for result in results:
            assert result["health_status"] in valid_health_statuses
            assert 0 <= result["progress"] <= 100
            assert result["active_milestones"] >= 0

        ops_per_second = _ops_per_second(expected_operations, elapsed)
        print(f"Simulated {expected_operations} operations in {elapsed:.3f}s ({ops_per_second:.0f} ops/sec)")

    def test_domain_operation_consistency_under_load(self, performance_timer):
        """Test that domain operations remain consistent under load."""
        # Arrange
        reference_issue = (IssueBuilder()
                           .with_number(1)
                           .with_title("Reference Issue")
                           .with_labels("bug", "priority:high", "status:blocked")
                           .build())

        # Get reference results
        reference_categories = reference_issue.categorize_labels()
        status_service = IssueStatusService()
        reference_kanban = status_service.determine_kanban_column(reference_issue, {})

        # Act - Perform same operations many times
        performance_timer.start()
        consistency_results = []
        for _ in range(5000):
            # Create identical issue
            test_issue = (IssueBuilder()
                          .with_number(1)
                          .with_title("Reference Issue")
                          .with_labels("bug", "priority:high", "status:blocked")
                          .build())

            # Perform operations
            categories = test_issue.categorize_labels()
            kanban = status_service.determine_kanban_column(test_issue, {})

            # Check consistency
            categories_match = (
                categories.type_labels == reference_categories.type_labels
                and categories.priority_labels == reference_categories.priority_labels
                and categories.state_labels == reference_categories.state_labels
            )
            kanban_matches = kanban == reference_kanban
            consistency_results.append(categories_match and kanban_matches)
        performance_timer.stop()

        # Assert
        assert len(consistency_results) == 5000
        assert all(consistency_results), "All operations should produce consistent results"
        assert_performance_within_bounds(performance_timer.elapsed, 1.5, "consistency test with 5000 operations")

        print(f"Consistency test: {len(consistency_results)} operations, all consistent, "
              f"completed in {performance_timer.elapsed:.3f}s")