Data Access Pattern Improvements - Gameplan
Overview
This gameplan addresses systematic improvements to data access patterns across the MarkiTect codebase, focusing on implementing modern, maintainable, and performant data access strategies that complement the domain logic separation work.
Current Data Access Anti-patterns Identified
1. Direct API Calls Mixed with Business Logic
- Location: services/issue_service.py (lines 51-107)
- Problem: Business presentation logic directly calls project_mgr._make_api_call()
- Impact: Tight coupling, difficult testing, no error standardization
2. Subprocess-based HTTP Requests
- Location: tddai/project_manager.py (lines 35-67)
- Problem: Using subprocess.run(['curl', ...]) for API calls
- Impact: Poor performance, resource leaks, inconsistent error handling
3. Scattered Database Operations
- Location: markitect/document_manager.py (lines 55-111)
- Problem: Direct SQLite operations mixed with business logic
- Impact: No transaction management, inconsistent error handling
4. Inconsistent File System Access
- Location: tddai/workspace.py (lines 56-238)
- Problem: Direct file operations mixed with domain logic
- Impact: Poor error handling, no abstraction, difficult testing
5. Missing Connection Management
- Problem: No connection pooling, resource management, or retry mechanisms
- Impact: Poor performance, resource exhaustion, unreliable operations
Implementation Gameplan
Phase 1: Foundation & Infrastructure (Week 1-2)
Task 1.1: Connection Management Infrastructure
# Create: infrastructure/connection_manager.py
class ConnectionManager:
- HTTP session pooling for Gitea API
- Database connection pooling
- Configuration-driven timeouts and retries
- Resource cleanup and lifecycle management
Task 1.2: Error Handling Standardization
# Create: infrastructure/exceptions.py
class DataAccessError(Exception):
- Base exception for all data access errors
- Structured error context and logging
- Operation tracking and debugging info
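One possible shape for the base exception described above, as a minimal sketch. The subclass names (ApiError, NotFoundError) and the operation/context fields are assumptions made for illustration, not names taken from the codebase.

```python
from typing import Any, Dict, Optional


class DataAccessError(Exception):
    """Base class for data access failures, carrying structured context."""

    def __init__(self, message: str, *, operation: str,
                 context: Optional[Dict[str, Any]] = None) -> None:
        super().__init__(message)
        self.operation = operation          # e.g. "issues.get" (hypothetical naming)
        self.context = dict(context or {})  # extra debugging info for logs

    def __str__(self) -> str:
        base = super().__str__()
        details = ", ".join(f"{k}={v!r}" for k, v in sorted(self.context.items()))
        suffix = f" ({details})" if details else ""
        return f"[{self.operation}] {base}{suffix}"


class ApiError(DataAccessError):
    """Hypothetical subclass for remote API failures."""


class NotFoundError(DataAccessError):
    """Hypothetical subclass for missing records."""


def lookup(store: Dict[int, str], issue_number: int) -> str:
    # Map a low-level KeyError onto the standardized hierarchy,
    # keeping the original exception as the cause.
    try:
        return store[issue_number]
    except KeyError as exc:
        raise NotFoundError(
            "issue not found",
            operation="issues.get",
            context={"issue_number": issue_number},
        ) from exc
```

Callers can then catch DataAccessError once at the service boundary instead of handling subprocess, SQLite, and file system errors separately.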
Task 1.3: Repository Interface Definitions
# Create: infrastructure/repositories/interfaces.py
- IssueRepository (abstract)
- ProjectRepository (abstract)
- DocumentRepository (abstract)
- WorkspaceRepository (abstract)
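As a rough illustration of what one of these interfaces could look like, the sketch below defines an abstract IssueRepository with abc and an in-memory implementation for tests. The method names, the dict-based issue shape, and InMemoryIssueRepository are assumptions; the methods are async only to match the later examples in this gameplan.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List


class IssueRepository(ABC):
    """Abstract contract; the method set shown here is illustrative."""

    @abstractmethod
    async def get_issue(self, issue_number: int) -> Dict[str, Any]:
        """Return one issue or raise LookupError if it does not exist."""

    @abstractmethod
    async def list_issues(self, state: str = "open") -> List[Dict[str, Any]]:
        """Return all issues in the given state."""


class InMemoryIssueRepository(IssueRepository):
    """Test double that satisfies the same contract without any I/O."""

    def __init__(self, issues: Iterable[Dict[str, Any]]) -> None:
        self._issues = {issue["number"]: issue for issue in issues}

    async def get_issue(self, issue_number: int) -> Dict[str, Any]:
        try:
            return self._issues[issue_number]
        except KeyError:
            raise LookupError(f"issue #{issue_number} not found") from None

    async def list_issues(self, state: str = "open") -> List[Dict[str, Any]]:
        return [i for i in self._issues.values() if i["state"] == state]
```

Because IssueRepository declares abstract methods, instantiating it directly raises TypeError, so incomplete implementations fail early.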
Deliverables:
- Connection manager with HTTP session pooling
- Standardized error hierarchy
- Abstract repository interfaces
- Configuration for data sources
Risk Level: Low (additive changes only)
Phase 2: Repository Implementation (Week 2-3)
Task 2.1: Gitea Repository Implementation
# Create: infrastructure/repositories/gitea_repository.py
class GiteaIssueRepository:
- Async HTTP client with connection pooling
- Retry mechanisms with exponential backoff
- Proper error mapping and handling
- Rate limiting and request throttling
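The retry item above could be implemented along these lines. This is a sketch under assumptions: retry_async and its parameters are hypothetical names, and the set of retryable exceptions would need to match whatever the chosen HTTP client actually raises.

```python
import asyncio
import functools
import random
from typing import Any, Awaitable, Callable, Tuple, Type


def retry_async(
    max_attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Callable:
    """Retry an async callable with exponential backoff plus a little jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    # Delay doubles per attempt (base, 2*base, 4*base, ...), capped
                    delay = min(max_delay, base_delay * 2 ** (attempt - 1))
                    # Up to 10% jitter so concurrent clients do not retry in lockstep
                    await sleep(delay + random.uniform(0, delay / 10))

        return wrapper

    return decorator
```

The sleep parameter is injectable so tests can verify the backoff schedule without actually waiting.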
Task 2.2: Database Repository Implementation
# Create: infrastructure/repositories/sqlite_repository.py
class SqliteDocumentRepository:
- Connection pooling for SQLite
- Transaction management
- Proper error handling and mapping
- Query optimization and prepared statements
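A minimal sketch of transaction handling for the SQLite repository, using only the standard sqlite3 module. The documents table schema and the method names are assumptions for illustration; the real schema in markitect/document_manager.py may differ.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator, Optional


class SqliteDocumentRepository:
    def __init__(self, db_path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(db_path)
        # Hypothetical schema, for illustration only
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS documents ("
            "filename TEXT PRIMARY KEY, content TEXT NOT NULL)"
        )
        self._conn.commit()

    @contextmanager
    def transaction(self) -> Iterator[sqlite3.Connection]:
        """Commit if the block succeeds, roll back if it raises."""
        try:
            yield self._conn
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise

    def store_document(self, filename: str, content: str) -> None:
        with self.transaction() as conn:
            # Parameterized statement: values are bound, never interpolated
            conn.execute(
                "INSERT OR REPLACE INTO documents (filename, content) VALUES (?, ?)",
                (filename, content),
            )

    def get_content(self, filename: str) -> Optional[str]:
        row = self._conn.execute(
            "SELECT content FROM documents WHERE filename = ?", (filename,)
        ).fetchone()
        return row[0] if row else None
```

Grouping several writes in one transaction() block means a failure partway through leaves the database as it was before the block started.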
Task 2.3: File System Repository Implementation
# Create: infrastructure/repositories/filesystem_repository.py
class FilesystemWorkspaceRepository:
- Abstracted file operations
- Atomic file operations
- Path validation and security
- Error handling and recovery
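The atomic write and path validation items could look roughly like the following. This is a sketch: the method names are hypothetical, and it relies on writing to a temporary file in the target directory and then swapping it in with os.replace, so readers never see a half-written file.

```python
import os
import tempfile
from pathlib import Path


class FilesystemWorkspaceRepository:
    def __init__(self, root: Path) -> None:
        self._root = Path(root).resolve()

    def _resolve(self, relative_path: str) -> Path:
        """Reject paths that would land outside the workspace root."""
        candidate = (self._root / relative_path).resolve()
        if self._root not in candidate.parents:
            raise ValueError(f"path escapes workspace root: {relative_path}")
        return candidate

    def write_text_atomic(self, relative_path: str, text: str) -> Path:
        target = self._resolve(relative_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Temp file lives in the same directory so the final rename
        # stays on one file system
        fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=".tmp-")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                handle.write(text)
                handle.flush()
                os.fsync(handle.fileno())
            os.replace(tmp_name, target)  # swap the finished file into place
        except BaseException:
            # Clean up the temp file if anything failed before the swap
            if os.path.exists(tmp_name):
                os.unlink(tmp_name)
            raise
        return target
```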
Deliverables:
- Gitea API repository with async HTTP client
- SQLite repository with transaction support
- File system repository with atomic operations
- Comprehensive error handling for all repositories
Risk Level: Low-Medium (parallel implementation)
Phase 3: Unit of Work Pattern (Week 3-4)
Task 3.1: Transaction Coordination
# Create: infrastructure/unit_of_work.py
class UnitOfWork:
- Coordinate transactions across multiple repositories
- Rollback support for failures
- Context manager for automatic cleanup
- Support for nested transactions
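A minimal sketch of the coordination idea, assuming each participating repository exposes async commit() and rollback() methods (the Transactional protocol and RecordingRepository below are hypothetical). It does not implement nested transactions or a true two-phase commit; if one commit fails after another has succeeded, the earlier one is not undone.

```python
import asyncio
from typing import List, Protocol


class Transactional(Protocol):
    async def commit(self) -> None: ...
    async def rollback(self) -> None: ...


class UnitOfWork:
    """Commit all participants on success, roll all back on failure."""

    def __init__(self, *participants: Transactional) -> None:
        self._participants = list(participants)

    async def __aenter__(self) -> "UnitOfWork":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is None:
            for participant in self._participants:
                await participant.commit()
        else:
            # Undo in reverse order of registration
            for participant in reversed(self._participants):
                await participant.rollback()
        return False  # never swallow the original exception


class RecordingRepository:
    """Stand-in participant that records what was asked of it."""

    def __init__(self, name: str, log: List[str]) -> None:
        self._name = name
        self._log = log

    async def commit(self) -> None:
        self._log.append(f"{self._name}:commit")

    async def rollback(self) -> None:
        self._log.append(f"{self._name}:rollback")
```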
Task 3.2: Caching Strategy
# Create: infrastructure/caching/cache_manager.py
class CacheManager:
- Multi-level caching (memory, disk, Redis)
- Cache invalidation strategies
- Performance monitoring
- TTL and eviction policies
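For the in-memory level only, TTL expiry and least-recently-used eviction can be combined in a small class like the sketch below. TtlLruCache and its parameters are hypothetical names; the disk and Redis levels are out of scope here.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional, Tuple


class TtlLruCache:
    def __init__(self, max_entries: int = 128, ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: "OrderedDict[Hashable, Tuple[float, Any]]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is not None:
            expires_at, value = entry
            if self._clock() < expires_at:
                self._entries.move_to_end(key)  # mark as most recently used
                self.hits += 1
                return value
            del self._entries[key]  # expired: drop it and count a miss
        self.misses += 1
        return None

    def set(self, key: Hashable, value: Any) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)
        self._entries.move_to_end(key)
        while len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used

    def invalidate(self, key: Hashable) -> None:
        self._entries.pop(key, None)
```

The hits and misses counters give the cache hit/miss ratio without any extra instrumentation.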
Deliverables:
- Unit of Work implementation
- Caching infrastructure
- Transaction coordination
- Performance monitoring
Risk Level: Medium (involves transaction logic)
Phase 4: Service Layer Migration (Week 4-6)
Task 4.1: Issue Service Refactoring
# Refactor: services/issue_service.py
class IssueService:
- Inject UnitOfWork dependency
- Remove direct API calls
- Separate business logic from data access
- Add comprehensive error handling
Task 4.2: Document Service Refactoring
# Refactor: markitect/document_manager.py → services/document_service.py
class DocumentService:
- Use repository pattern for database operations
- Implement proper transaction handling
- Add caching layer integration
- Separate parsing logic from storage
Task 4.3: Workspace Service Refactoring
# Refactor: tddai/workspace.py → services/workspace_service.py
class WorkspaceService:
- Abstract file system operations
- Add proper error handling
- Implement atomic workspace operations
- Add workspace state management
Deliverables:
- Refactored IssueService using repositories
- New DocumentService with transaction support
- New WorkspaceService with atomic operations
- Backward compatibility adapters
Risk Level: Medium-High (core service changes)
Phase 5: Performance Optimization (Week 6-7)
Task 5.1: Query Optimization
# Implement query objects for complex operations
class IssueQueries:
- Parameterized queries for common operations
- Batch operations for multiple issues
- Pagination support
- Index optimization recommendations
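A query object for pagination and a batching helper might be sketched as follows. IssueListQuery and batched are hypothetical names, and the parameter names emitted by to_params (state, page, limit) are an assumption about the target API that should be checked against the Gitea API documentation.

```python
from dataclasses import dataclass
from typing import Dict, Iterator, List, Sequence


@dataclass(frozen=True)
class IssueListQuery:
    """Immutable description of one page of an issue listing."""

    state: str = "open"
    page: int = 1
    page_size: int = 50

    def __post_init__(self) -> None:
        if self.page < 1 or self.page_size < 1:
            raise ValueError("page and page_size must be positive")

    def to_params(self) -> Dict[str, str]:
        # Assumed query parameter names; verify against the real API
        return {"state": self.state, "page": str(self.page),
                "limit": str(self.page_size)}

    def next_page(self) -> "IssueListQuery":
        return IssueListQuery(self.state, self.page + 1, self.page_size)


def batched(items: Sequence[int], size: int) -> Iterator[List[int]]:
    """Split a list of issue numbers into fixed-size batches."""
    if size < 1:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```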
Task 5.2: Async/Await Implementation
# Convert synchronous operations to async
- Async repository methods
- Concurrent data fetching
- Parallel processing where applicable
- Non-blocking I/O operations
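Concurrent fetching with a cap on in-flight requests can be sketched with asyncio.gather and a semaphore. fetch_all and its fetch_one callback are hypothetical; in practice fetch_one would be an async repository method.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List


async def fetch_all(
    issue_numbers: Iterable[int],
    fetch_one: Callable[[int], Awaitable[Any]],
    max_concurrency: int = 5,
) -> List[Any]:
    """Fetch many issues concurrently, preserving input order in the result."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(number: int) -> Any:
        # At most max_concurrency fetches run at the same time
        async with semaphore:
            return await fetch_one(number)

    return await asyncio.gather(*(guarded(n) for n in issue_numbers))
```

Bounding concurrency matters here because an unbounded gather over hundreds of issues could exhaust the connection pool or trip API rate limits.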
Task 5.3: Monitoring and Metrics
# Create: infrastructure/monitoring/data_metrics.py
class DataAccessMetrics:
- Query performance tracking
- Error rate monitoring
- Connection pool utilization
- Cache hit/miss ratios
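A small in-process collector illustrates the tracking items above. This is a sketch with hypothetical method names; it uses the nearest-rank method for percentiles and keeps all samples in memory, which a production version would likely replace with a metrics library.

```python
import math
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Callable, DefaultDict, Iterator, List


class DataAccessMetrics:
    def __init__(self, clock: Callable[[], float] = time.perf_counter) -> None:
        self._clock = clock
        self._durations: DefaultDict[str, List[float]] = defaultdict(list)
        self._errors: DefaultDict[str, int] = defaultdict(int)

    @contextmanager
    def track(self, operation: str) -> Iterator[None]:
        """Time one operation and count it as an error if it raises."""
        start = self._clock()
        try:
            yield
        except Exception:
            self._errors[operation] += 1
            raise
        finally:
            self._durations[operation].append(self._clock() - start)

    def error_rate(self, operation: str) -> float:
        total = len(self._durations.get(operation, []))
        return self._errors.get(operation, 0) / total if total else 0.0

    def percentile(self, operation: str, pct: float) -> float:
        """Nearest-rank percentile of recorded durations."""
        samples = sorted(self._durations.get(operation, []))
        if not samples:
            raise ValueError(f"no samples recorded for {operation!r}")
        rank = max(1, math.ceil(pct / 100 * len(samples)))
        return samples[rank - 1]
```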
Deliverables:
- Async repository implementations
- Query optimization strategies
- Performance monitoring
- Batch operation support
Risk Level: Medium (performance changes)
Phase 6: Testing & Migration (Week 7-8)
Task 6.1: Comprehensive Testing
# Test Coverage:
- Unit tests for all repositories (mocked dependencies)
- Integration tests with real databases/APIs
- Performance tests for critical operations
- Error handling and recovery tests
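A unit test with a mocked repository might look like the sketch below. IssueSummaryService is a deliberately tiny hypothetical service, not the real IssueService; the point is only that business logic can be exercised with unittest.mock.AsyncMock standing in for the repository.

```python
import asyncio
from unittest.mock import AsyncMock


class IssueSummaryService:
    """Hypothetical service that depends only on a repository object."""

    def __init__(self, issues) -> None:
        self._issues = issues

    async def title_for(self, issue_number: int) -> str:
        issue = await self._issues.get_issue(issue_number)
        return f"#{issue_number}: {issue['title'].strip()}"


def test_title_for_uses_repository() -> None:
    repo = AsyncMock()  # no network, no database
    repo.get_issue.return_value = {"title": "  Fix login  "}

    service = IssueSummaryService(repo)
    result = asyncio.run(service.title_for(7))

    assert result == "#7: Fix login"
    repo.get_issue.assert_awaited_once_with(7)
```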
Task 6.2: Gradual Migration
# Migration Strategy:
- Feature flags for repository switching
- Parallel running of old and new systems
- Gradual consumer migration
- Monitoring and rollback capabilities
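Switching between the old and new implementations through a feature flag could be sketched like this. The environment variable name MARKITECT_ISSUE_REPOSITORY and the flag values are assumptions chosen for illustration.

```python
import os
from typing import Callable, Mapping, TypeVar

T = TypeVar("T")


def select_implementation(
    factories: Mapping[str, Callable[[], T]],
    env: Mapping[str, str] = os.environ,
    flag_name: str = "MARKITECT_ISSUE_REPOSITORY",  # hypothetical variable name
    default: str = "legacy",
) -> T:
    """Build the implementation named by the flag, defaulting to the old path."""
    choice = env.get(flag_name, default).strip().lower()
    if choice not in factories:
        raise ValueError(f"{flag_name}={choice!r} is not one of {sorted(factories)}")
    return factories[choice]()
```

Because the default stays on the legacy path, rolling back amounts to unsetting the variable or setting it back to "legacy".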
Deliverables:
- Complete test suite for data access layer
- Migration scripts and tools
- Performance benchmarks
- Documentation and runbooks
Risk Level: Low-Medium (testing and gradual rollout)
Specific Implementation Examples
Example 1: IssueService Transformation
Before (Current Anti-pattern):
class IssueService:
    def get_issue_details(self, issue_number: int) -> Dict[str, Any]:
        # Direct dependency creation
        from tddai.project_manager import ProjectManager
        project_mgr = ProjectManager()

        # Direct API call mixed with business logic
        from tddai.config import get_config
        config = get_config()
        issue_url = f"{config.issues_api_url}/{issue_number}"
        detailed_issue = project_mgr._make_api_call('GET', issue_url)

        # 50+ lines of mixed business logic and data transformation
        return self._process_issue_data(detailed_issue)
After (Repository Pattern):
class IssueService:
    def __init__(self, uow: UnitOfWork):
        self.uow = uow

    async def get_issue_details(self, issue_number: int) -> IssueDetails:
        async with self.uow:
            # Clean separation: repository handles data access
            issue = await self.uow.issues.get_issue(issue_number)
            project_info = await self.uow.projects.get_issue_project_info(issue_number)

            # Pure business logic - easily testable
            return self._build_issue_details(issue, project_info)

    def _build_issue_details(self, issue: Issue, project_info: ProjectInfo) -> IssueDetails:
        # Pure business logic separated from data access
        return IssueDetails(
            issue=issue,
            kanban_column=self._determine_kanban_column(issue, project_info),
            priority_info=self._extract_priority_info(issue),
            state_info=self._extract_state_info(issue)
        )
Example 2: Connection Management
Before (Subprocess-based HTTP):
class GiteaHttpClient:
    def _make_request(self, method: str, url: str, data: Optional[Dict[str, Any]] = None):
        # New subprocess for every request - very inefficient
        cmd = ['curl', '-s', '-X', method]
        if data:
            cmd.extend(['-d', json.dumps(data)])
        cmd.append(url)
        result = subprocess.run(cmd, stdout=PIPE, stderr=PIPE, text=True)

        # Poor error handling
        if result.returncode != 0:
            raise Exception(f"HTTP request failed: {result.stderr}")
        return json.loads(result.stdout)
After (Proper HTTP Client with Pooling):
import aiohttp

class ConnectionManager:
    def __init__(self, config: DataSourceConfig):
        self.config = config
        self._http_session = None

    async def get_http_session(self) -> aiohttp.ClientSession:
        # Create the pooled session lazily and reuse it for every request
        if self._http_session is None:
            connector = aiohttp.TCPConnector(
                limit=self.config.connection_pool_size,
                limit_per_host=5,
                keepalive_timeout=60
            )
            timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
            self._http_session = aiohttp.ClientSession(
                connector=connector,
                timeout=timeout,
                headers={'Authorization': f'token {self.config.gitea_token}'}
            )
        return self._http_session

    async def close(self) -> None:
        # Release pooled connections on shutdown
        if self._http_session is not None:
            await self._http_session.close()
            self._http_session = None

class GiteaRepository:
    def __init__(self, connection_manager: ConnectionManager):
        self.connection_manager = connection_manager

    # retry and ExponentialBackoff are project-level helpers, not shown here
    @retry(max_attempts=3, backoff=ExponentialBackoff())
    async def get_issue(self, issue_number: int) -> Issue:
        session = await self.connection_manager.get_http_session()
        # A relative path like this only resolves if the session is created with a base_url
        async with session.get(f'/api/v1/repos/.../issues/{issue_number}') as response:
            if response.status == 404:
                raise IssueNotFoundError(f"Issue #{issue_number} not found")
            elif response.status >= 400:
                raise GiteaApiError(f"API error: {response.status}")
            data = await response.json()
            return Issue.from_api_data(data)
Example 3: Transaction Management
Before (No Transaction Support):
class DocumentManager:
    def ingest_file(self, file_path: Path) -> Dict[str, Any]:
        # Multiple separate operations - if any fails, inconsistent state
        content = self._read_file_content(file_path)
        ast, parse_time = self._parse_content_to_ast(content)
        cache_file, cache_time = self._create_performance_cache(file_path.name, ast)

        # Database operation could fail after cache is created
        self._store_in_database(file_path.name, content)
        return self._build_ingestion_result(file_path, parse_time, cache_time)
After (Unit of Work with Transactions):
class DocumentService:
    def __init__(self, uow: UnitOfWork):
        self.uow = uow

    async def ingest_file(self, file_path: Path) -> DocumentIngestionResult:
        async with self.uow:
            # All operations in single transaction
            content = await self._read_file_content(file_path)
            ast, parse_time = await self._parse_content_to_ast(content)

            # Repository handles both cache and database atomically
            document_id = await self.uow.documents.store_document(
                filename=file_path.name,
                content=content,
                ast=ast
            )

            # If any operation fails, everything is rolled back
            await self.uow.cache.store_ast_cache(document_id, ast)

            return DocumentIngestionResult(
                document_id=document_id,
                parse_time=parse_time,
                cache_path=await self.uow.documents.get_cache_path(document_id)
            )
Risk Assessment & Mitigation
High-Risk Areas:
- Service Layer Refactoring - Could break existing functionality
- Database Transaction Changes - Risk of data corruption
- External API Changes - Risk of connectivity issues
Mitigation Strategies:
- Parallel Implementation - Keep old code until new code is proven
- Feature Flags - Toggle between old and new implementations
- Comprehensive Testing - Unit, integration, and end-to-end tests
- Gradual Migration - Migrate one service at a time
- Monitoring - Real-time performance and error monitoring
Rollback Plan:
- Feature flags allow instant rollback to previous implementation
- Database migrations are reversible
- Configuration changes can be reverted via environment variables
- Each phase is independently deployable and reversible
Performance Benefits Expected
The figures in this section are estimates rather than measurements; the performance benchmarks delivered in Phase 6 are intended to confirm or correct them.
HTTP Client Improvements:
- Before: New subprocess per request (~100-200ms overhead)
- After: Connection pooling (~5-10ms overhead per request)
- Improvement: 10-20x lower per-request overhead for API operations
Database Operations:
- Before: New connection per operation
- After: Connection pooling and prepared statements
- Improvement: 3-5x faster database operations
Error Recovery:
- Before: Silent failures and inconsistent error handling
- After: Automatic retries and structured error reporting
- Improvement: 90% reduction in transient failures
Resource Utilization:
- Before: Resource leaks from subprocess and connection management
- After: Proper resource pooling and cleanup
- Improvement: 50-70% reduction in resource usage
Testing Strategy
Unit Testing:
- Repository interfaces with mock implementations
- Business logic separated from data access
- Error handling and edge cases
- Performance characteristics
Integration Testing:
- Real database and API interactions
- Transaction rollback scenarios
- Connection pooling behavior
- Retry mechanism validation
Performance Testing:
- Load testing for concurrent operations
- Memory usage and leak detection
- Connection pool utilization
- Cache effectiveness measurement
Monitoring & Observability
Metrics to Track:
- Request latency percentiles (p50, p95, p99)
- Error rates by operation type
- Connection pool utilization
- Cache hit/miss ratios
- Database query performance
- API rate limiting compliance
Alerting:
- High error rates or latency spikes
- Connection pool exhaustion
- Database deadlocks or timeouts
- API rate limit violations
- Cache performance degradation
This comprehensive gameplan provides a systematic approach to modernizing data access patterns while maintaining system stability and ensuring measurable performance improvements.