Testing Architecture Enhancement - Gameplan
Overview
This gameplan establishes a comprehensive testing architecture that supports the domain logic separation and data access pattern improvements while ensuring high code quality, maintainability, and confidence in changes across the MarkiTect codebase.
Current Testing Architecture Problems
1. Test Organization and Structure Issues
Inconsistent Test File Organization
- Problem: Tests scattered across multiple directories without clear structure
- Current: Mix of tests/ and module-specific test files
- Impact: Difficult to locate and maintain tests
Poor Test Naming Conventions
- Problem: Inconsistent naming patterns (e.g., test_issue_11_*, test_issue_creator.py)
- Current: Tests named after issue numbers rather than functionality
- Impact: Tests don't clearly indicate what they're testing
Mixed Test Types
- Problem: Unit tests, integration tests, and end-to-end tests mixed together
- Current: No clear separation between test types
- Impact: Slow test execution, unclear test purpose
2. Test Coverage and Quality Issues
Missing Test Coverage Areas
# Current gaps identified:
- Domain logic testing (business rules not tested in isolation)
- Repository pattern testing (no mock strategies)
- Error handling scenarios (happy path bias)
- Performance and load testing (no performance regression detection)
- Configuration management testing (config scenarios not covered)
Poor Test Isolation
- Problem: Tests depend on external systems and state
- Current: Tests make real API calls, modify actual files
- Impact: Flaky tests, slow execution, test interference
3. Testing Anti-patterns Identified
Services Module Testing Issues
# Current anti-pattern in services/issue_service.py tests
class TestIssueService:
    def test_get_issue_details(self):
        # Problem: Real API calls in unit tests
        service = IssueService()
        result = service.get_issue_details(123)  # Makes real HTTP request
        assert result is not None
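One way to remove the real HTTP request is to hand the service its client instead of letting it build one. The sketch below is illustrative only: `FakeHttpClient`, its `get_json` method, and the constructor signature of `IssueService` are assumptions, not the existing MarkiTect API.

```python
class FakeHttpClient:
    """In-memory stand-in for an HTTP client (hypothetical interface)."""

    def __init__(self, responses):
        self.responses = responses
        self.requested = []

    def get_json(self, path):
        # Record the call so the test can verify it, then return canned data.
        self.requested.append(path)
        return self.responses[path]


class IssueService:
    """Hypothetical service that receives its client instead of creating one."""

    def __init__(self, client):
        self.client = client

    def get_issue_details(self, number):
        return self.client.get_json(f"/issues/{number}")


def test_get_issue_details_uses_injected_client():
    client = FakeHttpClient({"/issues/123": {"number": 123, "title": "Test Issue"}})
    service = IssueService(client)

    result = service.get_issue_details(123)

    assert result["title"] == "Test Issue"
    assert client.requested == ["/issues/123"]


test_get_issue_details_uses_injected_client()
```

With the dependency injected, the test asserts on a concrete value rather than `is not None`, and it never touches the network.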
TDDAI Module Testing Problems
# Current anti-pattern in tddai tests
class TestProjectManager:
    def test_create_project(self):
        # Problem: File system dependencies
        manager = ProjectManager()
        manager.create_workspace("/tmp/test")  # Creates real directories
        assert os.path.exists("/tmp/test")  # Depends on file system state
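The file system dependency can be contained by giving each test its own throwaway directory. This is a minimal sketch that assumes `ProjectManager` can take a root directory; that constructor argument is hypothetical and the class below is only a stand-in.

```python
import tempfile
from pathlib import Path


class ProjectManager:
    """Hypothetical stand-in: creates a workspace directory under a given root."""

    def __init__(self, root: Path):
        self.root = root

    def create_workspace(self, name: str) -> Path:
        workspace = self.root / name
        workspace.mkdir(parents=True, exist_ok=False)
        return workspace


def test_create_workspace_in_isolated_directory():
    # TemporaryDirectory removes everything on exit, so no state leaks
    # between tests or onto the developer's machine.
    with tempfile.TemporaryDirectory(prefix="markitect_test_") as tmp:
        manager = ProjectManager(Path(tmp))
        workspace = manager.create_workspace("issue_1")
        assert workspace.is_dir()
        assert workspace.parent == Path(tmp)
    assert not workspace.exists()


test_create_workspace_in_isolated_directory()
```

Under pytest, the built-in `tmp_path` fixture provides the same isolation without the explicit context manager.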
Testing Architecture Strategy
Test Pyramid Implementation
E2E Tests (Few)
├─ Workflow Tests
├─ CLI Integration Tests
└─ API Integration Tests
Integration Tests (Some)
├─ Service Layer Tests
├─ Repository Tests
├─ Database Tests
└─ External API Tests
Unit Tests (Many)
├─ Domain Model Tests
├─ Business Logic Tests
├─ Value Object Tests
└─ Utility Function Tests
Testing Layer Architecture
tests/
├── unit/                     # Fast, isolated unit tests
│   ├── domain/               # Domain model and business logic tests
│   ├── application/          # Application service tests (mocked repos)
│   └── infrastructure/       # Infrastructure component tests
├── integration/              # Integration tests with real components
│   ├── repositories/         # Repository tests with real databases
│   ├── services/             # Service tests with real dependencies
│   └── external/             # External API integration tests
├── e2e/                      # End-to-end workflow tests
│   ├── cli/                  # CLI command testing
│   ├── workflows/            # Complete user workflows
│   └── performance/          # Performance and load tests
├── fixtures/                 # Test data and builders
│   ├── markdown_samples.py
│   ├── api_responses.py
│   └── database_seeds.py
└── utils/                    # Test utilities and helpers
    ├── test_builders.py
    ├── mock_factories.py
    └── assertions.py
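Because the layout encodes the test type in the path, markers can be derived from the directory rather than added by hand. The helper below is a sketch; the function name `layer_marker` and the choice to leave unit tests unmarked are assumptions.

```python
from pathlib import PurePosixPath
from typing import Optional

# Directories directly under tests/ that map to a pytest marker.
# Unit tests are deliberately left unmarked in this sketch.
LAYER_MARKERS = {"integration": "integration", "e2e": "e2e"}


def layer_marker(test_path: str) -> Optional[str]:
    """Return the marker implied by a test file's location, if any."""
    parts = PurePosixPath(test_path).parts
    if "tests" not in parts:
        return None
    layer_index = parts.index("tests") + 1
    if layer_index >= len(parts):
        return None
    return LAYER_MARKERS.get(parts[layer_index])


print(layer_marker("tests/integration/repositories/test_gitea_issue_repository.py"))
print(layer_marker("tests/unit/domain/test_issue_models.py"))
```

A function like this could be called from a `pytest_collection_modifyitems` hook in conftest.py to call `item.add_marker(...)` for each collected test, which keeps `-m "not integration"` selections accurate without decorating every test.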
Implementation Gameplan
Phase 1: Foundation and Infrastructure (Week 1-2)
Task 1.1: Test Organization and Structure
# Create standardized test directory structure
tests/
├── conftest.py # Global test configuration
├── pytest.ini # Pytest configuration
├── requirements-test.txt # Test dependencies
└── [organized structure as above]
Task 1.2: Test Configuration Setup
# tests/conftest.py
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock
from typing import Generator


# Function scope (the default) gives every test a fresh directory, so tests
# that assert exact document counts never see files or databases left by others.
@pytest.fixture
def test_workspace() -> Generator[Path, None, None]:
    """Create isolated test workspace."""
    temp_dir = tempfile.mkdtemp(prefix="markitect_test_")
    workspace_path = Path(temp_dir)
    yield workspace_path
    shutil.rmtree(temp_dir)


@pytest.fixture
def mock_database():
    """Provide mocked database for testing."""
    mock_db = Mock()
    mock_cursor = Mock()
    mock_db.cursor.return_value = mock_cursor
    mock_db.execute.return_value = mock_cursor
    mock_cursor.fetchone.return_value = None
    mock_cursor.fetchall.return_value = []
    return mock_db


@pytest.fixture
def mock_http_client():
    """Provide mocked HTTP client for API tests."""
    mock_client = Mock()
    mock_response = Mock()
    mock_response.status_code = 200
    mock_response.json.return_value = {"status": "success"}
    mock_client.get.return_value = mock_response
    mock_client.post.return_value = mock_response
    return mock_client
Task 1.3: Test Data Builders and Factories
# tests/fixtures/markdown_samples.py
class MarkdownDocumentBuilder:
    """Builder pattern for creating test markdown documents."""

    def __init__(self):
        self.content_parts = []
        self.metadata = {}

    def with_heading(self, text: str, level: int = 1):
        heading_marker = "#" * level
        self.content_parts.append(f"{heading_marker} {text}")
        return self

    def with_paragraph(self, text: str):
        self.content_parts.append(text)
        return self

    def with_metadata(self, key: str, value: str):
        self.metadata[key] = value
        return self

    def build(self) -> str:
        content = "\n\n".join(self.content_parts)
        if self.metadata:
            metadata_lines = [f"{k}: {v}" for k, v in self.metadata.items()]
            content = "---\n" + "\n".join(metadata_lines) + "\n---\n\n" + content
        return content


# tests/fixtures/api_responses.py
class GiteaApiResponseBuilder:
    """Builder for creating mock Gitea API responses."""

    def __init__(self):
        self.issue_data = {
            "number": 1,
            "title": "Test Issue",
            "state": "open",
            "labels": [],
            "milestone": None,
            "created_at": "2025-01-01T00:00:00Z",
            "updated_at": "2025-01-01T00:00:00Z"
        }

    def with_number(self, number: int):
        self.issue_data["number"] = number
        return self

    def with_title(self, title: str):
        self.issue_data["title"] = title
        return self

    def with_labels(self, *labels: str):
        self.issue_data["labels"] = [{"name": label} for label in labels]
        return self

    def build(self) -> dict:
        return self.issue_data.copy()
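Later tasks in this gameplan import a `LargeMarkdownGenerator` from tests/fixtures/markdown_samples.py and call `generate_document(size='1mb')`, but the class itself is not defined here. The following is one possible sketch; the size parsing, the supported units, and the filler text are all assumptions.

```python
import re

_SIZE_PATTERN = re.compile(r"^(\d+)\s*(kb|mb)$", re.IGNORECASE)
_UNITS = {"kb": 1024, "mb": 1024 * 1024}


class LargeMarkdownGenerator:
    """Hypothetical generator for large markdown test documents."""

    def generate_document(self, size: str = "1mb") -> str:
        match = _SIZE_PATTERN.match(size.strip())
        if match is None:
            raise ValueError(f"Unsupported size: {size!r}")
        target_bytes = int(match.group(1)) * _UNITS[match.group(2).lower()]

        sections = []
        total = 0
        index = 0
        # Append sections until the body reaches the requested size.
        while total < target_bytes:
            section = (
                f"## Section {index}\n\n"
                f"Paragraph {index}: " + "lorem ipsum dolor sit amet " * 20 + "\n\n"
            )
            sections.append(section)
            total += len(section)  # ASCII only, so characters equal bytes
            index += 1
        return "# Large Test Document\n\n" + "".join(sections)


content = LargeMarkdownGenerator().generate_document(size="64kb")
print(len(content.encode("utf-8")) >= 64 * 1024)
```

Generating content from a fixed pattern keeps the document deterministic, which matters when the same fixture is used for timing comparisons.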
Deliverables:
- Standardized test directory structure
- Global test configuration and fixtures
- Test data builders and factories
- Test utilities and helpers
Risk Level: Low (foundation work, no breaking changes)
Phase 2: Unit Testing Framework (Week 2-3)
Task 2.1: Domain Model Unit Tests
# tests/unit/domain/test_issue_models.py
import pytest
from domain.issues.models import Issue, Label, IssueState


class TestIssue:
    """Test Issue domain model behavior."""

    def test_issue_creation_with_valid_data(self):
        # Arrange
        issue = Issue(
            number=123,
            title="Test Issue",
            state=IssueState.OPEN,
            labels=[Label("bug"), Label("priority:high")]
        )
        # Act & Assert
        assert issue.number == 123
        assert issue.title == "Test Issue"
        assert issue.state == IssueState.OPEN
        assert len(issue.labels) == 2

    def test_issue_state_transition_rules(self):
        # Arrange
        issue = Issue(number=1, title="Test", state=IssueState.OPEN)
        # Act
        issue.close()
        # Assert
        assert issue.state == IssueState.CLOSED
        assert issue.closed_at is not None

    def test_issue_label_categorization(self):
        # Arrange
        issue = Issue(
            number=1,
            title="Test",
            labels=[
                Label("bug"),            # type label
                Label("priority:high"),  # priority label
                Label("status:ready"),   # state label
                Label("custom")          # other label
            ]
        )
        # Act
        categories = issue.categorize_labels()
        # Assert
        assert "bug" in categories.type_labels
        assert "priority:high" in categories.priority_labels
        assert "status:ready" in categories.state_labels
        assert "custom" in categories.other_labels
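For orientation, here is a minimal sketch of a domain model that would satisfy the tests above. It is not the actual `domain.issues.models` module: the `LabelCategories` class, the `TYPE_LABELS` set, and the prefix-based categorization rules are assumptions inferred from the test expectations.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional

TYPE_LABELS = {"bug", "feature", "enhancement"}  # assumed set of type labels


class IssueState(Enum):
    OPEN = "open"
    CLOSED = "closed"


@dataclass(frozen=True)
class Label:
    name: str


@dataclass
class LabelCategories:
    type_labels: List[str] = field(default_factory=list)
    priority_labels: List[str] = field(default_factory=list)
    state_labels: List[str] = field(default_factory=list)
    other_labels: List[str] = field(default_factory=list)


@dataclass
class Issue:
    number: int
    title: str
    state: IssueState = IssueState.OPEN
    labels: List[Label] = field(default_factory=list)
    closed_at: Optional[datetime] = None

    def close(self) -> None:
        # Closing records the transition time alongside the new state.
        self.state = IssueState.CLOSED
        self.closed_at = datetime.now(timezone.utc)

    def categorize_labels(self) -> LabelCategories:
        categories = LabelCategories()
        for label in self.labels:
            if label.name.startswith("priority:"):
                categories.priority_labels.append(label.name)
            elif label.name.startswith("status:"):
                categories.state_labels.append(label.name)
            elif label.name in TYPE_LABELS:
                categories.type_labels.append(label.name)
            else:
                categories.other_labels.append(label.name)
        return categories
```

Because the model has no I/O, tests like the ones above run in microseconds and need no fixtures.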
Task 2.2: Business Logic Unit Tests
# tests/unit/domain/test_issue_services.py
import pytest
from domain.issues.services import IssueStatusService
from domain.issues.models import Issue, Label, IssueState


class TestIssueStatusService:
    """Test business logic for issue status determination."""

    @pytest.fixture
    def service(self):
        return IssueStatusService()

    def test_determine_kanban_column_for_new_issue(self, service):
        # Arrange
        issue = Issue(
            number=1,
            title="New Issue",
            state=IssueState.OPEN,
            labels=[Label("status:new")]
        )
        # Act
        column = service.determine_kanban_column(issue)
        # Assert
        assert column == "Todo"

    def test_determine_kanban_column_for_in_progress_issue(self, service):
        # Arrange
        issue = Issue(
            number=1,
            title="In Progress Issue",
            state=IssueState.OPEN,
            labels=[Label("status:in-progress")]
        )
        # Act
        column = service.determine_kanban_column(issue)
        # Assert
        assert column == "In Progress"

    @pytest.mark.parametrize("labels,expected_priority", [
        ([Label("priority:low")], "Low"),
        ([Label("priority:medium")], "Medium"),
        ([Label("priority:high")], "High"),
        ([Label("priority:critical")], "Critical"),
        ([], "Medium"),  # Default priority
    ])
    def test_extract_priority_info(self, service, labels, expected_priority):
        # Arrange
        issue = Issue(number=1, title="Test", labels=labels)
        # Act
        priority = service.extract_priority_info(issue)
        # Assert
        assert priority.level == expected_priority
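The rules these tests encode can be sketched as two lookup tables. This is a simplified illustration rather than the real `IssueStatusService`: it takes plain label names instead of an `Issue`, and the "Done" column for closed issues and the fallback to "Todo" are assumptions.

```python
from dataclasses import dataclass
from typing import Iterable

PRIORITY_LEVELS = {"low": "Low", "medium": "Medium", "high": "High", "critical": "Critical"}
STATUS_TO_COLUMN = {"new": "Todo", "in-progress": "In Progress"}


@dataclass(frozen=True)
class PriorityInfo:
    level: str


class IssueStatusService:
    """Simplified sketch operating on label names (assumption)."""

    def extract_priority_info(self, label_names: Iterable[str]) -> PriorityInfo:
        for name in label_names:
            prefix, _, value = name.partition(":")
            if prefix == "priority" and value in PRIORITY_LEVELS:
                return PriorityInfo(PRIORITY_LEVELS[value])
        return PriorityInfo("Medium")  # default when no priority label is present

    def determine_kanban_column(self, label_names: Iterable[str], is_open: bool = True) -> str:
        if not is_open:
            return "Done"  # assumed column for closed issues
        for name in label_names:
            prefix, _, value = name.partition(":")
            if prefix == "status" and value in STATUS_TO_COLUMN:
                return STATUS_TO_COLUMN[value]
        return "Todo"  # assumed fallback for unlabeled open issues


service = IssueStatusService()
print(service.extract_priority_info(["bug", "priority:high"]).level)
print(service.determine_kanban_column(["status:in-progress"]))
```

Keeping the mappings in module-level tables makes the parametrized test cases a direct mirror of the data, so a new priority level needs one table entry and one test row.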
Task 2.3: Application Service Unit Tests (with Mocks)
# tests/unit/application/test_issue_application_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from application.issue_application_service import IssueApplicationService
from domain.issues.models import Issue, IssueState
from infrastructure.unit_of_work import UnitOfWork


class TestIssueApplicationService:
    """Test application service coordination logic."""

    @pytest.fixture
    def mock_uow(self):
        uow = Mock(spec=UnitOfWork)
        uow.issues = AsyncMock()
        uow.projects = AsyncMock()
        uow.__aenter__ = AsyncMock(return_value=uow)
        uow.__aexit__ = AsyncMock(return_value=None)
        return uow

    @pytest.fixture
    def service(self, mock_uow):
        return IssueApplicationService(mock_uow)

    async def test_get_issue_details_success(self, service, mock_uow):
        # Arrange
        issue = Issue(number=123, title="Test Issue", state=IssueState.OPEN)
        project_info = Mock()
        project_info.kanban_columns = ["Todo", "In Progress", "Done"]
        mock_uow.issues.get_issue.return_value = issue
        mock_uow.projects.get_issue_project_info.return_value = project_info
        # Act
        result = await service.get_issue_details(123)
        # Assert
        assert result.issue == issue
        assert result.project_info == project_info
        mock_uow.issues.get_issue.assert_called_once_with(123)
        mock_uow.projects.get_issue_project_info.assert_called_once_with(123)

    async def test_get_issue_details_issue_not_found(self, service, mock_uow):
        # Arrange
        from domain.issues.exceptions import IssueNotFoundError
        mock_uow.issues.get_issue.side_effect = IssueNotFoundError("Issue not found")
        # Act & Assert
        with pytest.raises(IssueNotFoundError):
            await service.get_issue_details(999)
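The mocking approach above can be tried without any pytest plugin by driving the coroutines with `asyncio.run`. In this sketch the trimmed-down `IssueApplicationService` and the local `IssueNotFoundError` are stand-ins for the real classes, and the dictionary return value is an assumption.

```python
import asyncio
from unittest.mock import AsyncMock


class IssueNotFoundError(Exception):
    """Stand-in for domain.issues.exceptions.IssueNotFoundError."""


class IssueApplicationService:
    """Hypothetical, trimmed-down service: it only coordinates repositories."""

    def __init__(self, issues, projects):
        self.issues = issues
        self.projects = projects

    async def get_issue_details(self, number):
        issue = await self.issues.get_issue(number)
        project_info = await self.projects.get_issue_project_info(number)
        return {"issue": issue, "project_info": project_info}


async def check_success_path():
    issues, projects = AsyncMock(), AsyncMock()
    issues.get_issue.return_value = {"number": 123}
    projects.get_issue_project_info.return_value = {"column": "Todo"}
    result = await IssueApplicationService(issues, projects).get_issue_details(123)
    issues.get_issue.assert_awaited_once_with(123)
    return result


async def check_not_found_path():
    issues, projects = AsyncMock(), AsyncMock()
    issues.get_issue.side_effect = IssueNotFoundError("Issue not found")
    try:
        await IssueApplicationService(issues, projects).get_issue_details(999)
    except IssueNotFoundError:
        # The project repository must not be queried after the failure.
        projects.get_issue_project_info.assert_not_awaited()
        return True
    return False


success_result = asyncio.run(check_success_path())
not_found_raised = asyncio.run(check_not_found_path())
print(success_result, not_found_raised)
```

`assert_awaited_once_with` is stricter than `assert_called_once_with` for coroutines, since it fails if the call was made but never awaited.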
Deliverables:
- Unit tests for all domain models
- Unit tests for business logic services
- Unit tests for application services with mocks
- Parameterized tests for edge cases
Risk Level: Low (isolated unit tests, no external dependencies)
Phase 3: Integration Testing Framework (Week 3-4)
Task 3.1: Repository Integration Tests
# tests/integration/repositories/test_gitea_issue_repository.py
import pytest
import aiohttp
from infrastructure.repositories.gitea_issue_repository import GiteaIssueRepository
from infrastructure.connection_manager import ConnectionManager
from tests.fixtures.api_responses import GiteaApiResponseBuilder


class TestGiteaIssueRepository:
    """Integration tests for Gitea API repository."""

    @pytest.fixture
    async def repository(self, test_config):
        connection_manager = ConnectionManager(test_config)
        repo = GiteaIssueRepository(connection_manager)
        yield repo
        await connection_manager.close()

    @pytest.fixture
    def mock_server(self, aioresponses):
        """Mock HTTP responses for integration tests."""
        return aioresponses

    async def test_get_issue_success(self, repository, mock_server):
        # Arrange
        issue_data = (GiteaApiResponseBuilder()
                      .with_number(123)
                      .with_title("Test Issue")
                      .with_labels("bug", "priority:high")
                      .build())
        mock_server.get(
            "http://test-gitea.com/api/v1/repos/test/repo/issues/123",
            payload=issue_data
        )
        # Act
        issue = await repository.get_issue(123)
        # Assert
        assert issue.number == 123
        assert issue.title == "Test Issue"
        assert len(issue.labels) == 2

    async def test_get_issue_not_found(self, repository, mock_server):
        # Arrange
        mock_server.get(
            "http://test-gitea.com/api/v1/repos/test/repo/issues/999",
            status=404
        )
        # Act & Assert
        from domain.issues.exceptions import IssueNotFoundError
        with pytest.raises(IssueNotFoundError):
            await repository.get_issue(999)

    async def test_get_issue_with_retry_on_network_error(self, repository, mock_server):
        # Arrange - First two requests fail, third succeeds
        issue_data = GiteaApiResponseBuilder().with_number(123).build()
        mock_server.get(
            "http://test-gitea.com/api/v1/repos/test/repo/issues/123",
            exception=aiohttp.ClientError("Network error")
        )
        mock_server.get(
            "http://test-gitea.com/api/v1/repos/test/repo/issues/123",
            exception=aiohttp.ClientError("Network error")
        )
        mock_server.get(
            "http://test-gitea.com/api/v1/repos/test/repo/issues/123",
            payload=issue_data
        )
        # Act
        issue = await repository.get_issue(123)
        # Assert
        assert issue.number == 123
        # Verify retry mechanism worked (3 calls total).
        # aioresponses records calls in a dict keyed by (method, URL),
        # so count the recorded calls rather than the keys.
        total_calls = sum(len(calls) for calls in mock_server.requests.values())
        assert total_calls == 3
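The retry test above presumes the repository retries transient network failures. A retry helper of that kind might look like the sketch below; the name `with_retries`, the three-attempt default, and the exponential backoff are assumptions, and `TransientNetworkError` stands in for `aiohttp.ClientError`.

```python
import asyncio


class TransientNetworkError(Exception):
    """Stand-in for a retryable error such as aiohttp.ClientError."""


async def with_retries(operation, attempts=3, base_delay=0.01):
    """Await operation(); retry on TransientNetworkError with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except TransientNetworkError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


class FlakyEndpoint:
    """Fails a fixed number of times before succeeding, and counts calls."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def fetch(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise TransientNetworkError("Network error")
        return {"number": 123}


endpoint = FlakyEndpoint(failures=2)
issue = asyncio.run(with_retries(endpoint.fetch))
print(issue, endpoint.calls)
```

Counting calls on the fake endpoint gives the same evidence as inspecting the mock server's recorded requests, without depending on a mocking library's internal bookkeeping.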
Task 3.2: Database Integration Tests
# tests/integration/repositories/test_sqlite_document_repository.py
import pytest
import sqlite3
from pathlib import Path
from infrastructure.repositories.sqlite_document_repository import SqliteDocumentRepository
from domain.documents.models import Document


class TestSqliteDocumentRepository:
    """Integration tests for SQLite document repository."""

    @pytest.fixture
    def test_db_path(self, test_workspace):
        return test_workspace / "test.db"

    @pytest.fixture
    def repository(self, test_db_path):
        repo = SqliteDocumentRepository(test_db_path)
        repo.initialize_schema()
        yield repo
        repo.close()

    async def test_store_and_retrieve_document(self, repository):
        # Arrange
        document = Document(
            filename="test.md",
            content="# Test Document\nContent here",
            ast_data={"type": "document", "children": []}
        )
        # Act
        document_id = await repository.store_document(document)
        retrieved = await repository.get_document(document_id)
        # Assert
        assert retrieved.filename == "test.md"
        assert retrieved.content == "# Test Document\nContent here"
        assert retrieved.ast_data["type"] == "document"

    async def test_store_duplicate_filename_raises_error(self, repository):
        # Arrange
        document1 = Document(filename="duplicate.md", content="Content 1")
        document2 = Document(filename="duplicate.md", content="Content 2")
        # Act
        await repository.store_document(document1)
        # Assert
        from infrastructure.exceptions import DocumentStoreError
        with pytest.raises(DocumentStoreError) as exc_info:
            await repository.store_document(document2)
        assert "already exists" in str(exc_info.value)

    async def test_transaction_rollback_on_error(self, repository):
        # Arrange
        document = Document(filename="test.md", content="Valid content")
        # Simulate a database error during storage
        with pytest.raises(sqlite3.Error):
            async with repository.unit_of_work():
                await repository.store_document(document)
                # Force an error that should rollback the transaction
                await repository.execute_raw_sql("INVALID SQL")
        # Assert - Document should not be stored due to rollback
        documents = await repository.list_all_documents()
        assert len(documents) == 0
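The rollback behaviour that the last test expects can be seen directly with the standard library's `sqlite3` module. This sketch uses an in-memory database and a made-up `documents` table; it illustrates the mechanism and is not the repository's implementation.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE documents (filename TEXT PRIMARY KEY, content TEXT)")
connection.commit()


def store_then_fail(conn):
    """Insert a row, then run invalid SQL inside the same transaction."""
    try:
        # Used as a context manager, the connection commits on success
        # and rolls back if the block raises.
        with conn:
            conn.execute(
                "INSERT INTO documents (filename, content) VALUES (?, ?)",
                ("test.md", "Valid content"),
            )
            conn.execute("INVALID SQL")
    except sqlite3.Error:
        return False
    return True


stored = store_then_fail(connection)
row_count = connection.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
print(stored, row_count)
connection.close()
```

An in-memory database also makes a convenient fixture for repository tests that do not need to check on-disk behaviour.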
Task 3.3: Service Integration Tests
# tests/integration/services/test_document_service_integration.py
import pytest
from pathlib import Path
from application.document_service import DocumentService
from infrastructure.unit_of_work import UnitOfWork
from tests.fixtures.markdown_samples import MarkdownDocumentBuilder


class TestDocumentServiceIntegration:
    """Integration tests for document service with real repositories."""

    # Async fixture: UnitOfWork.initialize() and close() are awaited here,
    # matching how the end-to-end fixtures in this gameplan use them.
    @pytest.fixture
    async def service(self, test_workspace):
        uow = UnitOfWork(database_path=test_workspace / "test.db")
        await uow.initialize()
        yield DocumentService(uow)
        await uow.close()

    async def test_ingest_markdown_file_complete_workflow(self, service, test_workspace):
        # Arrange
        markdown_content = (MarkdownDocumentBuilder()
                            .with_heading("Test Document")
                            .with_paragraph("This is a test paragraph.")
                            .with_heading("Section 2", level=2)
                            .build())
        test_file = test_workspace / "test.md"
        test_file.write_text(markdown_content)
        # Act
        result = await service.ingest_file(test_file)
        # Assert
        assert result.document_id is not None
        assert result.parse_time > 0
        assert result.cache_path.exists()
        # Verify document was stored correctly
        document = await service.get_document(result.document_id)
        assert document.filename == "test.md"
        assert "Test Document" in document.content
        assert document.ast_data is not None

    async def test_bulk_ingestion_with_transaction(self, service, test_workspace):
        # Arrange
        files = []
        for i in range(5):
            content = f"# Document {i}\nContent for document {i}"
            file_path = test_workspace / f"doc_{i}.md"
            file_path.write_text(content)
            files.append(file_path)
        # Act
        results = await service.ingest_bulk(files)
        # Assert
        assert len(results) == 5
        for result in results:
            assert result.document_id is not None
            assert result.parse_time > 0
        # Verify all documents are stored
        all_docs = await service.list_documents()
        assert len(all_docs) == 5
Deliverables:
- Repository integration tests with real databases/APIs
- Service integration tests with transaction testing
- Error handling and retry mechanism tests
- Performance and load integration tests
Risk Level: Medium (involves real external dependencies)
Phase 4: End-to-End Testing Framework (Week 4-5)
Task 4.1: CLI Command Testing
# tests/e2e/cli/test_issue_commands.py
import os
import sys
import pytest
import subprocess
from pathlib import Path


class TestIssueCommands:
    """End-to-end tests for issue management CLI commands."""

    @pytest.fixture
    def isolated_environment(self, test_workspace):
        """Set up isolated environment for CLI testing."""
        # Start from the parent environment: passing only the MARKITECT_*
        # variables would drop PATH and other settings the child process needs.
        env = {
            **os.environ,
            "MARKITECT_WORKSPACE_DIR": str(test_workspace),
            "MARKITECT_GITEA_URL": "http://test-gitea.com",
            "MARKITECT_REPO_OWNER": "test",
            "MARKITECT_REPO_NAME": "repo"
        }
        return env

    def test_issue_show_command(self, isolated_environment):
        # Act (sys.executable runs the CLI with the interpreter running the tests)
        result = subprocess.run(
            [sys.executable, "tddai_cli.py", "show-issue", "123"],
            env=isolated_environment,
            capture_output=True,
            text=True
        )
        # Assert
        assert result.returncode == 0
        assert "Issue #123 Details" in result.stdout
        assert "Title:" in result.stdout
        assert "Status:" in result.stdout

    def test_issue_start_workflow(self, isolated_environment):
        # Act - Start working on an issue
        result = subprocess.run(
            [sys.executable, "tddai_cli.py", "start-issue", "456"],
            env=isolated_environment,
            capture_output=True,
            text=True
        )
        # Assert
        assert result.returncode == 0
        assert "Starting work on issue #456" in result.stdout
        # Verify workspace was created
        workspace_path = Path(isolated_environment["MARKITECT_WORKSPACE_DIR"]) / "issue_456"
        assert workspace_path.exists()
        assert (workspace_path / "requirements.md").exists()
        assert (workspace_path / "test_plan.md").exists()

    def test_complete_issue_workflow(self, isolated_environment):
        # Act - Complete workflow: start -> add tests -> finish
        commands = [
            [sys.executable, "tddai_cli.py", "start-issue", "789"],
            [sys.executable, "tddai_cli.py", "add-test", "test_scenario"],
            [sys.executable, "tddai_cli.py", "finish-issue"]
        ]
        for cmd in commands:
            result = subprocess.run(
                cmd,
                env=isolated_environment,
                capture_output=True,
                text=True
            )
            assert result.returncode == 0
        # Assert - Workspace should be cleaned up
        workspace_path = Path(isolated_environment["MARKITECT_WORKSPACE_DIR"]) / "issue_789"
        assert not workspace_path.exists()
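The way environment variables reach a CLI subprocess can be checked in isolation. The helper below is a sketch; `run_cli` is a hypothetical name, and the child process is a one-line Python script rather than the real tddai_cli.py.

```python
import os
import subprocess
import sys


def run_cli(args, extra_env):
    """Run a Python command line in a child process with an augmented environment."""
    env = {**os.environ, **extra_env}  # keep the parent environment, then override
    return subprocess.run(
        [sys.executable, *args],
        env=env,
        capture_output=True,
        text=True,
        check=False,
    )


result = run_cli(
    ["-c", "import os; print(os.environ['MARKITECT_WORKSPACE_DIR'])"],
    {"MARKITECT_WORKSPACE_DIR": "/tmp/markitect_example"},
)
print(result.returncode, result.stdout.strip())
```

Merging into a copy of `os.environ` matters because the `env` argument replaces the child's environment entirely instead of extending it.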
Task 4.2: Workflow Integration Tests
# tests/e2e/workflows/test_document_processing_workflow.py
import pytest
from pathlib import Path
import asyncio
from application.document_service import DocumentService
from application.workspace_service import WorkspaceService
from infrastructure.unit_of_work import UnitOfWork


class TestDocumentProcessingWorkflow:
    """End-to-end tests for complete document processing workflows."""

    @pytest.fixture
    async def services(self, test_workspace):
        uow = UnitOfWork(database_path=test_workspace / "test.db")
        await uow.initialize()
        doc_service = DocumentService(uow)
        workspace_service = WorkspaceService(uow)
        yield doc_service, workspace_service
        await uow.close()

    async def test_full_document_lifecycle(self, services, test_workspace):
        doc_service, workspace_service = services
        # Arrange - Create test documents
        docs_dir = test_workspace / "documents"
        docs_dir.mkdir()
        # Create various document types
        (docs_dir / "readme.md").write_text("# Project README\nDescription here")
        (docs_dir / "api.md").write_text("# API Documentation\n## Endpoints")
        (docs_dir / "guide.md").write_text("# User Guide\n### Getting Started")
        # Act - Process all documents
        ingestion_results = []
        for md_file in docs_dir.glob("*.md"):
            result = await doc_service.ingest_file(md_file)
            ingestion_results.append(result)
        # Generate workspace summary
        workspace_summary = await workspace_service.generate_summary()
        # Act - Search functionality
        search_results = await doc_service.search_content("API")
        # Assert - All documents processed
        assert len(ingestion_results) == 3
        for result in ingestion_results:
            assert result.document_id is not None
            assert result.parse_time > 0
        # Assert - Workspace summary generated
        assert workspace_summary.total_documents == 3
        assert workspace_summary.total_size > 0
        # Assert - Search functionality works
        assert len(search_results) >= 1
        assert any("api.md" in result.filename for result in search_results)

    async def test_large_document_processing_performance(self, services, test_workspace):
        doc_service, _ = services
        # Arrange - Create large document (1MB)
        from tests.fixtures.markdown_samples import LargeMarkdownGenerator
        generator = LargeMarkdownGenerator()
        large_content = generator.generate_document(size='1mb')
        large_file = test_workspace / "large_document.md"
        large_file.write_text(large_content)
        # Act - Measure processing time with a monotonic clock,
        # which is unaffected by system clock adjustments
        import time
        start_time = time.perf_counter()
        result = await doc_service.ingest_file(large_file)
        processing_time = time.perf_counter() - start_time
        # Assert - Performance requirements
        assert result.document_id is not None
        assert processing_time < 10.0  # Should process 1MB in under 10 seconds
        assert result.parse_time < 5.0  # AST parsing should be under 5 seconds
        # Verify cache was created for performance
        assert result.cache_path.exists()
        cache_size = result.cache_path.stat().st_size
        assert cache_size > 0
Task 4.3: Performance and Load Testing
# tests/e2e/performance/test_system_performance.py
import pytest
import asyncio
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
from application.document_service import DocumentService
from tests.fixtures.markdown_samples import MarkdownDocumentBuilder


class TestSystemPerformance:
    """Performance and load testing for the system."""

    @pytest.fixture
    async def service(self, test_workspace):
        from infrastructure.unit_of_work import UnitOfWork
        uow = UnitOfWork(database_path=test_workspace / "perf_test.db")
        await uow.initialize()
        yield DocumentService(uow)
        await uow.close()

    async def test_concurrent_document_ingestion(self, service, test_workspace):
        """Test system behavior under concurrent load."""
        # Arrange - Create multiple test documents
        docs_dir = test_workspace / "concurrent_docs"
        docs_dir.mkdir()
        doc_files = []
        for i in range(20):
            content = (MarkdownDocumentBuilder()
                       .with_heading(f"Document {i}")
                       .with_paragraph(f"Content for document {i}")
                       .build())
            doc_file = docs_dir / f"doc_{i}.md"
            doc_file.write_text(content)
            doc_files.append(doc_file)
        # Act - Process documents concurrently
        start_time = time.time()
        tasks = [service.ingest_file(doc_file) for doc_file in doc_files]
        results = await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        # Assert - Performance requirements
        assert len(results) == 20
        assert all(result.document_id is not None for result in results)
        # Should process 20 small documents in under 30 seconds
        assert total_time < 30.0
        # Calculate processing statistics
        parse_times = [result.parse_time for result in results]
        avg_parse_time = statistics.mean(parse_times)
        max_parse_time = max(parse_times)
        assert avg_parse_time < 1.0  # Average parse time under 1 second
        assert max_parse_time < 5.0  # Max parse time under 5 seconds

    async def test_memory_usage_under_load(self, service, test_workspace):
        """Test memory usage patterns during heavy processing."""
        import psutil
        import os
        # Measure initial memory
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss
        # Arrange - Create multiple large documents
        from tests.fixtures.markdown_samples import LargeMarkdownGenerator
        generator = LargeMarkdownGenerator()
        large_docs = []
        for i in range(5):
            content = generator.generate_document(size='1mb')
            doc_file = test_workspace / f"large_{i}.md"
            doc_file.write_text(content)
            large_docs.append(doc_file)
        # Act - Process large documents
        for doc_file in large_docs:
            await service.ingest_file(doc_file)
        # Measure final memory
        final_memory = process.memory_info().rss
        memory_increase = final_memory - initial_memory
        memory_increase_mb = memory_increase / (1024 * 1024)
        # Assert - Memory usage should be reasonable
        # Should not use more than 100MB additional memory for 5MB of documents
        assert memory_increase_mb < 100
        print(f"Memory increase: {memory_increase_mb:.2f} MB")

    @pytest.mark.slow
    async def test_system_stability_over_time(self, service, test_workspace):
        """Long-running stability test."""
        # Run continuous processing for 5 minutes
        start_time = time.time()
        duration = 300  # 5 minutes
        operation_count = 0
        errors = []
        while time.time() - start_time < duration:
            try:
                # Create and process a document
                content = (MarkdownDocumentBuilder()
                           .with_heading(f"Stability Test {operation_count}")
                           .with_paragraph("Long-running test content")
                           .build())
                doc_file = test_workspace / f"stability_{operation_count}.md"
                doc_file.write_text(content)
                await service.ingest_file(doc_file)
                operation_count += 1
                # Small delay between operations
                await asyncio.sleep(0.1)
            except Exception as e:
                errors.append(str(e))
        # Assert - System should remain stable
        error_rate = len(errors) / operation_count if operation_count > 0 else 1
        assert error_rate < 0.01  # Less than 1% error rate
        assert operation_count > 100  # Should process at least 100 operations
        print(f"Operations completed: {operation_count}")
        print(f"Error rate: {error_rate:.2%}")
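Where installing psutil is not an option, the standard library's `tracemalloc` offers a dependency-free variant of the memory check above. Note the swap in technique: `tracemalloc` reports Python-level allocations, not the process RSS that psutil measures, so the two numbers are not interchangeable. The helper name and the sample workload are illustrative assumptions.

```python
import tracemalloc


def measure_peak_allocation(workload):
    """Run workload() and return the peak traced allocation in bytes."""
    tracemalloc.start()
    try:
        workload()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def build_documents():
    # Ten 100 KiB strings held in memory at the same time.
    documents = ["x" * (100 * 1024) for _ in range(10)]
    return len(documents)


peak_bytes = measure_peak_allocation(build_documents)
print(f"Peak allocation: {peak_bytes / (1024 * 1024):.2f} MB")
```

Peak allocation is often a more stable assertion target than an RSS delta, because RSS depends on allocator and operating system behaviour that varies between CI runners.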
Deliverables:
- CLI command end-to-end tests
- Complete workflow integration tests
- Performance and load testing framework
- System stability and reliability tests
Risk Level: Low-Medium (end-to-end tests, performance requirements)
Phase 5: Test Migration and Optimization (Week 5-6)
Task 5.1: Migrate Existing Tests
# Migration strategy for existing tests
# Example: Migrating tests/test_issue_creator.py

# Before (current structure)
class TestIssueCreator:
    def test_create_issue_success(self):
        creator = IssueCreator(auth_token="test-token")
        result = creator.create_issue("Test Issue", "Description")
        assert result is not None


# After (new structure)
# tests/unit/application/test_issue_creator.py
class TestIssueCreator:
    @pytest.fixture
    def mock_repository(self):
        return Mock(spec=IssueRepository)

    @pytest.fixture
    def creator(self, mock_repository):
        return IssueCreator(mock_repository)

    async def test_create_issue_success(self, creator, mock_repository):
        # Arrange
        mock_repository.create_issue.return_value = Issue(number=123, title="Test Issue")
        # Act
        result = await creator.create_issue("Test Issue", "Description")
        # Assert
        assert result.number == 123
        mock_repository.create_issue.assert_called_once()
Task 5.2: Test Performance Optimization
# tests/utils/performance_optimization.py
import pytest
import asyncio
from typing import List, Callable


class TestPerformanceOptimizer:
    """Utilities for optimizing test execution performance."""

    @staticmethod
    def parallelize_tests(test_functions: List[Callable]):
        """Run multiple test functions in parallel."""
        async def run_parallel():
            tasks = [asyncio.create_task(test_func()) for test_func in test_functions]
            return await asyncio.gather(*tasks)
        return asyncio.run(run_parallel())

    @staticmethod
    def cache_expensive_fixtures():
        """Cache expensive test fixtures across test sessions."""
        # Implementation for fixture caching
        pass
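# pytest configuration for performance
# pytest.ini
# Note: pytest.ini uses the [pytest] section; [tool:pytest] is only read from setup.cfg.
[pytest]
# Requires the pytest-asyncio plugin; lets the async tests and fixtures in this
# gameplan run without per-test markers.
asyncio_mode = auto
addopts =
    --strict-markers
    --strict-config
    --verbose
    --tb=short
    --cov=src
    --cov-report=term-missing
    --cov-report=html
    --cov-fail-under=90
    --maxfail=1
    --durations=10
markers =
    slow: marks tests as slow (deselect with '-m "not slow"')
    integration: marks tests as integration tests
    e2e: marks tests as end-to-end tests
    performance: marks tests as performance tests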
Task 5.3: CI/CD Integration
# .github/workflows/test.yml
name: Test Suite
on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]
jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
      - name: Run unit tests
        run: pytest tests/unit/ -v --cov=src --cov-report=xml
      - name: Upload coverage
        uses: codecov/codecov-action@v3
  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
      - name: Run integration tests
        run: pytest tests/integration/ -v
  e2e-tests:
    runs-on: ubuntu-latest
    needs: [unit-tests, integration-tests]
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-test.txt
      - name: Run end-to-end tests
        run: pytest tests/e2e/ -v -m "not slow"
      - name: Run performance tests
        run: pytest tests/e2e/performance/ -v
        if: github.event_name == 'push' && github.ref == 'refs/heads/main'
Deliverables:
- Migration of all existing tests to new architecture
- Test performance optimization and parallelization
- CI/CD pipeline integration
- Test coverage and quality gates
Risk Level: Medium (migration changes, CI/CD dependencies)
Phase 6: Advanced Testing Features (Week 6-7)
Task 6.1: Property-Based Testing
# tests/property/test_markdown_processing.py
import pytest
from hypothesis import given, strategies as st
from domain.documents.models import Document
from application.document_service import DocumentService


class TestMarkdownProcessingProperties:
    """Property-based tests for markdown processing."""

    @given(st.text(alphabet=st.characters(blacklist_categories=('Cc', 'Cs'))))
    async def test_any_valid_text_can_be_processed(self, text, document_service):
        """Any valid unicode text should be processable."""
        # Arrange
        document = Document(filename="test.md", content=text)
        # Act - Should not raise exception
        result = await document_service.process_document(document)
        # Assert
        assert result is not None
        assert result.ast_data is not None

    @given(st.text(min_size=1, max_size=1000))
    async def test_processing_is_deterministic(self, content, document_service):
        """Same content should always produce same AST."""
        # Arrange
        document = Document(filename="test.md", content=content)
        # Act
        result1 = await document_service.process_document(document)
        result2 = await document_service.process_document(document)
        # Assert
        assert result1.ast_data == result2.ast_data

    @given(st.lists(st.text(min_size=1), min_size=1, max_size=10))
    async def test_batch_processing_order_independence(self, contents, document_service):
        """Batch processing should be order-independent."""
        # Arrange
        documents1 = [Document(f"doc_{i}.md", content) for i, content in enumerate(contents)]
        documents2 = list(reversed(documents1))
        # Act
        results1 = await document_service.process_batch(documents1)
        results2 = await document_service.process_batch(documents2)
        # Assert - Results should be equivalent regardless of order
        results1_by_name = {r.filename: r.ast_data for r in results1}
        results2_by_name = {r.filename: r.ast_data for r in results2}
        assert results1_by_name == results2_by_name
Task 6.2: Mutation Testing
# tests/mutation/test_coverage_quality.py
"""
Mutation testing to verify test quality.
Uses mutmut or similar tools to verify tests catch logic errors.
"""
# Configuration for mutation testing
# pyproject.toml
[tool.mutmut]
paths_to_mutate = "src/"
backup = false
runner = "python -m pytest tests/unit/"
tests_dir = "tests/"
# Mutation testing command
# mutmut run --paths-to-mutate src/domain/
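To make the idea of a mutation score concrete, the hand-rolled sketch below applies three mutants to a toy function and compares a weak and a strong test suite. It does not use mutmut; the function, the mutants, and the suites are invented for illustration.

```python
def is_high_priority(level):
    """Toy function under test: levels 3 and above are high priority."""
    return level >= 3


# Hand-written mutants, each a small deliberate bug.
MUTANTS = [
    lambda level: level > 3,   # boundary mutation: >= became >
    lambda level: level >= 4,  # constant mutation: 3 became 4
    lambda level: level <= 3,  # operator mutation: >= became <=
]


def weak_suite(impl):
    """Checks only values far from the boundary."""
    return impl(5) is True and impl(1) is False


def strong_suite(impl):
    """Adds the boundary value, which the weak suite misses."""
    return weak_suite(impl) and impl(3) is True


def mutation_score(suite, mutants):
    """Fraction of mutants the suite detects (a failing suite kills the mutant)."""
    killed = sum(1 for mutant in mutants if not suite(mutant))
    return killed / len(mutants)


print(f"weak suite:   {mutation_score(weak_suite, MUTANTS):.2f}")
print(f"strong suite: {mutation_score(strong_suite, MUTANTS):.2f}")
```

Both suites pass against the original function and would report the same line coverage, yet only the one with the boundary case catches every mutant. That gap is what a mutation score measures.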
Task 6.3: Contract Testing
# tests/contract/test_api_contracts.py
import pytest
from pact import Consumer, Provider
from application.issue_service import IssueService


class TestGiteaApiContract:
    """Contract tests for Gitea API integration."""

    @pytest.fixture
    def pact(self):
        pact = Consumer('markitect').has_pact_with(Provider('gitea'))
        # pact-python starts and stops its mock service with
        # start_service() and stop_service()
        pact.start_service()
        yield pact
        pact.stop_service()

    def test_get_issue_contract(self, pact):
        # Define expected interaction
        expected = {
            'number': 123,
            'title': 'Test Issue',
            'state': 'open',
            'labels': [{'name': 'bug'}]
        }
        (pact
         .given('issue 123 exists')
         .upon_receiving('a request for issue 123')
         .with_request('GET', '/api/v1/repos/test/repo/issues/123')
         .will_respond_with(200, body=expected))
        # Test the interaction
        with pact:
            issue_service = IssueService(base_url=pact.uri)
            issue = issue_service.get_issue(123)
            assert issue.number == 123
Deliverables:
- Property-based testing framework
- Mutation testing setup
- Contract testing for external APIs
- Advanced test analysis and reporting
Risk Level: Low (advanced features, non-breaking additions)
Success Criteria and Metrics
Implementation Success Indicators:
Coverage Metrics:
- Unit Test Coverage: >90% for domain and application layers
- Integration Test Coverage: >80% for infrastructure layer
- E2E Test Coverage: >70% for critical user workflows
Performance Metrics:
- Unit Tests: All execute in <30 seconds total
- Integration Tests: All execute in <5 minutes total
- E2E Tests: Critical workflows tested in <15 minutes
Quality Metrics:
- Test Reliability: <1% flakiness rate
- Test Maintainability: Clear organization and documentation
- CI/CD Integration: Tests run automatically on all commits
- Error Detection: Mutation testing score >85%
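The flakiness target above needs a working definition before it can be tracked. One possible definition, assumed here and not prescribed by this gameplan, treats a test as flaky when repeated runs of the same code produce both a pass and a failure.

```python
from collections import defaultdict


def flakiness_rate(runs):
    """Fraction of tests with mixed outcomes across repeated runs.

    `runs` is an iterable of (test_id, passed) pairs collected from
    several runs of the same commit (an assumed input format).
    """
    outcomes = defaultdict(set)
    for test_id, passed in runs:
        outcomes[test_id].add(bool(passed))
    if not outcomes:
        return 0.0
    flaky = sum(1 for results in outcomes.values() if len(results) == 2)
    return flaky / len(outcomes)


sample_runs = [
    ("test_a", True), ("test_a", True),
    ("test_b", True), ("test_b", False),   # mixed outcomes: flaky
    ("test_c", False), ("test_c", False),  # consistently failing: not flaky
    ("test_d", True),
]
print(f"Flakiness rate: {flakiness_rate(sample_runs):.0%}")
```

A consistently failing test counts as broken, not flaky, under this definition, which keeps the metric focused on non-determinism.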
Test Architecture Benefits:
Developer Experience:
- Fast Feedback: Unit tests provide immediate feedback
- Reliable Tests: Consistent results across environments
- Easy Debugging: Clear test failure messages and context
- Comprehensive Coverage: All critical paths tested
System Quality:
- Regression Prevention: Automated detection of breaking changes
- Performance Monitoring: Continuous performance validation
- Error Handling: Comprehensive error scenario testing
- Stability Assurance: Long-running stability validation
This comprehensive testing architecture enhancement gameplan provides a robust foundation for ensuring code quality, catching regressions early, and maintaining confidence in the system as it evolves through domain logic separation and data access improvements.