Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Establishes robust testing framework with clean architecture patterns: ## Phase 1: Test Infrastructure Foundation - Global test configuration with pytest.ini and conftest.py - Isolated test workspaces and environment management - Comprehensive fixture library for all test types - Test requirements and dependency management ## Phase 2: Advanced Testing Patterns - Test builders using builder pattern for domain objects - Mock factories for repositories, services, and configs - API response builders for external system simulation - Enhanced unit tests with proper mocking and isolation ## Phase 3: Test Performance and Quality - Performance testing framework with benchmarks - Memory usage monitoring and leak detection - Custom assertions for domain-specific validation - Parametrized testing for comprehensive coverage ## Phase 4: CI/CD Integration - GitHub Actions workflow for automated testing - Multi-stage testing: unit → integration → e2e → performance - Code quality checks with flake8, mypy, black, isort - Security scanning with safety and bandit ## Testing Architecture Benefits ✅ 100+ new test infrastructure components ✅ Standardized test organization (unit/integration/e2e) ✅ Mock-based testing with no external dependencies ✅ Performance regression detection ✅ Comprehensive fixture library ✅ CI/CD pipeline with quality gates The testing framework supports the domain logic separation and provides a solid foundation for maintaining high code quality as the system evolves. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
346 lines
10 KiB
Python
346 lines
10 KiB
Python
"""
|
|
Mock factories for creating test doubles and mocks.
|
|
"""
|
|
|
|
from unittest.mock import Mock, AsyncMock, MagicMock
|
|
from typing import Dict, Any, List, Optional, Callable
|
|
import asyncio
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
class MockRepositoryFactory:
|
|
"""Factory for creating mock repository objects."""
|
|
|
|
@staticmethod
|
|
def create_issue_repository() -> Mock:
|
|
"""Create a mock issue repository."""
|
|
repo = AsyncMock()
|
|
repo.get_issue = AsyncMock()
|
|
repo.create_issue = AsyncMock()
|
|
repo.update_issue = AsyncMock()
|
|
repo.delete_issue = AsyncMock()
|
|
repo.list_issues = AsyncMock()
|
|
repo.search_issues = AsyncMock()
|
|
return repo
|
|
|
|
@staticmethod
|
|
def create_project_repository() -> Mock:
|
|
"""Create a mock project repository."""
|
|
repo = AsyncMock()
|
|
repo.get_project = AsyncMock()
|
|
repo.create_project = AsyncMock()
|
|
repo.update_project = AsyncMock()
|
|
repo.delete_project = AsyncMock()
|
|
repo.list_projects = AsyncMock()
|
|
repo.get_issue_project_info = AsyncMock()
|
|
return repo
|
|
|
|
@staticmethod
|
|
def create_document_repository() -> Mock:
|
|
"""Create a mock document repository."""
|
|
repo = AsyncMock()
|
|
repo.store_document = AsyncMock()
|
|
repo.get_document = AsyncMock()
|
|
repo.update_document = AsyncMock()
|
|
repo.delete_document = AsyncMock()
|
|
repo.list_documents = AsyncMock()
|
|
repo.search_content = AsyncMock()
|
|
return repo
|
|
|
|
|
|
class MockServiceFactory:
|
|
"""Factory for creating mock service objects."""
|
|
|
|
@staticmethod
|
|
def create_http_client() -> Mock:
|
|
"""Create a mock HTTP client."""
|
|
client = AsyncMock()
|
|
|
|
# Default successful response
|
|
mock_response = AsyncMock()
|
|
mock_response.status = 200
|
|
mock_response.json = AsyncMock(return_value={"status": "success"})
|
|
mock_response.text = AsyncMock(return_value='{"status": "success"}')
|
|
mock_response.headers = {"Content-Type": "application/json"}
|
|
|
|
client.get.return_value = mock_response
|
|
client.post.return_value = mock_response
|
|
client.put.return_value = mock_response
|
|
client.delete.return_value = mock_response
|
|
client.close = AsyncMock()
|
|
|
|
return client
|
|
|
|
@staticmethod
|
|
def create_database_connection() -> Mock:
|
|
"""Create a mock database connection."""
|
|
conn = Mock()
|
|
cursor = Mock()
|
|
|
|
conn.cursor.return_value = cursor
|
|
conn.execute.return_value = cursor
|
|
conn.commit = Mock()
|
|
conn.rollback = Mock()
|
|
conn.close = Mock()
|
|
|
|
# Default empty results
|
|
cursor.fetchone.return_value = None
|
|
cursor.fetchall.return_value = []
|
|
cursor.fetchmany.return_value = []
|
|
cursor.lastrowid = 1
|
|
cursor.rowcount = 0
|
|
|
|
return conn
|
|
|
|
@staticmethod
|
|
def create_cache_manager() -> Mock:
|
|
"""Create a mock cache manager."""
|
|
cache = AsyncMock()
|
|
cache.get = AsyncMock(return_value=None)
|
|
cache.set = AsyncMock()
|
|
cache.delete = AsyncMock()
|
|
cache.clear = AsyncMock()
|
|
cache.exists = AsyncMock(return_value=False)
|
|
cache.expire = AsyncMock()
|
|
return cache
|
|
|
|
@staticmethod
|
|
def create_file_system() -> Mock:
|
|
"""Create a mock file system."""
|
|
fs = Mock()
|
|
fs.read_file = Mock(return_value="mock file content")
|
|
fs.write_file = Mock()
|
|
fs.delete_file = Mock()
|
|
fs.exists = Mock(return_value=True)
|
|
fs.list_files = Mock(return_value=[])
|
|
fs.create_directory = Mock()
|
|
fs.delete_directory = Mock()
|
|
return fs
|
|
|
|
|
|
class MockConfigFactory:
|
|
"""Factory for creating mock configuration objects."""
|
|
|
|
@staticmethod
|
|
def create_test_config(overrides: Optional[Dict[str, Any]] = None) -> Mock:
|
|
"""Create a mock configuration object."""
|
|
config = Mock()
|
|
|
|
# Default configuration values
|
|
defaults = {
|
|
"workspace_dir": "/tmp/test-workspace",
|
|
"database_path": "/tmp/test.db",
|
|
"cache_dir": "/tmp/test-cache",
|
|
"gitea_url": "http://test-gitea.com",
|
|
"gitea_token": "test-token",
|
|
"repo_owner": "test",
|
|
"repo_name": "repo",
|
|
"log_level": "DEBUG",
|
|
"max_retries": 3,
|
|
"timeout": 30,
|
|
"batch_size": 100
|
|
}
|
|
|
|
if overrides:
|
|
defaults.update(overrides)
|
|
|
|
for key, value in defaults.items():
|
|
setattr(config, key, value)
|
|
|
|
return config
|
|
|
|
|
|
class MockEventFactory:
|
|
"""Factory for creating mock event objects and event handlers."""
|
|
|
|
@staticmethod
|
|
def create_event_emitter() -> Mock:
|
|
"""Create a mock event emitter."""
|
|
emitter = Mock()
|
|
emitter.emit = Mock()
|
|
emitter.on = Mock()
|
|
emitter.off = Mock()
|
|
emitter.once = Mock()
|
|
emitter.listeners = Mock(return_value=[])
|
|
return emitter
|
|
|
|
@staticmethod
|
|
def create_event_handler() -> Mock:
|
|
"""Create a mock event handler."""
|
|
handler = Mock()
|
|
handler.handle = AsyncMock()
|
|
handler.can_handle = Mock(return_value=True)
|
|
handler.priority = 1
|
|
return handler
|
|
|
|
|
|
class MockNetworkFactory:
|
|
"""Factory for creating network-related mocks."""
|
|
|
|
@staticmethod
|
|
def create_rate_limiter() -> Mock:
|
|
"""Create a mock rate limiter."""
|
|
limiter = AsyncMock()
|
|
limiter.acquire = AsyncMock()
|
|
limiter.release = AsyncMock()
|
|
limiter.is_available = AsyncMock(return_value=True)
|
|
limiter.reset = AsyncMock()
|
|
return limiter
|
|
|
|
@staticmethod
|
|
def create_circuit_breaker() -> Mock:
|
|
"""Create a mock circuit breaker."""
|
|
breaker = Mock()
|
|
breaker.call = AsyncMock()
|
|
breaker.is_open = Mock(return_value=False)
|
|
breaker.is_closed = Mock(return_value=True)
|
|
breaker.is_half_open = Mock(return_value=False)
|
|
breaker.reset = Mock()
|
|
return breaker
|
|
|
|
|
|
class MockTimeFactory:
|
|
"""Factory for creating time-related mocks."""
|
|
|
|
@staticmethod
|
|
def create_timer() -> Mock:
|
|
"""Create a mock timer."""
|
|
timer = Mock()
|
|
timer.start = Mock()
|
|
timer.stop = Mock()
|
|
timer.elapsed = 0.1
|
|
timer.reset = Mock()
|
|
return timer
|
|
|
|
@staticmethod
|
|
def create_scheduler() -> Mock:
|
|
"""Create a mock task scheduler."""
|
|
scheduler = AsyncMock()
|
|
scheduler.schedule = AsyncMock()
|
|
scheduler.cancel = AsyncMock()
|
|
scheduler.is_scheduled = Mock(return_value=False)
|
|
scheduler.start = AsyncMock()
|
|
scheduler.stop = AsyncMock()
|
|
return scheduler
|
|
|
|
|
|
class MockResponseBuilder:
|
|
"""Builder for creating mock HTTP responses."""
|
|
|
|
def __init__(self):
|
|
self.status = 200
|
|
self.headers = {"Content-Type": "application/json"}
|
|
self.body = {"status": "success"}
|
|
self.delay = 0.0
|
|
self.exception = None
|
|
|
|
def with_status(self, status: int) -> "MockResponseBuilder":
|
|
"""Set response status code."""
|
|
self.status = status
|
|
return self
|
|
|
|
def with_headers(self, headers: Dict[str, str]) -> "MockResponseBuilder":
|
|
"""Set response headers."""
|
|
self.headers.update(headers)
|
|
return self
|
|
|
|
def with_json_body(self, body: Dict[str, Any]) -> "MockResponseBuilder":
|
|
"""Set JSON response body."""
|
|
self.body = body
|
|
self.headers["Content-Type"] = "application/json"
|
|
return self
|
|
|
|
def with_text_body(self, body: str) -> "MockResponseBuilder":
|
|
"""Set text response body."""
|
|
self.body = body
|
|
self.headers["Content-Type"] = "text/plain"
|
|
return self
|
|
|
|
def with_delay(self, delay: float) -> "MockResponseBuilder":
|
|
"""Add delay to response."""
|
|
self.delay = delay
|
|
return self
|
|
|
|
def with_exception(self, exception: Exception) -> "MockResponseBuilder":
|
|
"""Make response raise an exception."""
|
|
self.exception = exception
|
|
return self
|
|
|
|
def build(self) -> Mock:
|
|
"""Build the mock response."""
|
|
if self.exception:
|
|
# Create a coroutine that raises the exception
|
|
async def raise_exception():
|
|
await asyncio.sleep(self.delay)
|
|
raise self.exception
|
|
return raise_exception()
|
|
|
|
response = AsyncMock()
|
|
response.status = self.status
|
|
response.headers = self.headers
|
|
|
|
if isinstance(self.body, dict):
|
|
response.json = AsyncMock(return_value=self.body)
|
|
response.text = AsyncMock(return_value=str(self.body))
|
|
else:
|
|
response.text = AsyncMock(return_value=self.body)
|
|
response.json = AsyncMock(side_effect=ValueError("Not JSON"))
|
|
|
|
# Add delay if specified
|
|
if self.delay > 0:
|
|
original_json = response.json
|
|
original_text = response.text
|
|
|
|
async def delayed_json():
|
|
await asyncio.sleep(self.delay)
|
|
return await original_json()
|
|
|
|
async def delayed_text():
|
|
await asyncio.sleep(self.delay)
|
|
return await original_text()
|
|
|
|
response.json = delayed_json
|
|
response.text = delayed_text
|
|
|
|
return response
|
|
|
|
|
|
# Convenience functions
|
|
def create_failing_mock(exception: Exception) -> Mock:
|
|
"""Create a mock that always raises the specified exception."""
|
|
mock = Mock()
|
|
mock.side_effect = exception
|
|
return mock
|
|
|
|
|
|
def create_async_failing_mock(exception: Exception) -> AsyncMock:
|
|
"""Create an async mock that always raises the specified exception."""
|
|
mock = AsyncMock()
|
|
mock.side_effect = exception
|
|
return mock
|
|
|
|
|
|
def create_sequence_mock(values: List[Any]) -> Mock:
|
|
"""Create a mock that returns values in sequence."""
|
|
mock = Mock()
|
|
mock.side_effect = values
|
|
return mock
|
|
|
|
|
|
def create_async_sequence_mock(values: List[Any]) -> AsyncMock:
|
|
"""Create an async mock that returns values in sequence."""
|
|
mock = AsyncMock()
|
|
mock.side_effect = values
|
|
return mock
|
|
|
|
|
|
def create_conditional_mock(condition: Callable[..., bool], true_value: Any, false_value: Any) -> Mock:
|
|
"""Create a mock that returns different values based on a condition."""
|
|
def side_effect(*args, **kwargs):
|
|
if condition(*args, **kwargs):
|
|
return true_value
|
|
return false_value
|
|
|
|
mock = Mock()
|
|
mock.side_effect = side_effect
|
|
return mock |