chore: history cleanup
This commit is contained in:
483
history/ADHOC_Data_Access_Improvements_GAMEPLAN.md
Normal file
483
history/ADHOC_Data_Access_Improvements_GAMEPLAN.md
Normal file
@@ -0,0 +1,483 @@
|
||||
# Data Access Pattern Improvements - Gameplan
|
||||
|
||||
## Overview
|
||||
|
||||
This gameplan addresses systematic improvements to data access patterns across the MarkiTect codebase, focusing on implementing modern, maintainable, and performant data access strategies that complement the domain logic separation work.
|
||||
|
||||
## Current Data Access Anti-patterns Identified
|
||||
|
||||
### 1. **Direct API Calls Mixed with Business Logic**
|
||||
- **Location**: `services/issue_service.py` (lines 51-107)
|
||||
- **Problem**: Business presentation logic directly calls `project_mgr._make_api_call()`
|
||||
- **Impact**: Tight coupling, difficult testing, no error standardization
|
||||
|
||||
### 2. **Subprocess-based HTTP Requests**
|
||||
- **Location**: `tddai/project_manager.py` (lines 35-67)
|
||||
- **Problem**: Using `subprocess.run(['curl', ...])` for API calls
|
||||
- **Impact**: Poor performance, resource leaks, inconsistent error handling
|
||||
|
||||
### 3. **Scattered Database Operations**
|
||||
- **Location**: `markitect/document_manager.py` (lines 55-111)
|
||||
- **Problem**: Direct SQLite operations mixed with business logic
|
||||
- **Impact**: No transaction management, inconsistent error handling
|
||||
|
||||
### 4. **Inconsistent File System Access**
|
||||
- **Location**: `tddai/workspace.py` (lines 56-238)
|
||||
- **Problem**: Direct file operations mixed with domain logic
|
||||
- **Impact**: Poor error handling, no abstraction, difficult testing
|
||||
|
||||
### 5. **Missing Connection Management**
|
||||
- **Problem**: No connection pooling, resource management, or retry mechanisms
|
||||
- **Impact**: Poor performance, resource exhaustion, unreliable operations
|
||||
|
||||
## Implementation Gameplan
|
||||
|
||||
### **Phase 1: Foundation & Infrastructure (Week 1-2)**
|
||||
|
||||
#### **Task 1.1: Connection Management Infrastructure**
|
||||
```python
|
||||
# Create: infrastructure/connection_manager.py
|
||||
class ConnectionManager:
|
||||
- HTTP session pooling for Gitea API
|
||||
- Database connection pooling
|
||||
- Configuration-driven timeouts and retries
|
||||
- Resource cleanup and lifecycle management
|
||||
```
|
||||
|
||||
#### **Task 1.2: Error Handling Standardization**
|
||||
```python
|
||||
# Create: infrastructure/exceptions.py
|
||||
class DataAccessError(Exception):
|
||||
- Base exception for all data access errors
|
||||
- Structured error context and logging
|
||||
- Operation tracking and debugging info
|
||||
```
|
||||
|
||||
#### **Task 1.3: Repository Interface Definitions**
|
||||
```python
|
||||
# Create: infrastructure/repositories/interfaces.py
|
||||
- IssueRepository (abstract)
|
||||
- ProjectRepository (abstract)
|
||||
- DocumentRepository (abstract)
|
||||
- WorkspaceRepository (abstract)
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Connection manager with HTTP session pooling
|
||||
- [ ] Standardized error hierarchy
|
||||
- [ ] Abstract repository interfaces
|
||||
- [ ] Configuration for data sources
|
||||
|
||||
**Risk Level**: Low (additive changes only)
|
||||
|
||||
### **Phase 2: Repository Implementation (Week 2-3)**
|
||||
|
||||
#### **Task 2.1: Gitea Repository Implementation**
|
||||
```python
|
||||
# Create: infrastructure/repositories/gitea_repository.py
|
||||
class GiteaIssueRepository:
|
||||
- Async HTTP client with connection pooling
|
||||
- Retry mechanisms with exponential backoff
|
||||
- Proper error mapping and handling
|
||||
- Rate limiting and request throttling
|
||||
```
|
||||
|
||||
#### **Task 2.2: Database Repository Implementation**
|
||||
```python
|
||||
# Create: infrastructure/repositories/sqlite_repository.py
|
||||
class SqliteDocumentRepository:
|
||||
- Connection pooling for SQLite
|
||||
- Transaction management
|
||||
- Proper error handling and mapping
|
||||
- Query optimization and prepared statements
|
||||
```
|
||||
|
||||
#### **Task 2.3: File System Repository Implementation**
|
||||
```python
|
||||
# Create: infrastructure/repositories/filesystem_repository.py
|
||||
class FilesystemWorkspaceRepository:
|
||||
- Abstracted file operations
|
||||
- Atomic file operations
|
||||
- Path validation and security
|
||||
- Error handling and recovery
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Gitea API repository with async HTTP client
|
||||
- [ ] SQLite repository with transaction support
|
||||
- [ ] File system repository with atomic operations
|
||||
- [ ] Comprehensive error handling for all repositories
|
||||
|
||||
**Risk Level**: Low-Medium (parallel implementation)
|
||||
|
||||
### **Phase 3: Unit of Work Pattern (Week 3-4)**
|
||||
|
||||
#### **Task 3.1: Transaction Coordination**
|
||||
```python
|
||||
# Create: infrastructure/unit_of_work.py
|
||||
class UnitOfWork:
|
||||
- Coordinate transactions across multiple repositories
|
||||
- Rollback support for failures
|
||||
- Context manager for automatic cleanup
|
||||
- Support for nested transactions
|
||||
```
|
||||
|
||||
#### **Task 3.2: Caching Strategy**
|
||||
```python
|
||||
# Create: infrastructure/caching/cache_manager.py
|
||||
class CacheManager:
|
||||
- Multi-level caching (memory, disk, Redis)
|
||||
- Cache invalidation strategies
|
||||
- Performance monitoring
|
||||
- TTL and eviction policies
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Unit of Work implementation
|
||||
- [ ] Caching infrastructure
|
||||
- [ ] Transaction coordination
|
||||
- [ ] Performance monitoring
|
||||
|
||||
**Risk Level**: Medium (involves transaction logic)
|
||||
|
||||
### **Phase 4: Service Layer Migration (Week 4-6)**
|
||||
|
||||
#### **Task 4.1: Issue Service Refactoring**
|
||||
```python
|
||||
# Refactor: services/issue_service.py
|
||||
class IssueService:
|
||||
- Inject UnitOfWork dependency
|
||||
- Remove direct API calls
|
||||
- Separate business logic from data access
|
||||
- Add comprehensive error handling
|
||||
```
|
||||
|
||||
#### **Task 4.2: Document Service Refactoring**
|
||||
```python
|
||||
# Refactor: markitect/document_manager.py → services/document_service.py
|
||||
class DocumentService:
|
||||
- Use repository pattern for database operations
|
||||
- Implement proper transaction handling
|
||||
- Add caching layer integration
|
||||
- Separate parsing logic from storage
|
||||
```
|
||||
|
||||
#### **Task 4.3: Workspace Service Refactoring**
|
||||
```python
|
||||
# Refactor: tddai/workspace.py → services/workspace_service.py
|
||||
class WorkspaceService:
|
||||
- Abstract file system operations
|
||||
- Add proper error handling
|
||||
- Implement atomic workspace operations
|
||||
- Add workspace state management
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Refactored IssueService using repositories
|
||||
- [ ] New DocumentService with transaction support
|
||||
- [ ] New WorkspaceService with atomic operations
|
||||
- [ ] Backward compatibility adapters
|
||||
|
||||
**Risk Level**: Medium-High (core service changes)
|
||||
|
||||
### **Phase 5: Performance Optimization (Week 6-7)**
|
||||
|
||||
#### **Task 5.1: Query Optimization**
|
||||
```python
|
||||
# Implement query objects for complex operations
|
||||
class IssueQueries:
|
||||
- Parameterized queries for common operations
|
||||
- Batch operations for multiple issues
|
||||
- Pagination support
|
||||
- Index optimization recommendations
|
||||
```
|
||||
|
||||
#### **Task 5.2: Async/Await Implementation**
|
||||
```python
|
||||
# Convert synchronous operations to async
|
||||
- Async repository methods
|
||||
- Concurrent data fetching
|
||||
- Parallel processing where applicable
|
||||
- Non-blocking I/O operations
|
||||
```
|
||||
|
||||
#### **Task 5.3: Monitoring and Metrics**
|
||||
```python
|
||||
# Create: infrastructure/monitoring/data_metrics.py
|
||||
class DataAccessMetrics:
|
||||
- Query performance tracking
|
||||
- Error rate monitoring
|
||||
- Connection pool utilization
|
||||
- Cache hit/miss ratios
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Async repository implementations
|
||||
- [ ] Query optimization strategies
|
||||
- [ ] Performance monitoring
|
||||
- [ ] Batch operation support
|
||||
|
||||
**Risk Level**: Medium (performance changes)
|
||||
|
||||
### **Phase 6: Testing & Migration (Week 7-8)**
|
||||
|
||||
#### **Task 6.1: Comprehensive Testing**
|
||||
```python
|
||||
# Test Coverage:
|
||||
- Unit tests for all repositories (mocked dependencies)
|
||||
- Integration tests with real databases/APIs
|
||||
- Performance tests for critical operations
|
||||
- Error handling and recovery tests
|
||||
```
|
||||
|
||||
#### **Task 6.2: Gradual Migration**
|
||||
```python
|
||||
# Migration Strategy:
|
||||
- Feature flags for repository switching
|
||||
- Parallel running of old and new systems
|
||||
- Gradual consumer migration
|
||||
- Monitoring and rollback capabilities
|
||||
```
|
||||
|
||||
**Deliverables:**
|
||||
- [ ] Complete test suite for data access layer
|
||||
- [ ] Migration scripts and tools
|
||||
- [ ] Performance benchmarks
|
||||
- [ ] Documentation and runbooks
|
||||
|
||||
**Risk Level**: Low-Medium (testing and gradual rollout)
|
||||
|
||||
## Specific Implementation Examples
|
||||
|
||||
### **Example 1: IssueService Transformation**
|
||||
|
||||
#### **Before (Current Anti-pattern):**
|
||||
```python
|
||||
class IssueService:
|
||||
def get_issue_details(self, issue_number: int) -> Dict[str, Any]:
|
||||
# Direct dependency creation
|
||||
from tddai.project_manager import ProjectManager
|
||||
project_mgr = ProjectManager()
|
||||
|
||||
# Direct API call mixed with business logic
|
||||
from tddai.config import get_config
|
||||
config = get_config()
|
||||
issue_url = f"{config.issues_api_url}/{issue_number}"
|
||||
detailed_issue = project_mgr._make_api_call('GET', issue_url)
|
||||
|
||||
# 50+ lines of mixed business logic and data transformation
|
||||
return self._process_issue_data(detailed_issue)
|
||||
```
|
||||
|
||||
#### **After (Repository Pattern):**
|
||||
```python
|
||||
class IssueService:
|
||||
def __init__(self, uow: UnitOfWork):
|
||||
self.uow = uow
|
||||
|
||||
async def get_issue_details(self, issue_number: int) -> IssueDetails:
|
||||
async with self.uow:
|
||||
# Clean separation: repository handles data access
|
||||
issue = await self.uow.issues.get_issue(issue_number)
|
||||
project_info = await self.uow.projects.get_issue_project_info(issue_number)
|
||||
|
||||
# Pure business logic - easily testable
|
||||
return self._build_issue_details(issue, project_info)
|
||||
|
||||
def _build_issue_details(self, issue: Issue, project_info: ProjectInfo) -> IssueDetails:
|
||||
# Pure business logic separated from data access
|
||||
return IssueDetails(
|
||||
issue=issue,
|
||||
kanban_column=self._determine_kanban_column(issue, project_info),
|
||||
priority_info=self._extract_priority_info(issue),
|
||||
state_info=self._extract_state_info(issue)
|
||||
)
|
||||
```
|
||||
|
||||
### **Example 2: Connection Management**
|
||||
|
||||
#### **Before (Subprocess-based HTTP):**
|
||||
```python
|
||||
class GiteaHttpClient:
|
||||
def _make_request(self, method: str, url: str, data: Optional[Dict[str, Any]] = None):
|
||||
# New subprocess for every request - very inefficient
|
||||
cmd = ['curl', '-s', '-X', method]
|
||||
if data:
|
||||
cmd.extend(['-d', json.dumps(data)])
|
||||
cmd.append(url)
|
||||
|
||||
result = subprocess.run(cmd, stdout=PIPE, stderr=PIPE, text=True)
|
||||
# Poor error handling
|
||||
if result.returncode != 0:
|
||||
raise Exception(f"HTTP request failed: {result.stderr}")
|
||||
|
||||
return json.loads(result.stdout)
|
||||
```
|
||||
|
||||
#### **After (Proper HTTP Client with Pooling):**
|
||||
```python
|
||||
class ConnectionManager:
|
||||
def __init__(self, config: DataSourceConfig):
|
||||
self.config = config
|
||||
self._http_session = None
|
||||
|
||||
async def get_http_session(self) -> aiohttp.ClientSession:
|
||||
if self._http_session is None:
|
||||
connector = aiohttp.TCPConnector(
|
||||
limit=self.config.connection_pool_size,
|
||||
limit_per_host=5,
|
||||
keepalive_timeout=60
|
||||
)
|
||||
timeout = aiohttp.ClientTimeout(total=self.config.request_timeout)
|
||||
|
||||
self._http_session = aiohttp.ClientSession(
|
||||
connector=connector,
|
||||
timeout=timeout,
|
||||
headers={'Authorization': f'token {self.config.gitea_token}'}
|
||||
)
|
||||
return self._http_session
|
||||
|
||||
class GiteaRepository:
|
||||
def __init__(self, connection_manager: ConnectionManager):
|
||||
self.connection_manager = connection_manager
|
||||
|
||||
@retry(max_attempts=3, backoff=ExponentialBackoff())
|
||||
async def get_issue(self, issue_number: int) -> Issue:
|
||||
session = await self.connection_manager.get_http_session()
|
||||
|
||||
async with session.get(f'/api/v1/repos/.../issues/{issue_number}') as response:
|
||||
if response.status == 404:
|
||||
raise IssueNotFoundError(f"Issue #{issue_number} not found")
|
||||
elif response.status >= 400:
|
||||
raise GiteaApiError(f"API error: {response.status}")
|
||||
|
||||
data = await response.json()
|
||||
return Issue.from_api_data(data)
|
||||
```
|
||||
|
||||
### **Example 3: Transaction Management**
|
||||
|
||||
#### **Before (No Transaction Support):**
|
||||
```python
|
||||
class DocumentManager:
|
||||
def ingest_file(self, file_path: Path) -> Dict[str, Any]:
|
||||
# Multiple separate operations - if any fails, inconsistent state
|
||||
content = self._read_file_content(file_path)
|
||||
ast, parse_time = self._parse_content_to_ast(content)
|
||||
cache_file, cache_time = self._create_performance_cache(file_path.name, ast)
|
||||
|
||||
# Database operation could fail after cache is created
|
||||
self._store_in_database(file_path.name, content)
|
||||
|
||||
return self._build_ingestion_result(file_path, parse_time, cache_time)
|
||||
```
|
||||
|
||||
#### **After (Unit of Work with Transactions):**
|
||||
```python
|
||||
class DocumentService:
|
||||
def __init__(self, uow: UnitOfWork):
|
||||
self.uow = uow
|
||||
|
||||
async def ingest_file(self, file_path: Path) -> DocumentIngestionResult:
|
||||
async with self.uow:
|
||||
# All operations in single transaction
|
||||
content = await self._read_file_content(file_path)
|
||||
ast, parse_time = await self._parse_content_to_ast(content)
|
||||
|
||||
# Repository handles both cache and database atomically
|
||||
document_id = await self.uow.documents.store_document(
|
||||
filename=file_path.name,
|
||||
content=content,
|
||||
ast=ast
|
||||
)
|
||||
|
||||
# If any operation fails, everything is rolled back
|
||||
await self.uow.cache.store_ast_cache(document_id, ast)
|
||||
|
||||
return DocumentIngestionResult(
|
||||
document_id=document_id,
|
||||
parse_time=parse_time,
|
||||
cache_path=await self.uow.documents.get_cache_path(document_id)
|
||||
)
|
||||
```
|
||||
|
||||
## Risk Assessment & Mitigation
|
||||
|
||||
### **High-Risk Areas:**
|
||||
1. **Service Layer Refactoring** - Could break existing functionality
|
||||
2. **Database Transaction Changes** - Risk of data corruption
|
||||
3. **External API Changes** - Risk of connectivity issues
|
||||
|
||||
### **Mitigation Strategies:**
|
||||
1. **Parallel Implementation** - Keep old code until new code is proven
|
||||
2. **Feature Flags** - Toggle between old and new implementations
|
||||
3. **Comprehensive Testing** - Unit, integration, and end-to-end tests
|
||||
4. **Gradual Migration** - Migrate one service at a time
|
||||
5. **Monitoring** - Real-time performance and error monitoring
|
||||
|
||||
### **Rollback Plan:**
|
||||
- Feature flags allow instant rollback to previous implementation
|
||||
- Database migrations are reversible
|
||||
- Configuration changes can be reverted via environment variables
|
||||
- Each phase is independently deployable and reversible
|
||||
|
||||
## Performance Benefits Expected
|
||||
|
||||
### **HTTP Client Improvements:**
|
||||
- **Before**: New subprocess per request (~100-200ms overhead)
|
||||
- **After**: Connection pooling (~5-10ms per request)
|
||||
- **Improvement**: 10-20x faster API operations
|
||||
|
||||
### **Database Operations:**
|
||||
- **Before**: New connection per operation
|
||||
- **After**: Connection pooling and prepared statements
|
||||
- **Improvement**: 3-5x faster database operations
|
||||
|
||||
### **Error Recovery:**
|
||||
- **Before**: Silent failures and inconsistent error handling
|
||||
- **After**: Automatic retries and structured error reporting
|
||||
- **Improvement**: 90% reduction in transient failures
|
||||
|
||||
### **Resource Utilization:**
|
||||
- **Before**: Resource leaks from subprocess and connection management
|
||||
- **After**: Proper resource pooling and cleanup
|
||||
- **Improvement**: 50-70% reduction in resource usage
|
||||
|
||||
## Testing Strategy
|
||||
|
||||
### **Unit Testing:**
|
||||
- Repository interfaces with mock implementations
|
||||
- Business logic separated from data access
|
||||
- Error handling and edge cases
|
||||
- Performance characteristics
|
||||
|
||||
### **Integration Testing:**
|
||||
- Real database and API interactions
|
||||
- Transaction rollback scenarios
|
||||
- Connection pooling behavior
|
||||
- Retry mechanism validation
|
||||
|
||||
### **Performance Testing:**
|
||||
- Load testing for concurrent operations
|
||||
- Memory usage and leak detection
|
||||
- Connection pool utilization
|
||||
- Cache effectiveness measurement
|
||||
|
||||
## Monitoring & Observability
|
||||
|
||||
### **Metrics to Track:**
|
||||
- Request latency percentiles (p50, p95, p99)
|
||||
- Error rates by operation type
|
||||
- Connection pool utilization
|
||||
- Cache hit/miss ratios
|
||||
- Database query performance
|
||||
- API rate limiting compliance
|
||||
|
||||
### **Alerting:**
|
||||
- High error rates or latency spikes
|
||||
- Connection pool exhaustion
|
||||
- Database deadlocks or timeouts
|
||||
- API rate limit violations
|
||||
- Cache performance degradation
|
||||
|
||||
This comprehensive gameplan provides a systematic approach to modernizing data access patterns while maintaining system stability and ensuring measurable performance improvements.
|
||||
Reference in New Issue
Block a user