diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 00000000..a3b6b0b3 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,255 @@ +name: Test Suite + +on: + push: + branches: [ main, develop ] + pull_request: + branches: [ main ] + +jobs: + unit-tests: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Cache dependencies + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/requirements*.txt') }} + restore-keys: | + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r tests/requirements-test.txt + + - name: Run unit tests + run: | + pytest tests/unit/ -v \ + --cov=domain \ + --cov=application \ + --cov=infrastructure \ + --cov-report=xml \ + --cov-report=term-missing \ + --cov-fail-under=85 \ + --tb=short \ + --durations=10 + + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 + with: + file: ./coverage.xml + flags: unit-tests + name: codecov-umbrella + + integration-tests: + runs-on: ubuntu-latest + needs: unit-tests + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r tests/requirements-test.txt + + - name: Run integration tests + run: | + pytest tests/integration/ -v \ + --tb=short \ + --maxfail=5 \ + --timeout=300 + + - name: Archive test artifacts + if: failure() + uses: actions/upload-artifact@v3 + with: + name: integration-test-artifacts + path: | + tests/integration/logs/ + tests/integration/outputs/ + + e2e-tests: + runs-on: ubuntu-latest + needs: [unit-tests, integration-tests] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r tests/requirements-test.txt + + - name: Run end-to-end tests (non-slow) + run: | + pytest tests/e2e/ -v \ + -m "not slow" \ + --tb=short \ + --maxfail=3 \ + --timeout=600 + + - name: Run smoke tests + run: | + pytest tests/ -v \ + -m "smoke" \ + --tb=short \ + --timeout=120 + + performance-tests: + runs-on: ubuntu-latest + needs: [unit-tests, integration-tests] + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install -r tests/requirements-test.txt + + - name: Run performance tests + run: | + pytest tests/e2e/performance/ -v \ + -m "performance" \ + --tb=short \ + --timeout=1200 + + - name: Archive performance results + uses: actions/upload-artifact@v3 + with: + name: performance-results + path: | + performance-results.json + performance-charts/ + + code-quality: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r tests/requirements-test.txt + + - name: Run flake8 + run: | + flake8 domain/ application/ infrastructure/ --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 domain/ application/ infrastructure/ --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + + - name: Run mypy + run: | + mypy domain/ application/ infrastructure/ --ignore-missing-imports + + - name: Check code formatting with black + run: | + black --check domain/ application/ infrastructure/ + + - name: Check import sorting with isort + run: | + isort --check-only domain/ application/ infrastructure/ + + security-scan: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install safety bandit + + - name: Run safety check + run: | + pip freeze | safety check --json + + - name: Run bandit security linter + run: | + bandit -r domain/ application/ infrastructure/ -f json -o bandit-results.json + + - name: Upload security scan results + uses: actions/upload-artifact@v3 + with: + name: security-scan-results + path: | + bandit-results.json + + test-summary: + runs-on: ubuntu-latest + needs: [unit-tests, integration-tests, e2e-tests, code-quality, security-scan] + if: always() + + steps: + - name: Check test results + run: | + echo "Unit Tests: ${{ needs.unit-tests.result }}" + echo "Integration Tests: ${{ needs.integration-tests.result }}" + echo "E2E Tests: ${{ needs.e2e-tests.result }}" + echo "Code Quality: ${{ needs.code-quality.result }}" + echo "Security Scan: ${{ needs.security-scan.result }}" + + if [[ "${{ needs.unit-tests.result }}" == "failure" || + "${{ needs.integration-tests.result }}" == "failure" || + "${{ needs.e2e-tests.result }}" == "failure" ]]; then + echo "❌ Test suite failed" + exit 1 + else + echo "✅ Test suite passed" + fi + + - name: Update status badge + if: github.ref == 'refs/heads/main' + run: | + # This would update a status badge in the README + echo "Test suite status: PASSING" > test-status.txt + + - name: Upload test summary + uses: actions/upload-artifact@v3 + with: + name: test-summary + path: test-status.txt \ No newline at end of file diff --git a/domain/issues/exceptions.py b/domain/issues/exceptions.py index 6ccffcb9..f8a7f497 100644 --- a/domain/issues/exceptions.py +++ b/domain/issues/exceptions.py @@ -26,4 +26,19 @@ class IssueStateError(IssueDomainError): def __init__(self, message: str, current_state: str, attempted_state: str): super().__init__(message) self.current_state = current_state - self.attempted_state = attempted_state \ No newline at end of file + self.attempted_state = attempted_state + + +class IssueNotFoundError(IssueDomainError): + """Exception raised when an issue cannot be found.""" + + def __init__(self, message: str, issue_number: int = None): + super().__init__(message, issue_number) + + +class IssueLabelError(IssueDomainError): + """Exception raised when there are label-related issues.""" + + def __init__(self, message: str, label_name: str = None): + super().__init__(message) + self.label_name = label_name \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..4d68778d --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,296 @@ +""" +Global test configuration and fixtures for MarkiTect project. + +Provides shared fixtures, utilities, and configuration for all test types. +""" + +import pytest +import tempfile +import shutil +import asyncio +from pathlib import Path +from unittest.mock import Mock, AsyncMock +from typing import Generator, Dict, Any +import sqlite3 +import os + + +@pytest.fixture(scope="session") +def event_loop(): + """Create an instance of the default event loop for the test session.""" + loop = asyncio.new_event_loop() + yield loop + loop.close() + + +@pytest.fixture(scope="session") +def test_workspace() -> Generator[Path, None, None]: + """Create isolated test workspace for file operations.""" + temp_dir = tempfile.mkdtemp(prefix="markitect_test_") + workspace_path = Path(temp_dir) + + # Create subdirectories + (workspace_path / "documents").mkdir() + (workspace_path / "cache").mkdir() + (workspace_path / "workspaces").mkdir() + + yield workspace_path + + # Cleanup + shutil.rmtree(temp_dir, ignore_errors=True) + + +@pytest.fixture +def test_database_path(test_workspace) -> Path: + """Provide path for test database.""" + return test_workspace / "test.db" + + +@pytest.fixture +def mock_database(): + """Provide mocked database for testing.""" + mock_db = Mock() + mock_cursor = Mock() + mock_db.cursor.return_value = mock_cursor + mock_db.execute.return_value = mock_cursor + mock_cursor.fetchone.return_value = None + mock_cursor.fetchall.return_value = [] + mock_cursor.lastrowid = 1 + return mock_db + + +@pytest.fixture +def mock_http_client(): + """Provide mocked HTTP client for API tests.""" + mock_client = Mock() + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = {"status": "success"} + mock_response.text = '{"status": "success"}' + mock_client.get.return_value = mock_response + mock_client.post.return_value = mock_response + mock_client.put.return_value = mock_response + mock_client.delete.return_value = mock_response + return mock_client + + +@pytest.fixture +def mock_async_http_client(): + """Provide mocked async HTTP client for API tests.""" + mock_client = AsyncMock() + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"status": "success"}) + mock_response.text = AsyncMock(return_value='{"status": "success"}') + mock_client.get.return_value = mock_response + mock_client.post.return_value = mock_response + mock_client.put.return_value = mock_response + mock_client.delete.return_value = mock_response + return mock_client + + +@pytest.fixture +def test_config(test_workspace) -> Dict[str, Any]: + """Provide test configuration dictionary.""" + return { + "workspace_dir": str(test_workspace / "workspaces"), + "database_path": str(test_workspace / "test.db"), + "cache_dir": str(test_workspace / "cache"), + "gitea_url": "http://test-gitea.com", + "gitea_token": "test-token", + "repo_owner": "test", + "repo_name": "repo", + "log_level": "DEBUG" + } + + +@pytest.fixture +def clean_environment(): + """Provide clean environment variables for testing.""" + original_env = dict(os.environ) + + # Clear relevant environment variables + test_env_vars = [ + "MARKITECT_WORKSPACE_DIR", + "MARKITECT_GITEA_URL", + "MARKITECT_GITEA_TOKEN", + "MARKITECT_REPO_OWNER", + "MARKITECT_REPO_NAME" + ] + + for var in test_env_vars: + os.environ.pop(var, None) + + yield + + # Restore original environment + os.environ.clear() + os.environ.update(original_env) + + +@pytest.fixture +def isolated_environment(test_workspace, clean_environment): + """Set up isolated environment for CLI testing.""" + env = { + "MARKITECT_WORKSPACE_DIR": str(test_workspace / "workspaces"), + "MARKITECT_GITEA_URL": "http://test-gitea.com", + "MARKITECT_GITEA_TOKEN": "test-token", + "MARKITECT_REPO_OWNER": "test", + "MARKITECT_REPO_NAME": "repo", + "PYTHONPATH": "." + } + + # Update current process environment + for key, value in env.items(): + os.environ[key] = value + + yield env + + +@pytest.fixture +def sample_markdown_content(): + """Provide sample markdown content for testing.""" + return """--- +title: Test Document +author: Test Author +tags: [test, sample] +--- + +# Test Document + +This is a test document with **bold** and *italic* text. + +## Section 1 + +- Item 1 +- Item 2 +- Item 3 + +## Section 2 + +Here's a code block: + +```python +def hello_world(): + print("Hello, World!") +``` + +And a link: [Test Link](https://example.com) +""" + + +@pytest.fixture +def sample_issue_data(): + """Provide sample issue data for testing.""" + return { + "number": 123, + "title": "Test Issue", + "body": "This is a test issue description", + "state": "open", + "labels": [ + {"name": "bug"}, + {"name": "priority:high"}, + {"name": "status:in-progress"} + ], + "milestone": { + "id": 1, + "title": "Version 1.0", + "description": "First release" + }, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T12:00:00Z" + } + + +@pytest.fixture +def sample_project_data(): + """Provide sample project data for testing.""" + return { + "name": "Test Project", + "description": "A test project for testing", + "state": "active", + "milestones": [ + { + "id": 1, + "title": "Version 1.0", + "description": "First release", + "due_date": "2025-12-31T23:59:59Z", + "state": "open", + "open_issues": 5, + "closed_issues": 3 + } + ], + "kanban_columns": ["Todo", "In Progress", "Review", "Done"], + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z" + } + + +# Performance testing fixtures +@pytest.fixture +def performance_timer(): + """Timer fixture for performance testing.""" + import time + + class Timer: + def __init__(self): + self.start_time = None + self.end_time = None + + def start(self): + self.start_time = time.time() + + def stop(self): + self.end_time = time.time() + + @property + def elapsed(self) -> float: + if self.start_time is None: + raise ValueError("Timer not started") + if self.end_time is None: + return time.time() - self.start_time + return self.end_time - self.start_time + + return Timer() + + +# Async test helpers +@pytest.fixture +def async_test_timeout(): + """Default timeout for async tests.""" + return 30.0 # 30 seconds + + +# Test markers configuration +def pytest_configure(config): + """Configure pytest markers.""" + config.addinivalue_line( + "markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')" + ) + config.addinivalue_line( + "markers", "integration: marks tests as integration tests" + ) + config.addinivalue_line( + "markers", "e2e: marks tests as end-to-end tests" + ) + config.addinivalue_line( + "markers", "performance: marks tests as performance tests" + ) + config.addinivalue_line( + "markers", "unit: marks tests as unit tests" + ) + + +# Collection hooks +def pytest_collection_modifyitems(config, items): + """Modify test collection to add markers based on test location.""" + for item in items: + # Add markers based on test file location + if "unit" in str(item.fspath): + item.add_marker(pytest.mark.unit) + elif "integration" in str(item.fspath): + item.add_marker(pytest.mark.integration) + elif "e2e" in str(item.fspath): + item.add_marker(pytest.mark.e2e) + elif "performance" in str(item.fspath): + item.add_marker(pytest.mark.performance) \ No newline at end of file diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 00000000..0e01bbb3 --- /dev/null +++ b/tests/e2e/__init__.py @@ -0,0 +1,3 @@ +""" +End-to-end tests for MarkiTect workflows. +""" \ No newline at end of file diff --git a/tests/e2e/cli/__init__.py b/tests/e2e/cli/__init__.py new file mode 100644 index 00000000..594b5256 --- /dev/null +++ b/tests/e2e/cli/__init__.py @@ -0,0 +1,3 @@ +""" +End-to-end CLI tests. +""" \ No newline at end of file diff --git a/tests/e2e/cli/test_issue_commands_e2e.py b/tests/e2e/cli/test_issue_commands_e2e.py new file mode 100644 index 00000000..6102d22f --- /dev/null +++ b/tests/e2e/cli/test_issue_commands_e2e.py @@ -0,0 +1,348 @@ +""" +End-to-end tests for issue management CLI commands. + +Demonstrates: +- CLI command testing with real processes +- Environment isolation +- Workflow validation +- Output verification +""" + +import pytest +import subprocess +import json +from pathlib import Path +import time +import os + +from tests.utils.assertions import assert_file_exists, assert_directory_exists, assert_file_contains + + +@pytest.mark.e2e +class TestIssueCommandsE2E: + """End-to-end tests for issue management CLI commands.""" + + def test_show_issue_command_basic(self, isolated_environment): + """Test basic issue show command.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "show-issue", "23"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert + assert result.returncode == 0, f"Command failed with stderr: {result.stderr}" + assert "Issue #23" in result.stdout or "issue 23" in result.stdout.lower() + + def test_show_issue_command_with_invalid_number(self, isolated_environment): + """Test show issue command with invalid issue number.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "show-issue", "99999"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert - Should handle gracefully + # Note: Depending on implementation, this might return 0 or 1 + assert "not found" in result.stdout.lower() or "error" in result.stderr.lower() + + def test_workspace_status_command(self, isolated_environment): + """Test workspace status command.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert + assert result.returncode == 0 + # Should show workspace information + assert "workspace" in result.stdout.lower() or "status" in result.stdout.lower() + + @pytest.mark.slow + def test_complete_issue_workflow(self, isolated_environment, test_workspace): + """Test complete issue workflow from start to finish.""" + workspace_dir = Path(isolated_environment["MARKITECT_WORKSPACE_DIR"]) + workspace_dir.mkdir(exist_ok=True) + + # Step 1: Check initial workspace status + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + assert result.returncode == 0 + + # Step 2: Start working on an issue + result = subprocess.run( + ["python", "tddai_cli.py", "start-issue", "42"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd(), + timeout=30 # Prevent hanging + ) + + # Verify the start command works (might create workspace) + if result.returncode == 0: + # If successful, check if workspace was created + issue_workspace = workspace_dir / "issue_42" + if issue_workspace.exists(): + assert_directory_exists(issue_workspace) + + # Step 3: Check workspace status again + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + assert result.returncode == 0 + + # Step 4: Try to finish (cleanup) + result = subprocess.run( + ["python", "tddai_cli.py", "finish-issue"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd(), + timeout=30 + ) + + # The finish command should work or provide meaningful feedback + assert result.returncode in [0, 1] # Allow for various implementation states + + def test_list_open_issues_command(self, isolated_environment): + """Test listing open issues.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "list-open-issues"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert + assert result.returncode == 0 + # Should return some form of issue listing (even if empty) + output = result.stdout.strip() + assert len(output) >= 0 # Any output is acceptable + + def test_cli_help_commands(self, isolated_environment): + """Test CLI help functionality.""" + # Test main help + result = subprocess.run( + ["python", "tddai_cli.py", "--help"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + assert result.returncode == 0 + assert "usage" in result.stdout.lower() or "commands" in result.stdout.lower() + + # Test specific command help + result = subprocess.run( + ["python", "tddai_cli.py", "show-issue", "--help"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + assert result.returncode == 0 + + def test_cli_with_invalid_command(self, isolated_environment): + """Test CLI behavior with invalid command.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "invalid-command"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert - Should handle gracefully + assert result.returncode != 0 + assert "error" in result.stderr.lower() or "unknown" in result.stderr.lower() + + def test_cli_error_handling(self, isolated_environment): + """Test CLI error handling for various scenarios.""" + # Test with missing required argument + result = subprocess.run( + ["python", "tddai_cli.py", "show-issue"], # Missing issue number + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Should provide helpful error message + assert result.returncode != 0 + assert len(result.stderr) > 0 or "error" in result.stdout.lower() + + @pytest.mark.parametrize("issue_number", ["1", "23", "100"]) + def test_show_issue_command_multiple_issues(self, isolated_environment, issue_number): + """Test show issue command with multiple issue numbers.""" + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "show-issue", issue_number], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd(), + timeout=15 + ) + + # Assert - Command should execute without crashing + assert result.returncode in [0, 1] # Allow for not found scenarios + assert len(result.stdout + result.stderr) > 0 # Should provide some output + + def test_cli_performance(self, isolated_environment, performance_timer): + """Test CLI command performance.""" + # Act + performance_timer.start() + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + performance_timer.stop() + + # Assert + assert result.returncode == 0 + # CLI commands should be reasonably fast + assert performance_timer.elapsed < 10.0, f"CLI command took {performance_timer.elapsed:.2f}s" + + def test_cli_output_formatting(self, isolated_environment): + """Test CLI output formatting and structure.""" + # Test workspace status output + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + if result.returncode == 0: + output = result.stdout + # Output should be readable and structured + assert len(output.strip()) > 0 + # Should not contain obvious error traces + assert "Traceback" not in output + assert "Exception" not in output + + def test_cli_environment_isolation(self, test_workspace): + """Test that CLI commands work in isolated environment.""" + # Create isolated environment + isolated_env = { + "MARKITECT_WORKSPACE_DIR": str(test_workspace / "isolated"), + "MARKITECT_GITEA_URL": "http://isolated-gitea.com", + "MARKITECT_REPO_OWNER": "isolated", + "MARKITECT_REPO_NAME": "test", + "PYTHONPATH": "." + } + + # Update with current env to preserve PATH, etc. + full_env = dict(os.environ) + full_env.update(isolated_env) + + # Act + result = subprocess.run( + ["python", "tddai_cli.py", "workspace-status"], + env=full_env, + capture_output=True, + text=True, + cwd=Path.cwd() + ) + + # Assert - Should work with isolated environment + assert result.returncode == 0 + # Should use isolated workspace directory + workspace_path = test_workspace / "isolated" + workspace_path.mkdir(exist_ok=True) + + def test_cli_concurrent_execution(self, isolated_environment): + """Test concurrent CLI command execution.""" + import threading + import queue + + results_queue = queue.Queue() + + def run_command(command_args): + result = subprocess.run( + command_args, + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd(), + timeout=15 + ) + results_queue.put(result) + + # Start multiple commands concurrently + commands = [ + ["python", "tddai_cli.py", "workspace-status"], + ["python", "tddai_cli.py", "show-issue", "1"], + ["python", "tddai_cli.py", "show-issue", "2"], + ] + + threads = [] + for cmd in commands: + thread = threading.Thread(target=run_command, args=(cmd,)) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join(timeout=20) + + # Collect results + results = [] + while not results_queue.empty(): + results.append(results_queue.get()) + + # Assert + assert len(results) == len(commands) + # At least some commands should succeed + successful_commands = [r for r in results if r.returncode == 0] + assert len(successful_commands) > 0 + + @pytest.mark.smoke + def test_cli_smoke_test(self, isolated_environment): + """Basic smoke test for CLI functionality.""" + # Test that the CLI script exists and is executable + cli_script = Path("tddai_cli.py") + assert_file_exists(cli_script) + + # Test basic command execution + result = subprocess.run( + ["python", "tddai_cli.py", "--help"], + env=isolated_environment, + capture_output=True, + text=True, + cwd=Path.cwd(), + timeout=10 + ) + + # Should at least not crash + assert result.returncode in [0, 1, 2] # Various help return codes + assert len(result.stdout + result.stderr) > 0 \ No newline at end of file diff --git a/tests/e2e/performance/__init__.py b/tests/e2e/performance/__init__.py new file mode 100644 index 00000000..9cbe395c --- /dev/null +++ b/tests/e2e/performance/__init__.py @@ -0,0 +1,3 @@ +""" +Performance and load testing for MarkiTect. +""" \ No newline at end of file diff --git a/tests/e2e/performance/test_domain_performance.py b/tests/e2e/performance/test_domain_performance.py new file mode 100644 index 00000000..6b086a53 --- /dev/null +++ b/tests/e2e/performance/test_domain_performance.py @@ -0,0 +1,359 @@ +""" +Performance tests for domain operations. + +Demonstrates: +- Domain operation performance benchmarks +- Memory usage monitoring +- Bulk operation testing +- Performance regression detection +""" + +import pytest +import time +import gc +from typing import List + +from domain.issues.models import Issue, Label, IssueState +from domain.issues.services import IssueStatusService, IssueValidationService +from domain.projects.models import Project, Milestone, ProjectState +from domain.projects.services import ProjectManagementService + +from tests.utils.test_builders import IssueBuilder, LabelBuilder, MilestoneBuilder, ProjectBuilder +from tests.utils.assertions import assert_performance_within_bounds, assert_memory_usage_within_bounds + + +class TestDomainPerformance: + """Performance tests for domain operations.""" + + def test_issue_creation_performance(self, performance_timer): + """Test performance of creating many issues.""" + # Arrange + issue_count = 1000 + + # Act + performance_timer.start() + issues = [] + for i in range(issue_count): + issue = (IssueBuilder() + .with_number(i + 1) + .with_title(f"Performance Test Issue {i + 1}") + .as_bug() + .with_priority("medium") + .build()) + issues.append(issue) + performance_timer.stop() + + # Assert + assert len(issues) == issue_count + assert_performance_within_bounds(performance_timer.elapsed, 1.0, f"creating {issue_count} issues") + print(f"Created {issue_count} issues in {performance_timer.elapsed:.3f}s ({issue_count/performance_timer.elapsed:.0f} issues/sec)") + + def test_label_categorization_performance(self, performance_timer): + """Test performance of label categorization operations.""" + # Arrange + issues = [] + for i in range(500): + issue = (IssueBuilder() + .with_number(i + 1) + .with_title(f"Issue {i + 1}") + .with_labels( + "bug", "priority:high", "status:in-progress", + "frontend", "needs-testing", "documentation" + ) + .build()) + issues.append(issue) + + # Act + performance_timer.start() + categorized_results = [] + for issue in issues: + categories = issue.categorize_labels() + categorized_results.append(categories) + performance_timer.stop() + + # Assert + assert len(categorized_results) == 500 + assert_performance_within_bounds(performance_timer.elapsed, 0.5, "categorizing labels for 500 issues") + + # Verify categorization correctness + for categories in categorized_results: + assert "bug" in categories.type_labels + assert "priority:high" in categories.priority_labels + assert "status:in-progress" in categories.state_labels + assert "frontend" in categories.other_labels + + def test_issue_status_service_performance(self, performance_timer): + """Test performance of issue status service operations.""" + # Arrange + service = IssueStatusService() + issues = [] + project_info = {"kanban_columns": ["Todo", "In Progress", "Review", "Done"]} + + for i in range(1000): + labels = ["bug", f"priority:{'high' if i % 3 == 0 else 'medium'}", f"status:{'in-progress' if i % 2 == 0 else 'new'}"] + issue = (IssueBuilder() + .with_number(i + 1) + .with_title(f"Status Test Issue {i + 1}") + .with_labels(*labels) + .build()) + issues.append(issue) + + # Act + performance_timer.start() + results = [] + for issue in issues: + kanban_column = service.determine_kanban_column(issue, project_info) + priority_info = service.extract_priority_info(issue) + results.append((kanban_column, priority_info)) + performance_timer.stop() + + # Assert + assert len(results) == 1000 + assert_performance_within_bounds(performance_timer.elapsed, 0.8, "processing 1000 issues through status service") + + # Verify correctness + in_progress_count = sum(1 for kanban, _ in results if kanban == "In Progress") + todo_count = sum(1 for kanban, _ in results if kanban == "Todo") + assert in_progress_count > 0 + assert todo_count > 0 + + def test_project_progress_calculation_performance(self, performance_timer): + """Test performance of project progress calculations.""" + # Arrange + projects = [] + for i in range(100): + milestones = [] + for j in range(20): # 20 milestones per project + milestone = (MilestoneBuilder() + .with_id(j + 1) + .with_title(f"Milestone {j + 1}") + .with_issue_counts( + open_issues=10 - (j % 8), + closed_issues=j % 12 + ) + .build()) + milestones.append(milestone) + + project = (ProjectBuilder() + .with_name(f"Performance Project {i + 1}") + .with_milestones(*milestones) + .build()) + projects.append(project) + + # Act + performance_timer.start() + progress_results = [] + for project in projects: + overall_progress = project.calculate_overall_progress() + active_milestones = project.get_active_milestones() + completed_milestones = project.get_completed_milestones() + total_issues = project.get_total_issues() + + progress_results.append({ + "overall_progress": overall_progress, + "active_count": len(active_milestones), + "completed_count": len(completed_milestones), + "total_issues": total_issues + }) + performance_timer.stop() + + # Assert + assert len(progress_results) == 100 + assert_performance_within_bounds(performance_timer.elapsed, 0.5, "calculating progress for 100 projects with 20 milestones each") + + # Verify calculations are reasonable + for result in progress_results: + assert 0 <= result["overall_progress"] <= 100 + assert result["total_issues"] > 0 + + def test_bulk_issue_validation_performance(self, performance_timer): + """Test performance of bulk issue validation.""" + # Arrange + validation_service = IssueValidationService() + issue_data_list = [] + + for i in range(2000): + issue_data = { + "title": f"Validation Test Issue {i + 1}" if i % 10 != 0 else "", # 10% invalid + "labels": ["bug", "priority:medium"] if i % 5 != 0 else ["bug", "priority:high", "priority:low"] # 20% invalid + } + issue_data_list.append(issue_data) + + # Act + performance_timer.start() + validation_results = [] + for issue_data in issue_data_list: + try: + validation_service.validate_issue_creation(issue_data) + validation_results.append(True) + except Exception: + validation_results.append(False) + performance_timer.stop() + + # Assert + assert len(validation_results) == 2000 + assert_performance_within_bounds(performance_timer.elapsed, 1.0, "validating 2000 issues") + + # Verify validation correctness + valid_count = sum(1 for result in validation_results if result) + invalid_count = sum(1 for result in validation_results if not result) + + # Expect about 70% valid (90% have valid titles AND 80% have valid labels = 72%) + assert 1200 <= valid_count <= 1600 # Allow some tolerance + assert 400 <= invalid_count <= 800 + + @pytest.mark.slow + def test_memory_usage_with_large_datasets(self, performance_timer): + """Test memory usage with large datasets.""" + import psutil + import os + + # Measure initial memory + process = psutil.Process(os.getpid()) + initial_memory_mb = process.memory_info().rss / (1024 * 1024) + + # Create large dataset + performance_timer.start() + large_issues = [] + for i in range(10000): + issue = (IssueBuilder() + .with_number(i + 1) + .with_title(f"Large Dataset Issue {i + 1}") + .with_labels("bug", "priority:medium", "status:new", "backend", "database") + .build()) + large_issues.append(issue) + + # Perform operations on dataset + for issue in large_issues: + categories = issue.categorize_labels() + # Simulate some processing + _ = len(categories.type_labels) + len(categories.priority_labels) + + performance_timer.stop() + + # Measure final memory + final_memory_mb = process.memory_info().rss / (1024 * 1024) + memory_increase_mb = final_memory_mb - initial_memory_mb + + # Force garbage collection and measure again + gc.collect() + gc_memory_mb = process.memory_info().rss / (1024 * 1024) + gc_reduction_mb = final_memory_mb - gc_memory_mb + + # Assert + assert len(large_issues) == 10000 + assert_performance_within_bounds(performance_timer.elapsed, 5.0, "processing 10,000 issues") + assert_memory_usage_within_bounds(memory_increase_mb, 50.0, "creating and processing 10,000 issues") + + print(f"Memory usage: Initial={initial_memory_mb:.2f}MB, Final={final_memory_mb:.2f}MB, " + f"Increase={memory_increase_mb:.2f}MB, GC Reduction={gc_reduction_mb:.2f}MB") + + # Memory should be reasonable for the dataset size + assert memory_increase_mb > 0 # Should use some memory + assert gc_reduction_mb >= 0 # GC should not increase memory + + @pytest.mark.performance + def test_concurrent_domain_operations_simulation(self, performance_timer): + """Simulate concurrent domain operations for performance testing.""" + # Arrange + project_service = ProjectManagementService() + projects = [] + + # Create test projects + for i in range(10): + milestones = [ + MilestoneBuilder().with_id(j + 1).with_title(f"M{j + 1}") + .with_issue_counts(5, 3).build() + for j in range(5) + ] + project = (ProjectBuilder() + .with_name(f"Concurrent Project {i + 1}") + .with_milestones(*milestones) + .build()) + projects.append(project) + + # Act - Simulate concurrent operations + performance_timer.start() + results = [] + + # Simulate multiple "users" performing operations + for iteration in range(100): + for project in projects: + # Simulate various operations + health_report = project_service.calculate_project_health(project) + progress = project.calculate_overall_progress() + active_milestones = project.get_active_milestones() + + results.append({ + "iteration": iteration, + "project_name": project.name, + "health_score": health_report.overall_health_score, + "progress": progress, + "active_milestones": len(active_milestones) + }) + + performance_timer.stop() + + # Assert + expected_operations = 100 * 10 # 100 iterations * 10 projects + assert len(results) == expected_operations + assert_performance_within_bounds(performance_timer.elapsed, 2.0, f"simulating {expected_operations} concurrent operations") + + # Verify result consistency + for result in results: + assert 0 <= result["health_score"] <= 100 + assert 0 <= result["progress"] <= 100 + assert result["active_milestones"] >= 0 + + ops_per_second = expected_operations / performance_timer.elapsed + print(f"Simulated {expected_operations} operations in {performance_timer.elapsed:.3f}s ({ops_per_second:.0f} ops/sec)") + + def test_domain_operation_consistency_under_load(self, performance_timer): + """Test that domain operations remain consistent under load.""" + # Arrange + reference_issue = (IssueBuilder() + .with_number(1) + .with_title("Reference Issue") + .with_labels("bug", "priority:high", "status:blocked") + .build()) + + # Get reference results + reference_categories = reference_issue.categorize_labels() + status_service = IssueStatusService() + reference_kanban = status_service.determine_kanban_column(reference_issue, {}) + + # Act - Perform same operations many times + performance_timer.start() + consistency_results = [] + + for i in range(5000): + # Create identical issue + test_issue = (IssueBuilder() + .with_number(1) + .with_title("Reference Issue") + .with_labels("bug", "priority:high", "status:blocked") + .build()) + + # Perform operations + categories = test_issue.categorize_labels() + kanban = status_service.determine_kanban_column(test_issue, {}) + + # Check consistency + categories_match = ( + categories.type_labels == reference_categories.type_labels and + categories.priority_labels == reference_categories.priority_labels and + categories.state_labels == reference_categories.state_labels + ) + kanban_matches = kanban == reference_kanban + + consistency_results.append(categories_match and kanban_matches) + + performance_timer.stop() + + # Assert + assert len(consistency_results) == 5000 + assert all(consistency_results), "All operations should produce consistent results" + assert_performance_within_bounds(performance_timer.elapsed, 1.5, "consistency test with 5000 operations") + + print(f"Consistency test: {len(consistency_results)} operations, all consistent, " + f"completed in {performance_timer.elapsed:.3f}s") \ No newline at end of file diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 00000000..585734f0 --- /dev/null +++ b/tests/fixtures/__init__.py @@ -0,0 +1,3 @@ +""" +Test fixtures and data builders for MarkiTect tests. +""" \ No newline at end of file diff --git a/tests/fixtures/api_responses.py b/tests/fixtures/api_responses.py new file mode 100644 index 00000000..bf37fd87 --- /dev/null +++ b/tests/fixtures/api_responses.py @@ -0,0 +1,332 @@ +""" +API response builders and mock data for testing external integrations. +""" + +from typing import Dict, List, Any, Optional +from datetime import datetime, timezone + + +class GiteaApiResponseBuilder: + """Builder for creating mock Gitea API responses.""" + + def __init__(self): + self.issue_data = { + "number": 1, + "title": "Test Issue", + "body": "Test issue description", + "state": "open", + "labels": [], + "milestone": None, + "assignees": [], + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "closed_at": None, + "html_url": "https://test-gitea.com/test/repo/issues/1", + "user": { + "login": "testuser", + "id": 1, + "avatar_url": "https://test-gitea.com/avatars/1" + } + } + + def with_number(self, number: int) -> "GiteaApiResponseBuilder": + """Set issue number.""" + self.issue_data["number"] = number + self.issue_data["html_url"] = f"https://test-gitea.com/test/repo/issues/{number}" + return self + + def with_title(self, title: str) -> "GiteaApiResponseBuilder": + """Set issue title.""" + self.issue_data["title"] = title + return self + + def with_body(self, body: str) -> "GiteaApiResponseBuilder": + """Set issue body/description.""" + self.issue_data["body"] = body + return self + + def with_state(self, state: str) -> "GiteaApiResponseBuilder": + """Set issue state (open/closed).""" + if state not in ["open", "closed"]: + raise ValueError("State must be 'open' or 'closed'") + self.issue_data["state"] = state + if state == "closed" and self.issue_data["closed_at"] is None: + self.issue_data["closed_at"] = "2025-01-02T00:00:00Z" + return self + + def with_labels(self, *labels: str) -> "GiteaApiResponseBuilder": + """Add labels to the issue.""" + self.issue_data["labels"] = [ + { + "id": i + 1, + "name": label, + "color": "red", + "description": f"Label: {label}" + } + for i, label in enumerate(labels) + ] + return self + + def with_milestone(self, title: str, id: int = 1, state: str = "open") -> "GiteaApiResponseBuilder": + """Add milestone to the issue.""" + self.issue_data["milestone"] = { + "id": id, + "title": title, + "description": f"Milestone: {title}", + "state": state, + "open_issues": 5, + "closed_issues": 3, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "due_date": "2025-12-31T23:59:59Z" + } + return self + + def with_assignees(self, *usernames: str) -> "GiteaApiResponseBuilder": + """Add assignees to the issue.""" + self.issue_data["assignees"] = [ + { + "login": username, + "id": i + 1, + "avatar_url": f"https://test-gitea.com/avatars/{i + 1}" + } + for i, username in enumerate(usernames) + ] + return self + + def with_timestamps(self, created_at: str, updated_at: str, closed_at: Optional[str] = None) -> "GiteaApiResponseBuilder": + """Set issue timestamps.""" + self.issue_data["created_at"] = created_at + self.issue_data["updated_at"] = updated_at + if closed_at: + self.issue_data["closed_at"] = closed_at + return self + + def build(self) -> Dict[str, Any]: + """Build the final issue data.""" + return self.issue_data.copy() + + +class GiteaProjectResponseBuilder: + """Builder for creating mock Gitea project/repository responses.""" + + def __init__(self): + self.project_data = { + "id": 1, + "name": "test-repo", + "full_name": "test/test-repo", + "description": "Test repository", + "private": False, + "fork": False, + "html_url": "https://test-gitea.com/test/test-repo", + "clone_url": "https://test-gitea.com/test/test-repo.git", + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "owner": { + "login": "test", + "id": 1, + "avatar_url": "https://test-gitea.com/avatars/1" + }, + "permissions": { + "admin": True, + "push": True, + "pull": True + }, + "open_issues_count": 5, + "stargazers_count": 10, + "watchers_count": 3, + "forks_count": 2, + "size": 1024, + "default_branch": "main", + "archived": False, + "disabled": False + } + + def with_name(self, name: str, owner: str = "test") -> "GiteaProjectResponseBuilder": + """Set repository name and owner.""" + self.project_data["name"] = name + self.project_data["full_name"] = f"{owner}/{name}" + self.project_data["html_url"] = f"https://test-gitea.com/{owner}/{name}" + self.project_data["clone_url"] = f"https://test-gitea.com/{owner}/{name}.git" + self.project_data["owner"]["login"] = owner + return self + + def with_description(self, description: str) -> "GiteaProjectResponseBuilder": + """Set repository description.""" + self.project_data["description"] = description + return self + + def with_visibility(self, private: bool) -> "GiteaProjectResponseBuilder": + """Set repository visibility.""" + self.project_data["private"] = private + return self + + def with_stats(self, open_issues: int = 5, stars: int = 10, watchers: int = 3, forks: int = 2) -> "GiteaProjectResponseBuilder": + """Set repository statistics.""" + self.project_data["open_issues_count"] = open_issues + self.project_data["stargazers_count"] = stars + self.project_data["watchers_count"] = watchers + self.project_data["forks_count"] = forks + return self + + def with_permissions(self, admin: bool = True, push: bool = True, pull: bool = True) -> "GiteaProjectResponseBuilder": + """Set user permissions.""" + self.project_data["permissions"] = { + "admin": admin, + "push": push, + "pull": pull + } + return self + + def build(self) -> Dict[str, Any]: + """Build the final project data.""" + return self.project_data.copy() + + +class GiteaMilestoneResponseBuilder: + """Builder for creating mock Gitea milestone responses.""" + + def __init__(self): + self.milestone_data = { + "id": 1, + "title": "Version 1.0", + "description": "First release milestone", + "state": "open", + "open_issues": 5, + "closed_issues": 3, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "due_date": "2025-12-31T23:59:59Z" + } + + def with_id(self, id: int) -> "GiteaMilestoneResponseBuilder": + """Set milestone ID.""" + self.milestone_data["id"] = id + return self + + def with_title(self, title: str) -> "GiteaMilestoneResponseBuilder": + """Set milestone title.""" + self.milestone_data["title"] = title + return self + + def with_description(self, description: str) -> "GiteaMilestoneResponseBuilder": + """Set milestone description.""" + self.milestone_data["description"] = description + return self + + def with_state(self, state: str) -> "GiteaMilestoneResponseBuilder": + """Set milestone state.""" + if state not in ["open", "closed"]: + raise ValueError("State must be 'open' or 'closed'") + self.milestone_data["state"] = state + return self + + def with_issue_counts(self, open_issues: int, closed_issues: int) -> "GiteaMilestoneResponseBuilder": + """Set issue counts.""" + self.milestone_data["open_issues"] = open_issues + self.milestone_data["closed_issues"] = closed_issues + return self + + def with_due_date(self, due_date: str) -> "GiteaMilestoneResponseBuilder": + """Set milestone due date.""" + self.milestone_data["due_date"] = due_date + return self + + def build(self) -> Dict[str, Any]: + """Build the final milestone data.""" + return self.milestone_data.copy() + + +# Pre-built common responses +SAMPLE_ISSUE_RESPONSE = ( + GiteaApiResponseBuilder() + .with_number(123) + .with_title("Sample Issue") + .with_body("This is a sample issue for testing") + .with_labels("bug", "priority:high", "status:in-progress") + .with_milestone("Version 1.0") + .with_assignees("testuser") + .build() +) + +SAMPLE_PROJECT_RESPONSE = ( + GiteaProjectResponseBuilder() + .with_name("sample-project", "testorg") + .with_description("A sample project for testing") + .with_stats(open_issues=10, stars=25, watchers=8, forks=3) + .build() +) + +SAMPLE_MILESTONE_RESPONSE = ( + GiteaMilestoneResponseBuilder() + .with_title("Version 2.0") + .with_description("Second major release") + .with_issue_counts(8, 12) + .with_due_date("2025-06-30T23:59:59Z") + .build() +) + +# Error responses +ERROR_RESPONSES = { + "not_found": { + "message": "404 Not Found", + "documentation_url": "https://docs.gitea.io/en-us/api-usage/" + }, + "unauthorized": { + "message": "401 Unauthorized", + "documentation_url": "https://docs.gitea.io/en-us/api-usage/" + }, + "forbidden": { + "message": "403 Forbidden", + "documentation_url": "https://docs.gitea.io/en-us/api-usage/" + }, + "validation_failed": { + "message": "Validation Failed", + "errors": [ + { + "resource": "Issue", + "field": "title", + "code": "missing_field" + } + ] + }, + "rate_limit": { + "message": "API rate limit exceeded", + "documentation_url": "https://docs.gitea.io/en-us/api-usage/" + } +} + + +def get_paginated_response(items: List[Dict[str, Any]], page: int = 1, per_page: int = 30) -> Dict[str, Any]: + """Create a paginated response wrapper.""" + start_idx = (page - 1) * per_page + end_idx = start_idx + per_page + page_items = items[start_idx:end_idx] + + return { + "data": page_items, + "pagination": { + "page": page, + "per_page": per_page, + "total": len(items), + "total_pages": (len(items) + per_page - 1) // per_page, + "has_next": end_idx < len(items), + "has_prev": page > 1 + } + } + + +def create_bulk_issues(count: int, base_number: int = 1) -> List[Dict[str, Any]]: + """Create a list of test issues for bulk operations.""" + issues = [] + for i in range(count): + issue = ( + GiteaApiResponseBuilder() + .with_number(base_number + i) + .with_title(f"Test Issue {base_number + i}") + .with_body(f"Description for test issue {base_number + i}") + .with_labels("test") + .build() + ) + issues.append(issue) + return issues \ No newline at end of file diff --git a/tests/fixtures/markdown_samples.py b/tests/fixtures/markdown_samples.py new file mode 100644 index 00000000..f2d31370 --- /dev/null +++ b/tests/fixtures/markdown_samples.py @@ -0,0 +1,302 @@ +""" +Markdown document builders and sample generators for testing. +""" + +from typing import Dict, List, Optional +import random +import string + + +class MarkdownDocumentBuilder: + """Builder pattern for creating test markdown documents.""" + + def __init__(self): + self.content_parts: List[str] = [] + self.metadata: Dict[str, str] = {} + + def with_heading(self, text: str, level: int = 1) -> "MarkdownDocumentBuilder": + """Add a heading to the document.""" + if level < 1 or level > 6: + raise ValueError("Heading level must be between 1 and 6") + + heading_marker = "#" * level + self.content_parts.append(f"{heading_marker} {text}") + return self + + def with_paragraph(self, text: str) -> "MarkdownDocumentBuilder": + """Add a paragraph to the document.""" + self.content_parts.append(text) + return self + + def with_list(self, items: List[str], ordered: bool = False) -> "MarkdownDocumentBuilder": + """Add a list to the document.""" + if ordered: + list_items = [f"{i+1}. {item}" for i, item in enumerate(items)] + else: + list_items = [f"- {item}" for item in items] + + self.content_parts.append("\n".join(list_items)) + return self + + def with_code_block(self, code: str, language: str = "python") -> "MarkdownDocumentBuilder": + """Add a code block to the document.""" + self.content_parts.append(f"```{language}\n{code}\n```") + return self + + def with_link(self, text: str, url: str) -> "MarkdownDocumentBuilder": + """Add a link to the document.""" + self.content_parts.append(f"[{text}]({url})") + return self + + def with_metadata(self, key: str, value: str) -> "MarkdownDocumentBuilder": + """Add metadata (front matter) to the document.""" + self.metadata[key] = value + return self + + def with_table(self, headers: List[str], rows: List[List[str]]) -> "MarkdownDocumentBuilder": + """Add a table to the document.""" + table_lines = [] + + # Header row + table_lines.append("| " + " | ".join(headers) + " |") + + # Separator row + table_lines.append("| " + " | ".join(["-" * len(header) for header in headers]) + " |") + + # Data rows + for row in rows: + table_lines.append("| " + " | ".join(row) + " |") + + self.content_parts.append("\n".join(table_lines)) + return self + + def with_blockquote(self, text: str) -> "MarkdownDocumentBuilder": + """Add a blockquote to the document.""" + quote_lines = [f"> {line}" for line in text.split("\n")] + self.content_parts.append("\n".join(quote_lines)) + return self + + def build(self) -> str: + """Build the final markdown document.""" + content = "\n\n".join(self.content_parts) + + if self.metadata: + metadata_lines = [f"{k}: {v}" for k, v in self.metadata.items()] + content = "---\n" + "\n".join(metadata_lines) + "\n---\n\n" + content + + return content + + +class LargeMarkdownGenerator: + """Generator for creating large markdown documents for performance testing.""" + + def __init__(self, seed: Optional[int] = None): + self.random = random.Random(seed) + + def generate_document(self, size: str = "1mb") -> str: + """Generate a large markdown document of specified size.""" + size_bytes = self._parse_size(size) + builder = MarkdownDocumentBuilder() + + # Add metadata + builder.with_metadata("title", "Large Test Document") + builder.with_metadata("author", "Test Generator") + builder.with_metadata("size", size) + + # Add content until we reach target size + current_size = 0 + section_count = 0 + + while current_size < size_bytes: + section_count += 1 + section_title = f"Section {section_count}" + builder.with_heading(section_title, level=2) + + # Add paragraphs + for _ in range(self.random.randint(3, 8)): + paragraph = self._generate_paragraph() + builder.with_paragraph(paragraph) + current_size += len(paragraph) + 2 # +2 for newlines + + if current_size >= size_bytes: + break + + # Add a list occasionally + if self.random.random() < 0.3: + items = [self._generate_sentence() for _ in range(self.random.randint(3, 7))] + builder.with_list(items) + current_size += sum(len(item) for item in items) + len(items) * 3 # Approximate + + # Add a code block occasionally + if self.random.random() < 0.2: + code = self._generate_code_block() + builder.with_code_block(code) + current_size += len(code) + 10 # +10 for code block markers + + return builder.build() + + def _parse_size(self, size: str) -> int: + """Parse size string (e.g., '1mb', '500kb') to bytes.""" + size = size.lower() + if size.endswith("kb"): + return int(size[:-2]) * 1024 + elif size.endswith("mb"): + return int(size[:-2]) * 1024 * 1024 + elif size.endswith("gb"): + return int(size[:-2]) * 1024 * 1024 * 1024 + else: + return int(size) + + def _generate_paragraph(self) -> str: + """Generate a paragraph of random text.""" + sentences = [] + for _ in range(self.random.randint(3, 8)): + sentences.append(self._generate_sentence()) + return " ".join(sentences) + + def _generate_sentence(self) -> str: + """Generate a random sentence.""" + words = [] + for _ in range(self.random.randint(5, 15)): + words.append(self._generate_word()) + + sentence = " ".join(words).capitalize() + return sentence + "." + + def _generate_word(self) -> str: + """Generate a random word.""" + length = self.random.randint(3, 12) + return "".join(self.random.choices(string.ascii_lowercase, k=length)) + + def _generate_code_block(self) -> str: + """Generate a random code block.""" + lines = [] + for _ in range(self.random.randint(5, 15)): + line = self._generate_code_line() + lines.append(line) + return "\n".join(lines) + + def _generate_code_line(self) -> str: + """Generate a line of code-like text.""" + templates = [ + "def {func_name}({params}):", + " return {expression}", + "if {condition}:", + " {statement}", + "# {comment}", + "class {class_name}:", + " self.{attr} = {value}", + "import {module}", + "from {module} import {name}", + ] + + template = self.random.choice(templates) + variables = { + "func_name": self._generate_word(), + "params": ", ".join([self._generate_word() for _ in range(self.random.randint(0, 3))]), + "expression": f"{self._generate_word()}({self._generate_word()})", + "condition": f"{self._generate_word()} == {self.random.randint(1, 100)}", + "statement": f"{self._generate_word()} = {self.random.randint(1, 100)}", + "comment": " ".join([self._generate_word() for _ in range(self.random.randint(2, 6))]), + "class_name": self._generate_word().capitalize(), + "attr": self._generate_word(), + "value": str(self.random.randint(1, 100)), + "module": self._generate_word(), + "name": self._generate_word(), + } + + return template.format(**variables) + + +# Pre-built sample documents +SAMPLE_SIMPLE_DOCUMENT = """# Simple Document + +This is a simple test document. + +## Features + +- Feature 1 +- Feature 2 +- Feature 3 +""" + +SAMPLE_COMPLEX_DOCUMENT = ( + MarkdownDocumentBuilder() + .with_metadata("title", "Complex Test Document") + .with_metadata("author", "Test Suite") + .with_metadata("tags", "test, complex, sample") + .with_heading("Complex Test Document") + .with_paragraph("This is a complex test document with various markdown features.") + .with_heading("Table of Contents", level=2) + .with_list([ + "Introduction", + "Features", + "Examples", + "Conclusion" + ], ordered=True) + .with_heading("Introduction", level=2) + .with_paragraph("This document demonstrates various markdown features.") + .with_blockquote("This is an important note about the document.") + .with_heading("Features", level=2) + .with_list([ + "**Bold text**", + "*Italic text*", + "`Code inline`", + "[Links](https://example.com)" + ]) + .with_heading("Code Example", level=3) + .with_code_block('''def hello_world(): + """Print hello world message.""" + print("Hello, World!") + return "success"''') + .with_heading("Data Table", level=3) + .with_table( + ["Name", "Type", "Description"], + [ + ["title", "string", "Document title"], + ["author", "string", "Document author"], + ["tags", "array", "Document tags"] + ] + ) + .with_heading("Conclusion", level=2) + .with_paragraph("This document shows the power of markdown for documentation.") + .build() +) + +SAMPLE_TECHNICAL_DOCUMENT = ( + MarkdownDocumentBuilder() + .with_metadata("title", "API Documentation") + .with_metadata("version", "1.0.0") + .with_metadata("category", "technical") + .with_heading("API Documentation") + .with_paragraph("This document describes the REST API endpoints.") + .with_heading("Authentication", level=2) + .with_paragraph("All API requests require authentication via API key.") + .with_code_block('''curl -H "Authorization: Bearer YOUR_API_KEY" \\ + https://api.example.com/v1/endpoint''', "bash") + .with_heading("Endpoints", level=2) + .with_heading("GET /users", level=3) + .with_paragraph("Retrieve a list of users.") + .with_table( + ["Parameter", "Type", "Required", "Description"], + [ + ["limit", "integer", "No", "Maximum number of results"], + ["offset", "integer", "No", "Number of results to skip"], + ["filter", "string", "No", "Filter criteria"] + ] + ) + .with_heading("Response", level=4) + .with_code_block('''{ + "users": [ + { + "id": 1, + "name": "John Doe", + "email": "john@example.com" + } + ], + "total": 1, + "offset": 0, + "limit": 10 +}''', "json") + .build() +) \ No newline at end of file diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..4bdf3947 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,3 @@ +""" +Integration tests for MarkiTect components. +""" \ No newline at end of file diff --git a/tests/integration/repositories/__init__.py b/tests/integration/repositories/__init__.py new file mode 100644 index 00000000..6a746063 --- /dev/null +++ b/tests/integration/repositories/__init__.py @@ -0,0 +1,3 @@ +""" +Integration tests for repository implementations. +""" \ No newline at end of file diff --git a/tests/integration/repositories/test_document_repository_integration.py b/tests/integration/repositories/test_document_repository_integration.py new file mode 100644 index 00000000..9a9d170b --- /dev/null +++ b/tests/integration/repositories/test_document_repository_integration.py @@ -0,0 +1,487 @@ +""" +Integration tests for document repository with real database. + +Demonstrates: +- Real database integration testing +- Transaction testing +- Performance validation +- Error scenario handling +""" + +import pytest +import sqlite3 +import asyncio +from pathlib import Path +from datetime import datetime, timezone +import tempfile +import shutil + +from tests.fixtures.markdown_samples import MarkdownDocumentBuilder, SAMPLE_COMPLEX_DOCUMENT +from tests.utils.assertions import assert_file_exists, assert_performance_within_bounds + + +class MockDocument: + """Mock document model for testing.""" + + def __init__(self, filename: str, content: str, ast_data: dict = None): + self.filename = filename + self.content = content + self.ast_data = ast_data or {} + self.created_at = datetime.now(timezone.utc) + self.updated_at = datetime.now(timezone.utc) + + +class MockDocumentRepository: + """Mock document repository that simulates real database operations.""" + + def __init__(self, db_path: Path): + self.db_path = db_path + self._init_database() + + def _init_database(self): + """Initialize database schema.""" + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS documents ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + filename TEXT UNIQUE NOT NULL, + content TEXT NOT NULL, + ast_data TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_documents_filename + ON documents(filename) + """) + + conn.commit() + conn.close() + + async def store_document(self, document: MockDocument) -> int: + """Store a document in the database.""" + await asyncio.sleep(0.001) # Simulate async database operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute(""" + INSERT INTO documents (filename, content, ast_data, created_at, updated_at) + VALUES (?, ?, ?, ?, ?) + """, ( + document.filename, + document.content, + str(document.ast_data), + document.created_at.isoformat(), + document.updated_at.isoformat() + )) + + document_id = cursor.lastrowid + conn.commit() + return document_id + + except sqlite3.IntegrityError as e: + conn.rollback() + raise ValueError(f"Document with filename '{document.filename}' already exists") from e + finally: + conn.close() + + async def get_document(self, document_id: int) -> MockDocument: + """Retrieve a document by ID.""" + await asyncio.sleep(0.001) # Simulate async database operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute(""" + SELECT filename, content, ast_data, created_at, updated_at + FROM documents WHERE id = ? + """, (document_id,)) + + row = cursor.fetchone() + if not row: + raise ValueError(f"Document with ID {document_id} not found") + + filename, content, ast_data, created_at, updated_at = row + document = MockDocument(filename, content, eval(ast_data) if ast_data else {}) + document.created_at = datetime.fromisoformat(created_at) + document.updated_at = datetime.fromisoformat(updated_at) + + return document + + finally: + conn.close() + + async def update_document(self, document_id: int, content: str, ast_data: dict) -> None: + """Update document content and AST data.""" + await asyncio.sleep(0.001) # Simulate async database operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute(""" + UPDATE documents + SET content = ?, ast_data = ?, updated_at = ? + WHERE id = ? + """, ( + content, + str(ast_data), + datetime.now(timezone.utc).isoformat(), + document_id + )) + + if cursor.rowcount == 0: + raise ValueError(f"Document with ID {document_id} not found") + + conn.commit() + + finally: + conn.close() + + async def delete_document(self, document_id: int) -> None: + """Delete a document.""" + await asyncio.sleep(0.001) # Simulate async database operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute("DELETE FROM documents WHERE id = ?", (document_id,)) + + if cursor.rowcount == 0: + raise ValueError(f"Document with ID {document_id} not found") + + conn.commit() + + finally: + conn.close() + + async def list_all_documents(self): + """List all documents.""" + await asyncio.sleep(0.001) # Simulate async database operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute(""" + SELECT id, filename, created_at, updated_at + FROM documents ORDER BY created_at DESC + """) + + rows = cursor.fetchall() + return [ + { + "id": row[0], + "filename": row[1], + "created_at": row[2], + "updated_at": row[3] + } + for row in rows + ] + + finally: + conn.close() + + async def search_content(self, search_term: str): + """Search documents by content.""" + await asyncio.sleep(0.005) # Simulate more expensive search operation + + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + try: + cursor.execute(""" + SELECT id, filename, content + FROM documents + WHERE content LIKE ? + ORDER BY filename + """, (f"%{search_term}%",)) + + rows = cursor.fetchall() + return [ + { + "id": row[0], + "filename": row[1], + "content": row[2] + } + for row in rows + ] + + finally: + conn.close() + + def close(self): + """Close repository (cleanup).""" + pass + + +@pytest.fixture +def test_db_path(test_workspace): + """Provide test database path.""" + return test_workspace / "integration_test.db" + + +@pytest.fixture +async def document_repository(test_db_path): + """Provide document repository with real database.""" + repo = MockDocumentRepository(test_db_path) + yield repo + repo.close() + + +@pytest.mark.integration +class TestDocumentRepositoryIntegration: + """Integration tests for document repository with real database.""" + + @pytest.mark.asyncio + async def test_store_and_retrieve_document(self, document_repository, test_db_path): + """Test storing and retrieving a document.""" + # Arrange + assert_file_exists(test_db_path) + + document = MockDocument( + filename="test.md", + content="# Test Document\nThis is a test.", + ast_data={"type": "document", "children": []} + ) + + # Act + document_id = await document_repository.store_document(document) + retrieved = await document_repository.get_document(document_id) + + # Assert + assert isinstance(document_id, int) + assert document_id > 0 + assert retrieved.filename == "test.md" + assert retrieved.content == "# Test Document\nThis is a test." + assert retrieved.ast_data["type"] == "document" + + @pytest.mark.asyncio + async def test_store_duplicate_filename_raises_error(self, document_repository): + """Test that storing duplicate filename raises error.""" + # Arrange + document1 = MockDocument("duplicate.md", "Content 1") + document2 = MockDocument("duplicate.md", "Content 2") + + # Act + await document_repository.store_document(document1) + + # Assert + with pytest.raises(ValueError, match="already exists"): + await document_repository.store_document(document2) + + @pytest.mark.asyncio + async def test_update_document_content(self, document_repository): + """Test updating document content and AST.""" + # Arrange + document = MockDocument("update.md", "Original content") + document_id = await document_repository.store_document(document) + + # Act + new_content = "Updated content" + new_ast = {"type": "document", "updated": True} + await document_repository.update_document(document_id, new_content, new_ast) + + # Verify + updated = await document_repository.get_document(document_id) + assert updated.content == "Updated content" + assert updated.ast_data["updated"] is True + + @pytest.mark.asyncio + async def test_delete_document(self, document_repository): + """Test deleting a document.""" + # Arrange + document = MockDocument("delete.md", "To be deleted") + document_id = await document_repository.store_document(document) + + # Verify document exists + retrieved = await document_repository.get_document(document_id) + assert retrieved.filename == "delete.md" + + # Act + await document_repository.delete_document(document_id) + + # Assert + with pytest.raises(ValueError, match="not found"): + await document_repository.get_document(document_id) + + @pytest.mark.asyncio + async def test_list_all_documents(self, document_repository): + """Test listing all documents.""" + # Arrange - Store multiple documents + documents = [ + MockDocument("doc1.md", "Content 1"), + MockDocument("doc2.md", "Content 2"), + MockDocument("doc3.md", "Content 3") + ] + + for doc in documents: + await document_repository.store_document(doc) + + # Act + all_docs = await document_repository.list_all_documents() + + # Assert + assert len(all_docs) == 3 + filenames = {doc["filename"] for doc in all_docs} + expected_filenames = {"doc1.md", "doc2.md", "doc3.md"} + assert filenames == expected_filenames + + @pytest.mark.asyncio + async def test_search_content(self, document_repository): + """Test content search functionality.""" + # Arrange + documents = [ + MockDocument("api.md", "API documentation for REST endpoints"), + MockDocument("guide.md", "User guide for getting started"), + MockDocument("readme.md", "Project README with API examples") + ] + + for doc in documents: + await document_repository.store_document(doc) + + # Act + api_results = await document_repository.search_content("API") + guide_results = await document_repository.search_content("guide") + + # Assert + assert len(api_results) == 2 # api.md and readme.md + api_filenames = {result["filename"] for result in api_results} + assert api_filenames == {"api.md", "readme.md"} + + assert len(guide_results) == 1 # guide.md only + assert guide_results[0]["filename"] == "guide.md" + + @pytest.mark.asyncio + async def test_bulk_operations_performance(self, document_repository, performance_timer): + """Test performance of bulk operations.""" + # Arrange + documents = [] + for i in range(50): + content = (MarkdownDocumentBuilder() + .with_heading(f"Document {i}") + .with_paragraph(f"Content for document {i}") + .build()) + documents.append(MockDocument(f"bulk_{i}.md", content)) + + # Act - Bulk storage + performance_timer.start() + document_ids = [] + for doc in documents: + doc_id = await document_repository.store_document(doc) + document_ids.append(doc_id) + performance_timer.stop() + + # Assert + assert len(document_ids) == 50 + assert_performance_within_bounds(performance_timer.elapsed, 5.0, "bulk document storage") + + # Act - Bulk retrieval + performance_timer.start() + retrieved_docs = [] + for doc_id in document_ids: + doc = await document_repository.get_document(doc_id) + retrieved_docs.append(doc) + performance_timer.stop() + + # Assert + assert len(retrieved_docs) == 50 + assert_performance_within_bounds(performance_timer.elapsed, 3.0, "bulk document retrieval") + + @pytest.mark.asyncio + async def test_concurrent_operations(self, document_repository): + """Test concurrent database operations.""" + # Arrange + async def store_document(index): + content = f"# Document {index}\nContent for document {index}" + doc = MockDocument(f"concurrent_{index}.md", content) + return await document_repository.store_document(doc) + + # Act - Concurrent storage + tasks = [store_document(i) for i in range(20)] + document_ids = await asyncio.gather(*tasks) + + # Assert + assert len(document_ids) == 20 + assert len(set(document_ids)) == 20 # All IDs should be unique + + # Verify all documents are accessible + all_docs = await document_repository.list_all_documents() + assert len(all_docs) == 20 + + @pytest.mark.asyncio + async def test_transaction_like_behavior(self, document_repository): + """Test error handling doesn't leave database in inconsistent state.""" + # Arrange - Store initial document + doc1 = MockDocument("initial.md", "Initial content") + doc_id = await document_repository.store_document(doc1) + + # Act - Try to update with invalid ID (should fail) + with pytest.raises(ValueError, match="not found"): + await document_repository.update_document(99999, "Invalid update", {}) + + # Assert - Original document should be unchanged + retrieved = await document_repository.get_document(doc_id) + assert retrieved.content == "Initial content" + + @pytest.mark.asyncio + async def test_large_document_handling(self, document_repository, performance_timer): + """Test handling of large documents.""" + # Arrange - Create large document content + from tests.fixtures.markdown_samples import LargeMarkdownGenerator + generator = LargeMarkdownGenerator(seed=42) + large_content = generator.generate_document(size="100kb") + + document = MockDocument("large.md", large_content) + + # Act + performance_timer.start() + document_id = await document_repository.store_document(document) + retrieved = await document_repository.get_document(document_id) + performance_timer.stop() + + # Assert + assert document_id > 0 + assert len(retrieved.content) > 100000 # At least 100KB + assert retrieved.content == large_content + assert_performance_within_bounds(performance_timer.elapsed, 1.0, "large document operations") + + @pytest.mark.asyncio + @pytest.mark.slow + async def test_search_performance_with_large_dataset(self, document_repository, performance_timer): + """Test search performance with large dataset.""" + # Arrange - Create many documents with searchable content + search_terms = ["API", "database", "testing", "performance", "integration"] + + documents = [] + for i in range(100): + term = search_terms[i % len(search_terms)] + content = (MarkdownDocumentBuilder() + .with_heading(f"Document {i}") + .with_paragraph(f"This document covers {term} functionality in detail.") + .with_paragraph("Additional content for search testing.") + .build()) + documents.append(MockDocument(f"search_{i}.md", content)) + + # Store all documents + for doc in documents: + await document_repository.store_document(doc) + + # Act - Perform searches + performance_timer.start() + api_results = await document_repository.search_content("API") + database_results = await document_repository.search_content("database") + performance_timer.stop() + + # Assert + assert len(api_results) >= 20 # Should find multiple documents + assert len(database_results) >= 20 + assert_performance_within_bounds(performance_timer.elapsed, 2.0, "search operations") \ No newline at end of file diff --git a/tests/requirements-test.txt b/tests/requirements-test.txt new file mode 100644 index 00000000..fcc1ce98 --- /dev/null +++ b/tests/requirements-test.txt @@ -0,0 +1,45 @@ +# Testing framework dependencies +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +pytest-cov>=4.1.0 +pytest-mock>=3.11.0 +pytest-xdist>=3.3.0 +pytest-timeout>=2.1.0 +pytest-benchmark>=4.0.0 + +# Property-based testing +hypothesis>=6.82.0 + +# HTTP mocking +aioresponses>=0.7.4 +responses>=0.23.0 + +# Contract testing +pact-python>=2.0.0 + +# Mutation testing +mutmut>=2.4.0 + +# Test data generation +factory-boy>=3.3.0 +faker>=19.0.0 + +# Performance monitoring +psutil>=5.9.0 + +# Code quality +flake8>=6.0.0 +black>=23.0.0 +isort>=5.12.0 +mypy>=1.4.0 + +# Test reporting +pytest-html>=3.2.0 +coverage[toml]>=7.2.0 + +# Database testing +pytest-postgresql>=5.0.0 +pytest-sqlite>=0.5.0 + +# Async utilities +anyio>=3.7.0 \ No newline at end of file diff --git a/tests/unit/application/__init__.py b/tests/unit/application/__init__.py new file mode 100644 index 00000000..ff4bf34c --- /dev/null +++ b/tests/unit/application/__init__.py @@ -0,0 +1,3 @@ +""" +Unit tests for application services layer. +""" \ No newline at end of file diff --git a/tests/unit/application/test_issue_application_service.py b/tests/unit/application/test_issue_application_service.py new file mode 100644 index 00000000..50ac1d5e --- /dev/null +++ b/tests/unit/application/test_issue_application_service.py @@ -0,0 +1,410 @@ +""" +Unit tests for issue application service with enhanced testing patterns. + +Demonstrates: +- Mock-based testing with proper isolation +- Error handling scenarios +- Business logic validation +- Performance expectations +""" + +import pytest +from unittest.mock import AsyncMock, Mock +from datetime import datetime, timezone, timedelta + +from domain.issues.models import Issue, Label, IssueState +from domain.issues.exceptions import IssueNotFoundError, IssueValidationError +from tests.utils.test_builders import IssueBuilder, LabelBuilder +from tests.utils.mock_factories import MockRepositoryFactory +from tests.utils.assertions import assert_issue_equal, assert_performance_within_bounds + + +class MockIssueApplicationService: + """Mock application service for testing (simulating the future implementation).""" + + def __init__(self, issue_repository, project_repository, status_service, validation_service): + self.issue_repository = issue_repository + self.project_repository = project_repository + self.status_service = status_service + self.validation_service = validation_service + + async def get_issue_details(self, issue_number: int): + """Get detailed issue information with business logic applied.""" + # Repository call + issue = await self.issue_repository.get_issue(issue_number) + if not issue: + raise IssueNotFoundError(f"Issue {issue_number} not found") + + # Project context + project_info = await self.project_repository.get_issue_project_info(issue_number) + + # Business logic application + kanban_column = self.status_service.determine_kanban_column(issue, project_info) + priority_info = self.status_service.extract_priority_info(issue) + + return { + "issue": issue, + "kanban_column": kanban_column, + "priority_info": priority_info, + "project_context": project_info + } + + async def create_issue(self, title: str, labels: list = None, milestone_id: int = None): + """Create a new issue with validation.""" + # Validate input + self.validation_service.validate_issue_creation({ + "title": title, + "labels": labels or [] + }) + + # Create issue + issue_data = { + "title": title, + "state": IssueState.OPEN, + "labels": [Label(name) for name in (labels or [])], + "created_at": datetime.now(timezone.utc), + "updated_at": datetime.now(timezone.utc) + } + + return await self.issue_repository.create_issue(issue_data) + + async def update_issue_status(self, issue_number: int, new_status: str): + """Update issue status with business rules.""" + issue = await self.issue_repository.get_issue(issue_number) + if not issue: + raise IssueNotFoundError(f"Issue {issue_number} not found") + + # Apply status change business logic + if new_status == "closed": + issue.close() + elif new_status == "reopened" and issue.state == IssueState.CLOSED: + issue.reopen() + + return await self.issue_repository.update_issue(issue) + + +@pytest.fixture +def mock_issue_repository(): + """Provide mock issue repository.""" + return MockRepositoryFactory.create_issue_repository() + + +@pytest.fixture +def mock_project_repository(): + """Provide mock project repository.""" + return MockRepositoryFactory.create_project_repository() + + +@pytest.fixture +def mock_status_service(): + """Provide mock status service.""" + service = Mock() + service.determine_kanban_column = Mock(return_value="Todo") + service.extract_priority_info = Mock(return_value={"level": "Medium", "label": None}) + return service + + +@pytest.fixture +def mock_validation_service(): + """Provide mock validation service.""" + service = Mock() + service.validate_issue_creation = Mock() + return service + + +@pytest.fixture +def application_service(mock_issue_repository, mock_project_repository, mock_status_service, mock_validation_service): + """Provide application service with mocked dependencies.""" + return MockIssueApplicationService( + mock_issue_repository, + mock_project_repository, + mock_status_service, + mock_validation_service + ) + + +class TestIssueApplicationService: + """Test issue application service coordination logic.""" + + @pytest.mark.asyncio + async def test_get_issue_details_success(self, application_service, mock_issue_repository, mock_project_repository, mock_status_service): + """Test successful issue details retrieval.""" + # Arrange + issue = (IssueBuilder() + .with_number(123) + .with_title("Test Issue") + .with_labels("bug", "priority:high") + .build()) + + project_info = {"kanban_columns": ["Todo", "In Progress", "Done"]} + + mock_issue_repository.get_issue.return_value = issue + mock_project_repository.get_issue_project_info.return_value = project_info + mock_status_service.determine_kanban_column.return_value = "Todo" + mock_status_service.extract_priority_info.return_value = {"level": "High", "label": "priority:high"} + + # Act + result = await application_service.get_issue_details(123) + + # Assert + assert result["issue"] == issue + assert result["kanban_column"] == "Todo" + assert result["priority_info"]["level"] == "High" + assert result["project_context"] == project_info + + # Verify repository calls + mock_issue_repository.get_issue.assert_called_once_with(123) + mock_project_repository.get_issue_project_info.assert_called_once_with(123) + + # Verify business logic calls + mock_status_service.determine_kanban_column.assert_called_once_with(issue, project_info) + mock_status_service.extract_priority_info.assert_called_once_with(issue) + + @pytest.mark.asyncio + async def test_get_issue_details_issue_not_found(self, application_service, mock_issue_repository): + """Test handling of non-existent issue.""" + # Arrange + mock_issue_repository.get_issue.return_value = None + + # Act & Assert + with pytest.raises(IssueNotFoundError, match="Issue 999 not found"): + await application_service.get_issue_details(999) + + mock_issue_repository.get_issue.assert_called_once_with(999) + + @pytest.mark.asyncio + async def test_get_issue_details_repository_error(self, application_service, mock_issue_repository): + """Test handling of repository errors.""" + # Arrange + mock_issue_repository.get_issue.side_effect = Exception("Database connection failed") + + # Act & Assert + with pytest.raises(Exception, match="Database connection failed"): + await application_service.get_issue_details(123) + + @pytest.mark.asyncio + async def test_create_issue_success(self, application_service, mock_issue_repository, mock_validation_service): + """Test successful issue creation.""" + # Arrange + created_issue = (IssueBuilder() + .with_number(456) + .with_title("New Issue") + .with_labels("enhancement") + .build()) + + mock_issue_repository.create_issue.return_value = created_issue + + # Act + result = await application_service.create_issue( + title="New Issue", + labels=["enhancement"] + ) + + # Assert + assert result == created_issue + + # Verify validation was called + mock_validation_service.validate_issue_creation.assert_called_once() + call_args = mock_validation_service.validate_issue_creation.call_args[0][0] + assert call_args["title"] == "New Issue" + assert call_args["labels"] == ["enhancement"] + + # Verify repository call + mock_issue_repository.create_issue.assert_called_once() + + @pytest.mark.asyncio + async def test_create_issue_validation_error(self, application_service, mock_validation_service): + """Test issue creation with validation error.""" + # Arrange + mock_validation_service.validate_issue_creation.side_effect = IssueValidationError("Title cannot be empty") + + # Act & Assert + with pytest.raises(IssueValidationError, match="Title cannot be empty"): + await application_service.create_issue(title="") + + @pytest.mark.asyncio + async def test_update_issue_status_to_closed(self, application_service, mock_issue_repository): + """Test updating issue status to closed.""" + # Arrange + issue = (IssueBuilder() + .with_number(123) + .with_title("Issue to Close") + .build()) + + updated_issue = (IssueBuilder() + .with_number(123) + .with_title("Issue to Close") + .as_closed() + .build()) + + mock_issue_repository.get_issue.return_value = issue + mock_issue_repository.update_issue.return_value = updated_issue + + # Act + result = await application_service.update_issue_status(123, "closed") + + # Assert + assert result.state == IssueState.CLOSED + assert result.closed_at is not None + + mock_issue_repository.get_issue.assert_called_once_with(123) + mock_issue_repository.update_issue.assert_called_once() + + @pytest.mark.asyncio + async def test_update_issue_status_reopen_closed_issue(self, application_service, mock_issue_repository): + """Test reopening a closed issue.""" + # Arrange + closed_issue = (IssueBuilder() + .with_number(123) + .with_title("Closed Issue") + .as_closed() + .build()) + + reopened_issue = (IssueBuilder() + .with_number(123) + .with_title("Closed Issue") + .build()) + + mock_issue_repository.get_issue.return_value = closed_issue + mock_issue_repository.update_issue.return_value = reopened_issue + + # Act + result = await application_service.update_issue_status(123, "reopened") + + # Assert + assert result.state == IssueState.OPEN + assert result.closed_at is None + + @pytest.mark.parametrize("issue_number,title,labels,expected_kanban", [ + (1, "Bug Report", ["bug"], "Todo"), + (2, "In Progress Feature", ["enhancement", "status:in-progress"], "In Progress"), + (3, "Blocked Issue", ["bug", "status:blocked"], "Blocked"), + (4, "Ready for Review", ["enhancement", "status:review"], "Review"), + ]) + @pytest.mark.asyncio + async def test_get_issue_details_kanban_column_determination( + self, application_service, mock_issue_repository, mock_project_repository, mock_status_service, + issue_number, title, labels, expected_kanban + ): + """Test kanban column determination for various issue types.""" + # Arrange + issue = (IssueBuilder() + .with_number(issue_number) + .with_title(title) + .with_labels(*labels) + .build()) + + project_info = {"kanban_columns": ["Todo", "In Progress", "Blocked", "Review", "Done"]} + + mock_issue_repository.get_issue.return_value = issue + mock_project_repository.get_issue_project_info.return_value = project_info + mock_status_service.determine_kanban_column.return_value = expected_kanban + + # Act + result = await application_service.get_issue_details(issue_number) + + # Assert + assert result["kanban_column"] == expected_kanban + + @pytest.mark.asyncio + @pytest.mark.performance + async def test_get_issue_details_performance(self, application_service, mock_issue_repository, mock_project_repository, performance_timer): + """Test that issue details retrieval meets performance requirements.""" + # Arrange + issue = (IssueBuilder() + .with_number(123) + .with_title("Performance Test Issue") + .build()) + + mock_issue_repository.get_issue.return_value = issue + mock_project_repository.get_issue_project_info.return_value = {} + + # Act + performance_timer.start() + result = await application_service.get_issue_details(123) + performance_timer.stop() + + # Assert + assert result is not None + assert_performance_within_bounds(performance_timer.elapsed, 0.1, "issue details retrieval") + + @pytest.mark.asyncio + async def test_create_issue_with_complex_labels(self, application_service, mock_issue_repository, mock_validation_service): + """Test creating issue with complex label combinations.""" + # Arrange + labels = ["bug", "priority:critical", "status:new", "frontend", "needs-investigation"] + created_issue = (IssueBuilder() + .with_number(789) + .with_title("Complex Issue") + .with_labels(*labels) + .build()) + + mock_issue_repository.create_issue.return_value = created_issue + + # Act + result = await application_service.create_issue( + title="Complex Issue", + labels=labels + ) + + # Assert + assert result.number == 789 + assert len(result.labels) == 5 + + # Verify all labels are present + label_names = {label.name for label in result.labels} + expected_labels = set(labels) + assert label_names == expected_labels + + @pytest.mark.asyncio + async def test_concurrent_issue_operations(self, application_service, mock_issue_repository): + """Test concurrent issue operations don't interfere.""" + import asyncio + + # Arrange + issues = [ + (IssueBuilder().with_number(i).with_title(f"Issue {i}").build()) + for i in range(1, 6) + ] + + def get_issue_side_effect(number): + return issues[number - 1] + + mock_issue_repository.get_issue.side_effect = get_issue_side_effect + + # Act - Simulate concurrent requests + tasks = [] + for i in range(1, 6): + task = application_service.get_issue_details(i) + tasks.append(task) + + results = await asyncio.gather(*tasks) + + # Assert + assert len(results) == 5 + for i, result in enumerate(results, 1): + assert result["issue"].number == i + assert result["issue"].title == f"Issue {i}" + + @pytest.mark.asyncio + async def test_error_handling_preserves_state(self, application_service, mock_issue_repository, mock_validation_service): + """Test that errors don't leave the application in inconsistent state.""" + # Arrange - First call succeeds, second fails + success_issue = (IssueBuilder().with_number(1).with_title("Success").build()) + mock_issue_repository.create_issue.side_effect = [success_issue, Exception("Database error")] + + # Act - First call should succeed + result1 = await application_service.create_issue("Success Issue") + assert result1.title == "Success" + + # Second call should fail but not affect future calls + with pytest.raises(Exception, match="Database error"): + await application_service.create_issue("Failing Issue") + + # Third call should work if repository is fixed + mock_issue_repository.create_issue.side_effect = None + success_issue2 = (IssueBuilder().with_number(3).with_title("Recovery").build()) + mock_issue_repository.create_issue.return_value = success_issue2 + + result3 = await application_service.create_issue("Recovery Issue") + assert result3.title == "Recovery" \ No newline at end of file diff --git a/tests/unit/infrastructure/__init__.py b/tests/unit/infrastructure/__init__.py new file mode 100644 index 00000000..9ca2d790 --- /dev/null +++ b/tests/unit/infrastructure/__init__.py @@ -0,0 +1,3 @@ +""" +Unit tests for infrastructure components. +""" \ No newline at end of file diff --git a/tests/unit/infrastructure/test_testing_infrastructure.py b/tests/unit/infrastructure/test_testing_infrastructure.py new file mode 100644 index 00000000..6766ea41 --- /dev/null +++ b/tests/unit/infrastructure/test_testing_infrastructure.py @@ -0,0 +1,287 @@ +""" +Tests to validate the testing infrastructure works correctly. + +Demonstrates: +- Test fixtures functionality +- Mock factories usage +- Test builders patterns +- Custom assertions +""" + +import pytest +from pathlib import Path +from datetime import datetime, timezone + +from tests.fixtures.markdown_samples import MarkdownDocumentBuilder, SAMPLE_SIMPLE_DOCUMENT +from tests.fixtures.api_responses import GiteaApiResponseBuilder, SAMPLE_ISSUE_RESPONSE +from tests.utils.test_builders import IssueBuilder, LabelBuilder, create_sample_issue +from tests.utils.mock_factories import MockRepositoryFactory, MockConfigFactory +from tests.utils.assertions import ( + assert_issue_equal, assert_file_exists, assert_directory_exists, + assert_performance_within_bounds, validate_issue_data +) + + +class TestTestingInfrastructure: + """Test that the testing infrastructure components work correctly.""" + + def test_test_workspace_fixture(self, test_workspace): + """Test that test workspace fixture creates proper isolated environment.""" + # Assert + assert_directory_exists(test_workspace) + assert_directory_exists(test_workspace / "documents") + assert_directory_exists(test_workspace / "cache") + assert_directory_exists(test_workspace / "workspaces") + + # Test creating files in workspace + test_file = test_workspace / "test_file.txt" + test_file.write_text("test content") + assert_file_exists(test_file) + + def test_markdown_document_builder(self): + """Test markdown document builder functionality.""" + # Act + doc = (MarkdownDocumentBuilder() + .with_metadata("title", "Test Doc") + .with_metadata("author", "Test Author") + .with_heading("Main Title") + .with_paragraph("This is a test paragraph.") + .with_list(["Item 1", "Item 2", "Item 3"]) + .with_code_block("print('hello')", "python") + .build()) + + # Assert + assert "title: Test Doc" in doc + assert "author: Test Author" in doc + assert "# Main Title" in doc + assert "This is a test paragraph." in doc + assert "- Item 1" in doc + assert "```python" in doc + assert "print('hello')" in doc + + def test_gitea_api_response_builder(self): + """Test Gitea API response builder functionality.""" + # Act + response = (GiteaApiResponseBuilder() + .with_number(42) + .with_title("Test Issue") + .with_labels("bug", "priority:high") + .with_milestone("Version 1.0") + .build()) + + # Assert + assert response["number"] == 42 + assert response["title"] == "Test Issue" + assert len(response["labels"]) == 2 + assert response["labels"][0]["name"] == "bug" + assert response["labels"][1]["name"] == "priority:high" + assert response["milestone"]["title"] == "Version 1.0" + + def test_issue_builder_functionality(self): + """Test issue builder creates proper domain objects.""" + # Act + issue = (IssueBuilder() + .with_number(123) + .with_title("Test Issue") + .as_bug() + .with_priority("high") + .with_status("in-progress") + .build()) + + # Assert + assert issue.number == 123 + assert issue.title == "Test Issue" + assert len(issue.labels) == 3 + + # Check label categorization + categories = issue.categorize_labels() + assert "bug" in categories.type_labels + assert "priority:high" in categories.priority_labels + assert "status:in-progress" in categories.state_labels + + def test_label_builder_functionality(self): + """Test label builder creates correct labels.""" + # Act + state_label = LabelBuilder().as_state_label("blocked").build() + priority_label = LabelBuilder().as_priority_label("critical").build() + type_label = LabelBuilder().as_type_label("bug").build() + custom_label = LabelBuilder().with_custom_name("frontend").build() + + # Assert + assert state_label.name == "status:blocked" + assert state_label.is_state_label() + + assert priority_label.name == "priority:critical" + assert priority_label.is_priority_label() + + assert type_label.name == "bug" + assert type_label.is_type_label() + + assert custom_label.name == "frontend" + assert not custom_label.is_state_label() + assert not custom_label.is_priority_label() + assert not custom_label.is_type_label() + + def test_mock_repository_factory(self): + """Test mock repository factory creates proper mocks.""" + # Act + issue_repo = MockRepositoryFactory.create_issue_repository() + project_repo = MockRepositoryFactory.create_project_repository() + document_repo = MockRepositoryFactory.create_document_repository() + + # Assert + assert hasattr(issue_repo, 'get_issue') + assert hasattr(issue_repo, 'create_issue') + assert hasattr(issue_repo, 'update_issue') + assert hasattr(issue_repo, 'list_issues') + + assert hasattr(project_repo, 'get_project') + assert hasattr(project_repo, 'get_issue_project_info') + + assert hasattr(document_repo, 'store_document') + assert hasattr(document_repo, 'search_content') + + def test_mock_config_factory(self): + """Test mock configuration factory.""" + # Act + config = MockConfigFactory.create_test_config({ + "custom_setting": "test_value", + "workspace_dir": "/custom/workspace" + }) + + # Assert + assert config.workspace_dir == "/custom/workspace" + assert config.custom_setting == "test_value" + assert config.gitea_url == "http://test-gitea.com" # Default value + assert config.log_level == "DEBUG" # Default value + + def test_custom_assertions(self): + """Test custom assertion functions.""" + # Arrange + issue1 = create_sample_issue(1, "Test Issue") + issue2 = create_sample_issue(1, "Test Issue") + + # Act & Assert - Should not raise + assert_issue_equal(issue1, issue2, ignore_timestamps=True) + + # Test performance assertion + execution_time = 0.05 # 50ms + assert_performance_within_bounds(execution_time, 0.1, "test operation") + + # Test data validation + valid_issue_data = { + "number": 1, + "title": "Valid Issue", + "state": "open", + "labels": [], + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z" + } + assert validate_issue_data(valid_issue_data) is True + + invalid_issue_data = { + "number": -1, # Invalid number + "title": "", # Empty title + "state": "invalid_state", # Invalid state + } + assert validate_issue_data(invalid_issue_data) is False + + def test_sample_constants(self): + """Test that sample constants are properly formed.""" + # Test markdown samples + assert len(SAMPLE_SIMPLE_DOCUMENT) > 0 + assert "# Simple Document" in SAMPLE_SIMPLE_DOCUMENT + + # Test API response samples + assert SAMPLE_ISSUE_RESPONSE["number"] == 123 + assert SAMPLE_ISSUE_RESPONSE["title"] == "Sample Issue" + assert len(SAMPLE_ISSUE_RESPONSE["labels"]) > 0 + + def test_performance_timer_fixture(self, performance_timer): + """Test performance timer fixture functionality.""" + # Act + performance_timer.start() + + # Simulate some work + import time + time.sleep(0.01) # 10ms + + performance_timer.stop() + + # Assert + assert performance_timer.elapsed > 0 + assert performance_timer.elapsed < 0.1 # Should be much less than 100ms + + def test_test_config_fixture(self, test_config): + """Test test configuration fixture.""" + # Assert + assert "workspace_dir" in test_config + assert "database_path" in test_config + assert "gitea_url" in test_config + assert test_config["gitea_url"] == "http://test-gitea.com" + + def test_sample_markdown_content_fixture(self, sample_markdown_content): + """Test sample markdown content fixture.""" + # Assert + assert "Test Document" in sample_markdown_content + assert "title:" in sample_markdown_content # Front matter + assert "**bold**" in sample_markdown_content + assert "```python" in sample_markdown_content + + def test_sample_issue_data_fixture(self, sample_issue_data): + """Test sample issue data fixture.""" + # Assert + assert sample_issue_data["number"] == 123 + assert sample_issue_data["title"] == "Test Issue" + assert sample_issue_data["state"] == "open" + assert len(sample_issue_data["labels"]) > 0 + + def test_isolated_environment_fixture(self, isolated_environment): + """Test isolated environment fixture.""" + # Assert + assert "MARKITECT_WORKSPACE_DIR" in isolated_environment + assert "MARKITECT_GITEA_URL" in isolated_environment + assert isolated_environment["MARKITECT_GITEA_URL"] == "http://test-gitea.com" + + @pytest.mark.parametrize("priority,expected", [ + ("low", "Low"), + ("medium", "Medium"), + ("high", "High"), + ("critical", "Critical") + ]) + def test_parametrized_testing_works(self, priority, expected): + """Test that parametrized testing works with our infrastructure.""" + # Act + issue = (IssueBuilder() + .with_number(1) + .with_title("Priority Test") + .with_priority(priority) + .build()) + + # Assert + categories = issue.categorize_labels() + priority_labels = categories.priority_labels + assert len(priority_labels) == 1 + assert priority_labels[0] == f"priority:{priority}" + + def test_test_markers_work(self): + """Test that our custom test markers work.""" + # This test validates that the marker configuration is working + # The markers are defined in pytest.ini and should not cause warnings + pass + + @pytest.mark.unit + def test_unit_marker_works(self): + """Test that unit marker works.""" + assert True + + @pytest.mark.performance + def test_performance_marker_works(self, performance_timer): + """Test that performance marker works.""" + performance_timer.start() + # Simulate quick operation + result = sum(range(100)) + performance_timer.stop() + + assert result == 4950 # Mathematical verification + assert performance_timer.elapsed < 0.01 # Should be very fast \ No newline at end of file diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 00000000..78859ea2 --- /dev/null +++ b/tests/utils/__init__.py @@ -0,0 +1,3 @@ +""" +Test utilities and helpers for MarkiTect tests. +""" \ No newline at end of file diff --git a/tests/utils/assertions.py b/tests/utils/assertions.py new file mode 100644 index 00000000..6d54fce1 --- /dev/null +++ b/tests/utils/assertions.py @@ -0,0 +1,274 @@ +""" +Custom assertions and test utilities for MarkiTect tests. +""" + +import json +from typing import Any, Dict, List, Optional, Union, Callable +from datetime import datetime, timezone +from pathlib import Path +import pytest + + +def assert_issue_equal(actual, expected, ignore_timestamps: bool = False): + """Assert that two Issue objects are equal.""" + assert actual.number == expected.number, f"Issue numbers don't match: {actual.number} != {expected.number}" + assert actual.title == expected.title, f"Issue titles don't match: {actual.title} != {expected.title}" + assert actual.state == expected.state, f"Issue states don't match: {actual.state} != {expected.state}" + assert len(actual.labels) == len(expected.labels), f"Label counts don't match: {len(actual.labels)} != {len(expected.labels)}" + + # Compare labels + actual_label_names = {label.name for label in actual.labels} + expected_label_names = {label.name for label in expected.labels} + assert actual_label_names == expected_label_names, f"Labels don't match: {actual_label_names} != {expected_label_names}" + + if not ignore_timestamps: + assert actual.created_at == expected.created_at, f"Created timestamps don't match" + assert actual.updated_at == expected.updated_at, f"Updated timestamps don't match" + assert actual.closed_at == expected.closed_at, f"Closed timestamps don't match" + + +def assert_project_equal(actual, expected, ignore_timestamps: bool = False): + """Assert that two Project objects are equal.""" + assert actual.name == expected.name, f"Project names don't match: {actual.name} != {expected.name}" + assert actual.description == expected.description, f"Project descriptions don't match" + assert actual.state == expected.state, f"Project states don't match: {actual.state} != {expected.state}" + assert actual.kanban_columns == expected.kanban_columns, f"Kanban columns don't match" + assert len(actual.milestones) == len(expected.milestones), f"Milestone counts don't match" + + if not ignore_timestamps: + assert actual.created_at == expected.created_at, f"Created timestamps don't match" + assert actual.updated_at == expected.updated_at, f"Updated timestamps don't match" + assert actual.archived_at == expected.archived_at, f"Archived timestamps don't match" + + +def assert_milestone_equal(actual, expected): + """Assert that two Milestone objects are equal.""" + assert actual.id == expected.id, f"Milestone IDs don't match: {actual.id} != {expected.id}" + assert actual.title == expected.title, f"Milestone titles don't match: {actual.title} != {expected.title}" + assert actual.description == expected.description, f"Milestone descriptions don't match" + assert actual.state == expected.state, f"Milestone states don't match: {actual.state} != {expected.state}" + assert actual.open_issues == expected.open_issues, f"Open issue counts don't match" + assert actual.closed_issues == expected.closed_issues, f"Closed issue counts don't match" + assert actual.due_date == expected.due_date, f"Due dates don't match" + + +def assert_json_equal(actual: Union[str, Dict], expected: Union[str, Dict]): + """Assert that two JSON objects are equal.""" + if isinstance(actual, str): + actual = json.loads(actual) + if isinstance(expected, str): + expected = json.loads(expected) + + assert actual == expected, f"JSON objects don't match:\nActual: {json.dumps(actual, indent=2)}\nExpected: {json.dumps(expected, indent=2)}" + + +def assert_markdown_structure_equal(actual: str, expected: str): + """Assert that two markdown documents have the same structure (ignoring whitespace differences).""" + actual_lines = [line.strip() for line in actual.split('\n') if line.strip()] + expected_lines = [line.strip() for line in expected.split('\n') if line.strip()] + + assert len(actual_lines) == len(expected_lines), f"Line count mismatch: {len(actual_lines)} != {len(expected_lines)}" + + for i, (actual_line, expected_line) in enumerate(zip(actual_lines, expected_lines)): + assert actual_line == expected_line, f"Line {i+1} mismatch:\nActual: {actual_line}\nExpected: {expected_line}" + + +def assert_file_exists(file_path: Union[str, Path], message: str = None): + """Assert that a file exists.""" + path = Path(file_path) + assert path.exists(), message or f"File does not exist: {path}" + assert path.is_file(), message or f"Path is not a file: {path}" + + +def assert_directory_exists(dir_path: Union[str, Path], message: str = None): + """Assert that a directory exists.""" + path = Path(dir_path) + assert path.exists(), message or f"Directory does not exist: {path}" + assert path.is_dir(), message or f"Path is not a directory: {path}" + + +def assert_file_contains(file_path: Union[str, Path], content: str, message: str = None): + """Assert that a file contains specific content.""" + path = Path(file_path) + assert_file_exists(path) + + file_content = path.read_text() + assert content in file_content, message or f"File {path} does not contain: {content}" + + +def assert_file_not_contains(file_path: Union[str, Path], content: str, message: str = None): + """Assert that a file does not contain specific content.""" + path = Path(file_path) + assert_file_exists(path) + + file_content = path.read_text() + assert content not in file_content, message or f"File {path} unexpectedly contains: {content}" + + +def assert_time_approximately_equal(actual: datetime, expected: datetime, tolerance_seconds: int = 1): + """Assert that two datetime objects are approximately equal within tolerance.""" + diff = abs((actual - expected).total_seconds()) + assert diff <= tolerance_seconds, f"Times differ by {diff} seconds, tolerance is {tolerance_seconds}" + + +def assert_performance_within_bounds(execution_time: float, max_time: float, operation: str = "operation"): + """Assert that an operation completed within performance bounds.""" + assert execution_time <= max_time, f"{operation} took {execution_time:.3f}s, expected <= {max_time:.3f}s" + + +def assert_memory_usage_within_bounds(memory_usage_mb: float, max_memory_mb: float, operation: str = "operation"): + """Assert that memory usage is within bounds.""" + assert memory_usage_mb <= max_memory_mb, f"{operation} used {memory_usage_mb:.2f}MB, expected <= {max_memory_mb:.2f}MB" + + +def assert_mock_called_with_pattern(mock, pattern: Callable[[Any], bool], message: str = None): + """Assert that a mock was called with arguments matching a pattern.""" + found_match = False + for call in mock.call_args_list: + if pattern(call): + found_match = True + break + + assert found_match, message or f"Mock was not called with expected pattern. Calls: {mock.call_args_list}" + + +def assert_sequence_equal(actual: List[Any], expected: List[Any], compare_fn: Optional[Callable[[Any, Any], bool]] = None): + """Assert that two sequences are equal using optional custom comparison.""" + assert len(actual) == len(expected), f"Sequence lengths don't match: {len(actual)} != {len(expected)}" + + for i, (actual_item, expected_item) in enumerate(zip(actual, expected)): + if compare_fn: + assert compare_fn(actual_item, expected_item), f"Items at index {i} don't match" + else: + assert actual_item == expected_item, f"Items at index {i} don't match: {actual_item} != {expected_item}" + + +def assert_contains_all(container: Union[List, Dict, str], items: List[Any], message: str = None): + """Assert that a container contains all specified items.""" + missing_items = [] + for item in items: + if item not in container: + missing_items.append(item) + + assert not missing_items, message or f"Container missing items: {missing_items}" + + +def assert_contains_none(container: Union[List, Dict, str], items: List[Any], message: str = None): + """Assert that a container contains none of the specified items.""" + found_items = [] + for item in items: + if item in container: + found_items.append(item) + + assert not found_items, message or f"Container unexpectedly contains items: {found_items}" + + +def assert_label_categories_valid(categories): + """Assert that label categories are valid and properly separated.""" + from domain.issues.models import LabelCategories + + assert isinstance(categories, LabelCategories), "Categories must be LabelCategories instance" + + # Check for overlaps between categories + all_labels = ( + categories.state_labels + + categories.priority_labels + + categories.type_labels + + categories.other_labels + ) + + # No label should appear in multiple categories + seen_labels = set() + for label in all_labels: + assert label not in seen_labels, f"Label '{label}' appears in multiple categories" + seen_labels.add(label) + + +def assert_kanban_column_valid(column: str, valid_columns: List[str]): + """Assert that a kanban column is valid.""" + assert column in valid_columns, f"Invalid kanban column '{column}'. Valid columns: {valid_columns}" + + +def assert_business_rule_violated(exception_type: type, exception_message_pattern: str = None): + """Context manager to assert that a business rule violation occurs.""" + return pytest.raises(exception_type, match=exception_message_pattern) + + +def assert_async_operation_succeeds(async_func: Callable, timeout: float = 30.0): + """Assert that an async operation succeeds within timeout.""" + import asyncio + + async def run_with_timeout(): + return await asyncio.wait_for(async_func(), timeout=timeout) + + try: + result = asyncio.run(run_with_timeout()) + return result + except asyncio.TimeoutError: + pytest.fail(f"Async operation timed out after {timeout} seconds") + except Exception as e: + pytest.fail(f"Async operation failed: {e}") + + +# Custom pytest markers for different types of assertions +def mark_performance_test(max_time: float = None, max_memory_mb: float = None): + """Mark a test as a performance test with optional bounds.""" + markers = [pytest.mark.performance] + if max_time: + markers.append(pytest.mark.parametrize("max_execution_time", [max_time])) + if max_memory_mb: + markers.append(pytest.mark.parametrize("max_memory_usage", [max_memory_mb])) + return markers + + +def mark_integration_test(external_service: str = None): + """Mark a test as an integration test.""" + markers = [pytest.mark.integration] + if external_service: + markers.append(pytest.mark.parametrize("external_service", [external_service])) + return markers + + +# Test data validation helpers +def validate_issue_data(data: Dict[str, Any]) -> bool: + """Validate that data represents a valid issue.""" + required_fields = ["number", "title", "state", "labels", "created_at", "updated_at"] + for field in required_fields: + if field not in data: + return False + + if not isinstance(data["number"], int) or data["number"] <= 0: + return False + + if not isinstance(data["title"], str) or not data["title"].strip(): + return False + + if data["state"] not in ["open", "closed"]: + return False + + if not isinstance(data["labels"], list): + return False + + return True + + +def validate_project_data(data: Dict[str, Any]) -> bool: + """Validate that data represents a valid project.""" + required_fields = ["name", "state", "milestones", "kanban_columns", "created_at", "updated_at"] + for field in required_fields: + if field not in data: + return False + + if not isinstance(data["name"], str) or not data["name"].strip(): + return False + + if data["state"] not in ["active", "archived"]: + return False + + if not isinstance(data["milestones"], list): + return False + + if not isinstance(data["kanban_columns"], list): + return False + + return True \ No newline at end of file diff --git a/tests/utils/mock_factories.py b/tests/utils/mock_factories.py new file mode 100644 index 00000000..ac77673e --- /dev/null +++ b/tests/utils/mock_factories.py @@ -0,0 +1,346 @@ +""" +Mock factories for creating test doubles and mocks. +""" + +from unittest.mock import Mock, AsyncMock, MagicMock +from typing import Dict, Any, List, Optional, Callable +import asyncio +from datetime import datetime, timezone + + +class MockRepositoryFactory: + """Factory for creating mock repository objects.""" + + @staticmethod + def create_issue_repository() -> Mock: + """Create a mock issue repository.""" + repo = AsyncMock() + repo.get_issue = AsyncMock() + repo.create_issue = AsyncMock() + repo.update_issue = AsyncMock() + repo.delete_issue = AsyncMock() + repo.list_issues = AsyncMock() + repo.search_issues = AsyncMock() + return repo + + @staticmethod + def create_project_repository() -> Mock: + """Create a mock project repository.""" + repo = AsyncMock() + repo.get_project = AsyncMock() + repo.create_project = AsyncMock() + repo.update_project = AsyncMock() + repo.delete_project = AsyncMock() + repo.list_projects = AsyncMock() + repo.get_issue_project_info = AsyncMock() + return repo + + @staticmethod + def create_document_repository() -> Mock: + """Create a mock document repository.""" + repo = AsyncMock() + repo.store_document = AsyncMock() + repo.get_document = AsyncMock() + repo.update_document = AsyncMock() + repo.delete_document = AsyncMock() + repo.list_documents = AsyncMock() + repo.search_content = AsyncMock() + return repo + + +class MockServiceFactory: + """Factory for creating mock service objects.""" + + @staticmethod + def create_http_client() -> Mock: + """Create a mock HTTP client.""" + client = AsyncMock() + + # Default successful response + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value={"status": "success"}) + mock_response.text = AsyncMock(return_value='{"status": "success"}') + mock_response.headers = {"Content-Type": "application/json"} + + client.get.return_value = mock_response + client.post.return_value = mock_response + client.put.return_value = mock_response + client.delete.return_value = mock_response + client.close = AsyncMock() + + return client + + @staticmethod + def create_database_connection() -> Mock: + """Create a mock database connection.""" + conn = Mock() + cursor = Mock() + + conn.cursor.return_value = cursor + conn.execute.return_value = cursor + conn.commit = Mock() + conn.rollback = Mock() + conn.close = Mock() + + # Default empty results + cursor.fetchone.return_value = None + cursor.fetchall.return_value = [] + cursor.fetchmany.return_value = [] + cursor.lastrowid = 1 + cursor.rowcount = 0 + + return conn + + @staticmethod + def create_cache_manager() -> Mock: + """Create a mock cache manager.""" + cache = AsyncMock() + cache.get = AsyncMock(return_value=None) + cache.set = AsyncMock() + cache.delete = AsyncMock() + cache.clear = AsyncMock() + cache.exists = AsyncMock(return_value=False) + cache.expire = AsyncMock() + return cache + + @staticmethod + def create_file_system() -> Mock: + """Create a mock file system.""" + fs = Mock() + fs.read_file = Mock(return_value="mock file content") + fs.write_file = Mock() + fs.delete_file = Mock() + fs.exists = Mock(return_value=True) + fs.list_files = Mock(return_value=[]) + fs.create_directory = Mock() + fs.delete_directory = Mock() + return fs + + +class MockConfigFactory: + """Factory for creating mock configuration objects.""" + + @staticmethod + def create_test_config(overrides: Optional[Dict[str, Any]] = None) -> Mock: + """Create a mock configuration object.""" + config = Mock() + + # Default configuration values + defaults = { + "workspace_dir": "/tmp/test-workspace", + "database_path": "/tmp/test.db", + "cache_dir": "/tmp/test-cache", + "gitea_url": "http://test-gitea.com", + "gitea_token": "test-token", + "repo_owner": "test", + "repo_name": "repo", + "log_level": "DEBUG", + "max_retries": 3, + "timeout": 30, + "batch_size": 100 + } + + if overrides: + defaults.update(overrides) + + for key, value in defaults.items(): + setattr(config, key, value) + + return config + + +class MockEventFactory: + """Factory for creating mock event objects and event handlers.""" + + @staticmethod + def create_event_emitter() -> Mock: + """Create a mock event emitter.""" + emitter = Mock() + emitter.emit = Mock() + emitter.on = Mock() + emitter.off = Mock() + emitter.once = Mock() + emitter.listeners = Mock(return_value=[]) + return emitter + + @staticmethod + def create_event_handler() -> Mock: + """Create a mock event handler.""" + handler = Mock() + handler.handle = AsyncMock() + handler.can_handle = Mock(return_value=True) + handler.priority = 1 + return handler + + +class MockNetworkFactory: + """Factory for creating network-related mocks.""" + + @staticmethod + def create_rate_limiter() -> Mock: + """Create a mock rate limiter.""" + limiter = AsyncMock() + limiter.acquire = AsyncMock() + limiter.release = AsyncMock() + limiter.is_available = AsyncMock(return_value=True) + limiter.reset = AsyncMock() + return limiter + + @staticmethod + def create_circuit_breaker() -> Mock: + """Create a mock circuit breaker.""" + breaker = Mock() + breaker.call = AsyncMock() + breaker.is_open = Mock(return_value=False) + breaker.is_closed = Mock(return_value=True) + breaker.is_half_open = Mock(return_value=False) + breaker.reset = Mock() + return breaker + + +class MockTimeFactory: + """Factory for creating time-related mocks.""" + + @staticmethod + def create_timer() -> Mock: + """Create a mock timer.""" + timer = Mock() + timer.start = Mock() + timer.stop = Mock() + timer.elapsed = 0.1 + timer.reset = Mock() + return timer + + @staticmethod + def create_scheduler() -> Mock: + """Create a mock task scheduler.""" + scheduler = AsyncMock() + scheduler.schedule = AsyncMock() + scheduler.cancel = AsyncMock() + scheduler.is_scheduled = Mock(return_value=False) + scheduler.start = AsyncMock() + scheduler.stop = AsyncMock() + return scheduler + + +class MockResponseBuilder: + """Builder for creating mock HTTP responses.""" + + def __init__(self): + self.status = 200 + self.headers = {"Content-Type": "application/json"} + self.body = {"status": "success"} + self.delay = 0.0 + self.exception = None + + def with_status(self, status: int) -> "MockResponseBuilder": + """Set response status code.""" + self.status = status + return self + + def with_headers(self, headers: Dict[str, str]) -> "MockResponseBuilder": + """Set response headers.""" + self.headers.update(headers) + return self + + def with_json_body(self, body: Dict[str, Any]) -> "MockResponseBuilder": + """Set JSON response body.""" + self.body = body + self.headers["Content-Type"] = "application/json" + return self + + def with_text_body(self, body: str) -> "MockResponseBuilder": + """Set text response body.""" + self.body = body + self.headers["Content-Type"] = "text/plain" + return self + + def with_delay(self, delay: float) -> "MockResponseBuilder": + """Add delay to response.""" + self.delay = delay + return self + + def with_exception(self, exception: Exception) -> "MockResponseBuilder": + """Make response raise an exception.""" + self.exception = exception + return self + + def build(self) -> Mock: + """Build the mock response.""" + if self.exception: + # Create a coroutine that raises the exception + async def raise_exception(): + await asyncio.sleep(self.delay) + raise self.exception + return raise_exception() + + response = AsyncMock() + response.status = self.status + response.headers = self.headers + + if isinstance(self.body, dict): + response.json = AsyncMock(return_value=self.body) + response.text = AsyncMock(return_value=str(self.body)) + else: + response.text = AsyncMock(return_value=self.body) + response.json = AsyncMock(side_effect=ValueError("Not JSON")) + + # Add delay if specified + if self.delay > 0: + original_json = response.json + original_text = response.text + + async def delayed_json(): + await asyncio.sleep(self.delay) + return await original_json() + + async def delayed_text(): + await asyncio.sleep(self.delay) + return await original_text() + + response.json = delayed_json + response.text = delayed_text + + return response + + +# Convenience functions +def create_failing_mock(exception: Exception) -> Mock: + """Create a mock that always raises the specified exception.""" + mock = Mock() + mock.side_effect = exception + return mock + + +def create_async_failing_mock(exception: Exception) -> AsyncMock: + """Create an async mock that always raises the specified exception.""" + mock = AsyncMock() + mock.side_effect = exception + return mock + + +def create_sequence_mock(values: List[Any]) -> Mock: + """Create a mock that returns values in sequence.""" + mock = Mock() + mock.side_effect = values + return mock + + +def create_async_sequence_mock(values: List[Any]) -> AsyncMock: + """Create an async mock that returns values in sequence.""" + mock = AsyncMock() + mock.side_effect = values + return mock + + +def create_conditional_mock(condition: Callable[..., bool], true_value: Any, false_value: Any) -> Mock: + """Create a mock that returns different values based on a condition.""" + def side_effect(*args, **kwargs): + if condition(*args, **kwargs): + return true_value + return false_value + + mock = Mock() + mock.side_effect = side_effect + return mock \ No newline at end of file diff --git a/tests/utils/test_builders.py b/tests/utils/test_builders.py new file mode 100644 index 00000000..2f59cbc4 --- /dev/null +++ b/tests/utils/test_builders.py @@ -0,0 +1,338 @@ +""" +Test data builders using the builder pattern for creating domain objects. +""" + +from datetime import datetime, timezone +from typing import List, Optional, Dict, Any +from domain.issues.models import Issue, Label, IssueState, LabelCategories +from domain.projects.models import Project, Milestone, ProjectState + + +class IssueBuilder: + """Builder for creating Issue domain objects for testing.""" + + def __init__(self): + self.number = 1 + self.title = "Test Issue" + self.state = IssueState.OPEN + self.labels: List[Label] = [] + self.created_at = datetime.now(timezone.utc) + self.updated_at = datetime.now(timezone.utc) + self.closed_at: Optional[datetime] = None + + def with_number(self, number: int) -> "IssueBuilder": + """Set issue number.""" + self.number = number + return self + + def with_title(self, title: str) -> "IssueBuilder": + """Set issue title.""" + self.title = title + return self + + def with_state(self, state: IssueState) -> "IssueBuilder": + """Set issue state.""" + self.state = state + if state == IssueState.CLOSED and self.closed_at is None: + self.closed_at = datetime.now(timezone.utc) + return self + + def with_labels(self, *label_names: str) -> "IssueBuilder": + """Add labels to the issue.""" + self.labels = [Label(name) for name in label_names] + return self + + def with_label_objects(self, *labels: Label) -> "IssueBuilder": + """Add label objects to the issue.""" + self.labels = list(labels) + return self + + def with_timestamps(self, created_at: datetime, updated_at: datetime, closed_at: Optional[datetime] = None) -> "IssueBuilder": + """Set issue timestamps.""" + self.created_at = created_at + self.updated_at = updated_at + self.closed_at = closed_at + return self + + def as_closed(self, closed_at: Optional[datetime] = None) -> "IssueBuilder": + """Mark issue as closed.""" + self.state = IssueState.CLOSED + self.closed_at = closed_at or datetime.now(timezone.utc) + return self + + def as_bug(self) -> "IssueBuilder": + """Add bug label.""" + self.labels.append(Label("bug")) + return self + + def as_enhancement(self) -> "IssueBuilder": + """Add enhancement label.""" + self.labels.append(Label("enhancement")) + return self + + def with_priority(self, priority: str) -> "IssueBuilder": + """Add priority label.""" + if priority not in ["low", "medium", "high", "critical"]: + raise ValueError("Priority must be one of: low, medium, high, critical") + self.labels.append(Label(f"priority:{priority}")) + return self + + def with_status(self, status: str) -> "IssueBuilder": + """Add status label.""" + self.labels.append(Label(f"status:{status}")) + return self + + def build(self) -> Issue: + """Build the Issue object.""" + return Issue( + number=self.number, + title=self.title, + state=self.state, + labels=self.labels, + created_at=self.created_at, + updated_at=self.updated_at, + closed_at=self.closed_at + ) + + +class LabelBuilder: + """Builder for creating Label objects for testing.""" + + def __init__(self, name: str = "test-label"): + self.name = name + + def as_state_label(self, status: str) -> "LabelBuilder": + """Create a state label.""" + self.name = f"status:{status}" + return self + + def as_priority_label(self, priority: str) -> "LabelBuilder": + """Create a priority label.""" + if priority not in ["low", "medium", "high", "critical"]: + raise ValueError("Priority must be one of: low, medium, high, critical") + self.name = f"priority:{priority}" + return self + + def as_type_label(self, type_name: str) -> "LabelBuilder": + """Create a type label.""" + if type_name not in ["bug", "enhancement", "feature", "documentation"]: + raise ValueError("Type must be one of: bug, enhancement, feature, documentation") + self.name = type_name + return self + + def with_custom_name(self, name: str) -> "LabelBuilder": + """Set custom label name.""" + self.name = name + return self + + def build(self) -> Label: + """Build the Label object.""" + return Label(self.name) + + +class MilestoneBuilder: + """Builder for creating Milestone objects for testing.""" + + def __init__(self): + self.id = 1 + self.title = "Test Milestone" + self.description: Optional[str] = None + self.due_date: Optional[datetime] = None + self.state = "open" + self.open_issues = 0 + self.closed_issues = 0 + + def with_id(self, id: int) -> "MilestoneBuilder": + """Set milestone ID.""" + self.id = id + return self + + def with_title(self, title: str) -> "MilestoneBuilder": + """Set milestone title.""" + self.title = title + return self + + def with_description(self, description: str) -> "MilestoneBuilder": + """Set milestone description.""" + self.description = description + return self + + def with_due_date(self, due_date: datetime) -> "MilestoneBuilder": + """Set milestone due date.""" + self.due_date = due_date + return self + + def with_state(self, state: str) -> "MilestoneBuilder": + """Set milestone state.""" + if state not in ["open", "closed"]: + raise ValueError("State must be 'open' or 'closed'") + self.state = state + return self + + def with_issue_counts(self, open_issues: int, closed_issues: int) -> "MilestoneBuilder": + """Set issue counts.""" + self.open_issues = open_issues + self.closed_issues = closed_issues + return self + + def as_overdue(self) -> "MilestoneBuilder": + """Make milestone overdue.""" + from datetime import timedelta + self.due_date = datetime.now(timezone.utc) - timedelta(days=1) + return self + + def as_completed(self) -> "MilestoneBuilder": + """Mark milestone as completed.""" + self.state = "closed" + return self + + def build(self) -> Milestone: + """Build the Milestone object.""" + return Milestone( + id=self.id, + title=self.title, + description=self.description, + due_date=self.due_date, + state=self.state, + open_issues=self.open_issues, + closed_issues=self.closed_issues + ) + + +class ProjectBuilder: + """Builder for creating Project objects for testing.""" + + def __init__(self): + self.name = "Test Project" + self.description: Optional[str] = None + self.state = ProjectState.ACTIVE + self.milestones: List[Milestone] = [] + self.kanban_columns = ["Todo", "In Progress", "Done"] + self.created_at = datetime.now(timezone.utc) + self.updated_at = datetime.now(timezone.utc) + self.archived_at: Optional[datetime] = None + + def with_name(self, name: str) -> "ProjectBuilder": + """Set project name.""" + self.name = name + return self + + def with_description(self, description: str) -> "ProjectBuilder": + """Set project description.""" + self.description = description + return self + + def with_state(self, state: ProjectState) -> "ProjectBuilder": + """Set project state.""" + self.state = state + if state == ProjectState.ARCHIVED and self.archived_at is None: + self.archived_at = datetime.now(timezone.utc) + return self + + def with_milestones(self, *milestones: Milestone) -> "ProjectBuilder": + """Add milestones to the project.""" + self.milestones = list(milestones) + return self + + def with_kanban_columns(self, *columns: str) -> "ProjectBuilder": + """Set kanban columns.""" + self.kanban_columns = list(columns) + return self + + def with_timestamps(self, created_at: datetime, updated_at: datetime, archived_at: Optional[datetime] = None) -> "ProjectBuilder": + """Set project timestamps.""" + self.created_at = created_at + self.updated_at = updated_at + self.archived_at = archived_at + return self + + def as_archived(self, archived_at: Optional[datetime] = None) -> "ProjectBuilder": + """Mark project as archived.""" + self.state = ProjectState.ARCHIVED + self.archived_at = archived_at or datetime.now(timezone.utc) + return self + + def build(self) -> Project: + """Build the Project object.""" + return Project( + name=self.name, + description=self.description, + state=self.state, + milestones=self.milestones, + kanban_columns=self.kanban_columns, + created_at=self.created_at, + updated_at=self.updated_at, + archived_at=self.archived_at + ) + + +# Convenience functions for common test scenarios +def create_sample_issue(number: int = 1, title: str = "Sample Issue") -> Issue: + """Create a basic sample issue for testing.""" + return (IssueBuilder() + .with_number(number) + .with_title(title) + .as_bug() + .with_priority("medium") + .with_status("new") + .build()) + + +def create_in_progress_issue(number: int = 1) -> Issue: + """Create an in-progress issue for testing.""" + return (IssueBuilder() + .with_number(number) + .with_title("In Progress Issue") + .as_enhancement() + .with_priority("high") + .with_status("in-progress") + .build()) + + +def create_closed_issue(number: int = 1) -> Issue: + """Create a closed issue for testing.""" + return (IssueBuilder() + .with_number(number) + .with_title("Closed Issue") + .as_bug() + .with_priority("low") + .as_closed() + .build()) + + +def create_sample_milestone(id: int = 1, title: str = "Sample Milestone") -> Milestone: + """Create a basic sample milestone for testing.""" + return (MilestoneBuilder() + .with_id(id) + .with_title(title) + .with_description(f"Description for {title}") + .with_issue_counts(3, 7) + .build()) + + +def create_sample_project(name: str = "Sample Project") -> Project: + """Create a basic sample project for testing.""" + milestone1 = create_sample_milestone(1, "Version 1.0") + milestone2 = create_sample_milestone(2, "Version 2.0") + + return (ProjectBuilder() + .with_name(name) + .with_description(f"Description for {name}") + .with_milestones(milestone1, milestone2) + .build()) + + +def create_complex_issue_with_labels() -> Issue: + """Create an issue with various types of labels for testing categorization.""" + return (IssueBuilder() + .with_number(42) + .with_title("Complex Issue with Multiple Labels") + .with_labels( + "bug", # type label + "priority:critical", # priority label + "status:blocked", # state label + "frontend", # other label + "needs-testing", # other label + "enhancement" # type label (multiple types allowed) + ) + .build()) \ No newline at end of file