Domain Logic Separation - Gameplan
Overview
This gameplan implements clean architecture principles by systematically separating domain logic from infrastructure concerns across the MarkiTect codebase. The goal is to create a maintainable, testable, and flexible architecture where business rules are isolated from technical implementation details.
Current Domain Logic Problems
1. Mixed Business Logic and Infrastructure
IssueService - Business Logic Mixed with API Calls
# Current problem in services/issue_service.py (lines 51-107)
class IssueService:
    def get_issue_details(self, issue_number: int) -> Dict[str, Any]:
        # Business logic mixed with direct API calls
        from tddai.project_manager import ProjectManager
        project_mgr = ProjectManager()

        # Direct infrastructure dependency
        issue_url = f"{config.issues_api_url}/{issue_number}"
        detailed_issue = project_mgr._make_api_call('GET', issue_url)

        # Complex business logic for label processing mixed with data transformation
        labels = detailed_issue.get('labels', [])
        state_labels = [label['name'] for label in labels if label['name'].startswith('status:')]
        # ... 50+ lines of mixed concerns
ProjectManager - Domain Logic Mixed with HTTP Infrastructure
# Current problem in tddai/project_manager.py (lines 35-67)
class ProjectManager:
    def _make_api_call(self, method: str, url: str, data: Optional[Dict[str, Any]] = None):
        # Infrastructure code mixed with business logic
        cmd = ['curl', '-s', '-X', method]
        # Project state management logic mixed with HTTP implementation
        result = subprocess.run(cmd, stdout=PIPE, stderr=PIPE, text=True)
        # Business rules for error handling mixed with HTTP error handling
DocumentManager - AST Processing Mixed with File I/O
# Current problem in markitect/document_manager.py (lines 55-111)
class DocumentManager:
    def ingest_file(self, file_path: Path) -> Dict[str, Any]:
        # Domain logic for document processing mixed with file operations
        content = self._read_file_content(file_path)  # File I/O
        ast, parse_time = self._parse_content_to_ast(content)  # Domain logic
        # Database operations mixed with business rules
        self._store_in_database(file_path.name, content)  # Infrastructure
2. Single Responsibility Principle Violations
Multiple Responsibilities in Single Classes
- IssueService: Issue retrieval, data transformation, coverage analysis, API error handling
- ProjectManager: Project state management, HTTP communication, authentication, error translation
- DocumentManager: Document processing logic, database persistence, file operations, cache management
- WorkspaceManager: Workspace lifecycle, file management, configuration handling, test organization
Domain Logic Separation Strategy
Clean Architecture Layers
┌─────────────────────────────────────────┐
│           Presentation Layer            │
│           (CLI Commands, API)           │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│          Application Services           │
│   (Use Cases, Workflow Coordination)    │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│              Domain Layer               │
│     (Business Logic, Domain Models)     │
└─────────────────┬───────────────────────┘
                  │
┌─────────────────▼───────────────────────┐
│          Infrastructure Layer           │
│    (Repositories, External APIs, DB)    │
└─────────────────────────────────────────┘
Dependency Direction
- Inward Dependencies: All dependencies point toward the domain layer
- Abstraction: Infrastructure depends on domain interfaces, not implementations
- Isolation: Domain layer has no knowledge of infrastructure details
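A minimal sketch of how the dependency rule can look in code. The names `IssueSummary`, `IssueReader`, `InMemoryIssueReader`, and `count_open_issues` are hypothetical and exist only to illustrate the direction of dependencies; they are not part of the MarkiTect codebase.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List


# --- Domain layer: defines the model and the interface it needs ---
@dataclass(frozen=True)
class IssueSummary:
    number: int
    state: str


class IssueReader(ABC):
    """Interface owned by the domain; it knows nothing about HTTP or SQL."""

    @abstractmethod
    def list_issues(self) -> List[IssueSummary]:
        ...


def count_open_issues(reader: IssueReader) -> int:
    """Pure business logic that depends only on the abstraction."""
    return sum(1 for issue in reader.list_issues() if issue.state == "open")


# --- Infrastructure layer: implements the domain interface ---
class InMemoryIssueReader(IssueReader):
    """Stand-in for a real adapter (for example an HTTP client)."""

    def __init__(self, rows: Dict[int, str]):
        self._rows = rows

    def list_issues(self) -> List[IssueSummary]:
        return [IssueSummary(number=n, state=s) for n, s in self._rows.items()]


reader = InMemoryIssueReader({1: "open", 2: "closed", 3: "open"})
open_count = count_open_issues(reader)
```

Because `count_open_issues` only sees `IssueReader`, the same business rule can run against an in-memory fake in tests and against a real adapter in production without modification.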
Implementation Gameplan
Phase 1: Domain Model Extraction (Week 1-2)
Task 1.1: Issue Domain Models
Create Domain Structure:
domain/
├── issues/
│ ├── __init__.py
│ ├── models.py # Issue entities and value objects
│ ├── services.py # Domain services for business logic
│ ├── repositories.py # Repository interfaces
│ └── exceptions.py # Domain-specific exceptions
Issue Domain Models:
# domain/issues/models.py
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
from enum import Enum


class IssueState(Enum):
    OPEN = "open"
    CLOSED = "closed"
    IN_PROGRESS = "in_progress"


@dataclass(frozen=True)
class Label:
    """Value object representing an issue label."""
    name: str
    color: Optional[str] = None
    description: Optional[str] = None

    def is_state_label(self) -> bool:
        """Check if this is a state-related label."""
        return self.name.startswith('status:')

    def is_priority_label(self) -> bool:
        """Check if this is a priority-related label."""
        return self.name.startswith('priority:')

    def is_type_label(self) -> bool:
        """Check if this is a type-related label."""
        return self.name in ['bug', 'enhancement', 'feature', 'documentation']


@dataclass(frozen=True)
class LabelCategories:
    """Value object for categorized labels."""
    state_labels: List[str]
    priority_labels: List[str]
    type_labels: List[str]
    other_labels: List[str]


@dataclass
class Issue:
    """Issue aggregate root."""
    number: int
    title: str
    state: IssueState
    labels: List[Label]
    created_at: datetime
    updated_at: datetime
    milestone: Optional[str] = None
    assignee: Optional[str] = None
    closed_at: Optional[datetime] = None

    def categorize_labels(self) -> LabelCategories:
        """Categorize labels by type - pure domain logic."""
        state_labels = [label.name for label in self.labels if label.is_state_label()]
        priority_labels = [label.name for label in self.labels if label.is_priority_label()]
        type_labels = [label.name for label in self.labels if label.is_type_label()]
        other_labels = [label.name for label in self.labels
                        if not (label.is_state_label() or label.is_priority_label() or label.is_type_label())]
        return LabelCategories(
            state_labels=state_labels,
            priority_labels=priority_labels,
            type_labels=type_labels,
            other_labels=other_labels
        )

    def close(self) -> None:
        """Close the issue - domain business rule."""
        if self.state == IssueState.CLOSED:
            raise ValueError("Issue is already closed")
        self.state = IssueState.CLOSED
        self.closed_at = datetime.utcnow()

    def reopen(self) -> None:
        """Reopen the issue - domain business rule."""
        if self.state != IssueState.CLOSED:
            raise ValueError("Issue is not closed")
        self.state = IssueState.OPEN
        self.closed_at = None
Issue Domain Services:
# domain/issues/services.py
from typing import Any, Dict, List
from .models import Issue, IssueState, LabelCategories


class IssueStatusService:
    """Domain service for issue status-related business logic."""

    def determine_kanban_column(self, issue: Issue, project_info: Dict[str, Any]) -> str:
        """Determine kanban column based on issue state and labels."""
        # Pure business logic - no infrastructure dependencies
        label_categories = issue.categorize_labels()

        # Business rules for kanban column determination
        if issue.state == IssueState.CLOSED:
            return "Done"

        # Check for explicit status labels
        for state_label in label_categories.state_labels:
            if state_label == "status:in-progress":
                return "In Progress"
            elif state_label == "status:review":
                return "Review"
            elif state_label == "status:blocked":
                return "Blocked"

        # Default for open issues without explicit status
        return "Todo"

    def extract_priority_info(self, issue: Issue) -> Dict[str, Any]:
        """Extract priority information from issue labels."""
        label_categories = issue.categorize_labels()
        priority_mapping = {
            "priority:low": "Low",
            "priority:medium": "Medium",
            "priority:high": "High",
            "priority:critical": "Critical"
        }
        for priority_label in label_categories.priority_labels:
            if priority_label in priority_mapping:
                return {
                    "level": priority_mapping[priority_label],
                    "label": priority_label
                }

        # Default priority
        return {"level": "Medium", "label": None}


class IssueValidationService:
    """Domain service for issue validation business rules."""

    def validate_issue_creation(self, title: str, labels: List[str]) -> None:
        """Validate issue creation according to business rules."""
        if not title or not title.strip():
            raise ValueError("Issue title cannot be empty")
        if len(title) > 255:
            raise ValueError("Issue title cannot exceed 255 characters")

        # Business rule: Cannot have conflicting priority labels
        priority_labels = [label for label in labels if label.startswith("priority:")]
        if len(priority_labels) > 1:
            raise ValueError("Issue cannot have multiple priority labels")
Repository Interfaces:
# domain/issues/repositories.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from .models import Issue


class IssueRepository(ABC):
    """Repository interface for issue persistence."""

    @abstractmethod
    async def get_issue(self, issue_number: int) -> Issue:
        """Retrieve issue by number."""
        pass

    @abstractmethod
    async def list_issues(self, state: Optional[str] = None) -> List[Issue]:
        """List issues, optionally filtered by state."""
        pass

    @abstractmethod
    async def save_issue(self, issue: Issue) -> None:
        """Save issue changes."""
        pass

    @abstractmethod
    async def create_issue(self, title: str, description: str, labels: List[str]) -> Issue:
        """Create a new issue."""
        pass


class ProjectRepository(ABC):
    """Repository interface for project information."""

    @abstractmethod
    async def get_issue_project_info(self, issue_number: int) -> Dict[str, Any]:
        """Get project information for an issue."""
        pass

    @abstractmethod
    async def get_kanban_columns(self) -> List[str]:
        """Get available kanban columns for the project."""
        pass
Task 1.2: Project Domain Models
Project Domain Structure:
# domain/projects/models.py
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum


class ProjectState(Enum):
    ACTIVE = "active"
    ARCHIVED = "archived"
    PLANNING = "planning"


@dataclass
class Milestone:
    """Milestone entity."""
    id: int
    title: str
    description: Optional[str]
    due_date: Optional[datetime]
    state: str
    open_issues: int
    closed_issues: int

    @property
    def completion_percentage(self) -> float:
        """Calculate milestone completion percentage."""
        total_issues = self.open_issues + self.closed_issues
        if total_issues == 0:
            return 0.0
        return (self.closed_issues / total_issues) * 100


@dataclass
class Project:
    """Project aggregate root."""
    name: str
    description: str
    state: ProjectState
    milestones: List[Milestone]
    kanban_columns: List[str]

    def get_active_milestones(self) -> List[Milestone]:
        """Get milestones that are currently active."""
        return [milestone for milestone in self.milestones if milestone.state == "open"]

    def calculate_overall_progress(self) -> float:
        """Calculate overall project progress based on milestones."""
        if not self.milestones:
            return 0.0
        total_completion = sum(milestone.completion_percentage for milestone in self.milestones)
        return total_completion / len(self.milestones)


# domain/projects/services.py
from .models import Project


class ProjectManagementService:
    """Domain service for project management business logic."""

    def determine_project_health(self, project: Project) -> str:
        """Determine project health based on business rules."""
        progress = project.calculate_overall_progress()
        active_milestones = project.get_active_milestones()

        # Business rules for project health
        if progress >= 90:
            return "Excellent"
        elif progress >= 70:
            return "Good"
        elif progress >= 50:
            return "Fair"
        elif len(active_milestones) == 0:
            return "Stalled"
        else:
            return "Needs Attention"
Task 1.3: Document Domain Models
Document Domain Structure:
# domain/documents/models.py
from dataclasses import dataclass
from typing import Dict, Any, Optional, List
from datetime import datetime
from enum import Enum


class DocumentType(Enum):
    MARKDOWN = "markdown"
    TEXT = "text"
    CODE = "code"


class ProcessingStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class DocumentMetadata:
    """Value object for document metadata."""
    filename: str
    size: int
    created_at: datetime
    modified_at: datetime
    content_type: DocumentType
    encoding: str = "utf-8"


@dataclass
class ASTNode:
    """Value object representing a node in the AST."""
    node_type: str
    content: Optional[str]
    attributes: Dict[str, Any]
    children: List['ASTNode']


@dataclass
class Document:
    """Document aggregate root."""
    id: Optional[str]
    metadata: DocumentMetadata
    content: str
    ast_data: Optional[Dict[str, Any]]
    processing_status: ProcessingStatus
    processing_time: Optional[float] = None
    error_message: Optional[str] = None

    def mark_processing_started(self) -> None:
        """Mark document as processing started."""
        self.processing_status = ProcessingStatus.PROCESSING

    def mark_processing_completed(self, ast_data: Dict[str, Any], processing_time: float) -> None:
        """Mark document processing as completed."""
        self.ast_data = ast_data
        self.processing_time = processing_time
        self.processing_status = ProcessingStatus.COMPLETED
        self.error_message = None

    def mark_processing_failed(self, error_message: str) -> None:
        """Mark document processing as failed."""
        self.processing_status = ProcessingStatus.FAILED
        self.error_message = error_message
        self.ast_data = None


# domain/documents/services.py
from .models import Document, DocumentType


class DocumentProcessingService:
    """Domain service for document processing business logic."""

    def validate_document_content(self, content: str, document_type: DocumentType) -> None:
        """Validate document content based on business rules."""
        if not content or not content.strip():
            raise ValueError("Document content cannot be empty")
        if len(content.encode('utf-8')) > 100 * 1024 * 1024:  # 100MB limit
            raise ValueError("Document size exceeds maximum allowed size")

        # Type-specific validation
        if document_type == DocumentType.MARKDOWN:
            self._validate_markdown_content(content)

    def _validate_markdown_content(self, content: str) -> None:
        """Validate markdown-specific business rules."""
        # Business rule: Markdown documents should have at least one heading
        if not any(line.strip().startswith('#') for line in content.split('\n')):
            # This is a warning, not an error - log it but don't fail
            pass

    def determine_processing_priority(self, document: Document) -> int:
        """Determine processing priority based on business rules."""
        # Business rules for processing priority
        if document.metadata.size < 1024:  # Small files first
            return 1
        elif document.metadata.content_type == DocumentType.MARKDOWN:
            return 2
        else:
            return 3
Task 1.4: Workspace Domain Models
Workspace Domain Structure:
# domain/workspaces/models.py
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum
from pathlib import Path


class WorkspaceState(Enum):
    CLEAN = "clean"
    ACTIVE = "active"
    DIRTY = "dirty"


@dataclass
class TestFile:
    """Value object representing a test file."""
    filename: str
    scenario: str
    status: str
    created_at: datetime


@dataclass
class Workspace:
    """Workspace aggregate root."""
    issue_number: int
    directory_path: Path
    state: WorkspaceState
    created_at: datetime
    test_files: List[TestFile]
    requirements_path: Optional[Path] = None
    test_plan_path: Optional[Path] = None

    def mark_active(self) -> None:
        """Mark workspace as active - business rule."""
        if self.state == WorkspaceState.ACTIVE:
            return  # Already active
        self.state = WorkspaceState.ACTIVE

    def mark_dirty(self) -> None:
        """Mark workspace as dirty when files are modified."""
        if self.state == WorkspaceState.CLEAN:
            self.state = WorkspaceState.DIRTY

    def can_be_cleaned(self) -> bool:
        """Check if workspace can be cleaned based on business rules."""
        # Business rule: Only clean or dirty workspaces can be cleaned
        return self.state in [WorkspaceState.CLEAN, WorkspaceState.DIRTY]

    def add_test_file(self, filename: str, scenario: str) -> None:
        """Add a test file to the workspace."""
        test_file = TestFile(
            filename=filename,
            scenario=scenario,
            status="created",
            created_at=datetime.utcnow()
        )
        self.test_files.append(test_file)
        self.mark_dirty()


# domain/workspaces/services.py
from datetime import datetime
from typing import Any, Dict, List
from .models import Workspace, WorkspaceState


class WorkspaceManagementService:
    """Domain service for workspace management business logic."""

    def validate_workspace_creation(self, issue_number: int) -> None:
        """Validate workspace creation according to business rules."""
        if issue_number <= 0:
            raise ValueError("Issue number must be positive")

    def determine_cleanup_eligibility(self, workspace: Workspace) -> bool:
        """Determine if workspace is eligible for cleanup."""
        # Business rules for cleanup eligibility
        if not workspace.can_be_cleaned():
            return False
        # Don't cleanup recently created workspaces (< 1 hour old)
        if (datetime.utcnow() - workspace.created_at).total_seconds() < 3600:
            return False
        return True

    def calculate_workspace_summary(self, workspaces: List[Workspace]) -> Dict[str, Any]:
        """Calculate summary statistics for workspaces."""
        total_workspaces = len(workspaces)
        active_workspaces = len([w for w in workspaces if w.state == WorkspaceState.ACTIVE])
        total_test_files = sum(len(w.test_files) for w in workspaces)
        return {
            "total_workspaces": total_workspaces,
            "active_workspaces": active_workspaces,
            "clean_workspaces": len([w for w in workspaces if w.state == WorkspaceState.CLEAN]),
            "dirty_workspaces": len([w for w in workspaces if w.state == WorkspaceState.DIRTY]),
            "total_test_files": total_test_files,
            "average_test_files_per_workspace": total_test_files / total_workspaces if total_workspaces > 0 else 0
        }
Deliverables:
- Complete domain model hierarchy for all major entities
- Domain services containing pure business logic
- Repository interfaces defining data access contracts
- Domain-specific exceptions and error handling
- Value objects for immutable domain concepts
Risk Level: Low (purely additive, no infrastructure changes)
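The directory layout in Task 1.1 lists `exceptions.py`, and the Phase 2 repository code raises `IssueNotFoundError`, but the file's contents are not shown. One possible shape, as a sketch only: the base class `IssueDomainError` is an assumption introduced here, not something taken from the codebase.

```python
# domain/issues/exceptions.py (hypothetical contents)


class IssueDomainError(Exception):
    """Assumed common base class for issue-related domain errors."""


class IssueNotFoundError(IssueDomainError):
    """Raised when a repository cannot locate the requested issue."""


# Callers can catch the base class without knowing which adapter raised it
try:
    raise IssueNotFoundError("Issue #42 not found")
except IssueDomainError as exc:
    caught_message = str(exc)
```

Keeping these exceptions in the domain package lets infrastructure adapters raise them while application code catches them without importing anything from the infrastructure layer.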
Phase 2: Infrastructure Layer Implementation (Week 2-3)
Task 2.1: Repository Implementations
Gitea Repository Implementation:
# infrastructure/repositories/gitea_issue_repository.py
from datetime import datetime
from typing import List, Optional, Dict, Any
from domain.issues.repositories import IssueRepository
from domain.issues.models import Issue, Label, IssueState
from domain.issues.exceptions import IssueNotFoundError  # Assumed to be defined in domain/issues/exceptions.py
from infrastructure.connection_manager import ConnectionManager
from infrastructure.exceptions import DataAccessError
import aiohttp


class GiteaIssueRepository(IssueRepository):
    """Gitea-specific implementation of issue repository."""

    def __init__(self, connection_manager: ConnectionManager):
        self.connection_manager = connection_manager

    async def get_issue(self, issue_number: int) -> Issue:
        """Get issue from Gitea API."""
        session = await self.connection_manager.get_http_session()
        url = f"/api/v1/repos/{self.connection_manager.config.repo_owner}/{self.connection_manager.config.repo_name}/issues/{issue_number}"
        try:
            async with session.get(url) as response:
                if response.status == 404:
                    raise IssueNotFoundError(f"Issue #{issue_number} not found")
                response.raise_for_status()
                data = await response.json()
                return self._map_api_data_to_issue(data)
        except aiohttp.ClientError as e:
            raise DataAccessError(
                message=f"Failed to fetch issue #{issue_number}",
                operation="get_issue",
                context={"issue_number": issue_number, "error": str(e)}
            ) from e

    async def list_issues(self, state: Optional[str] = None) -> List[Issue]:
        """List issues from Gitea API."""
        session = await self.connection_manager.get_http_session()
        params = {}
        if state:
            params["state"] = state
        url = f"/api/v1/repos/{self.connection_manager.config.repo_owner}/{self.connection_manager.config.repo_name}/issues"
        try:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                data = await response.json()
                return [self._map_api_data_to_issue(issue_data) for issue_data in data]
        except aiohttp.ClientError as e:
            raise DataAccessError(
                message="Failed to list issues",
                operation="list_issues",
                context={"state": state, "error": str(e)}
            ) from e

    def _map_api_data_to_issue(self, data: Dict[str, Any]) -> Issue:
        """Map Gitea API data to domain Issue model."""
        labels = [Label(name=label["name"]) for label in data.get("labels", [])]
        state = IssueState.OPEN if data["state"] == "open" else IssueState.CLOSED
        return Issue(
            number=data["number"],
            title=data["title"],
            state=state,
            labels=labels,
            created_at=datetime.fromisoformat(data["created_at"].replace("Z", "+00:00")),
            updated_at=datetime.fromisoformat(data["updated_at"].replace("Z", "+00:00")),
            milestone=data.get("milestone", {}).get("title") if data.get("milestone") else None,
            assignee=data.get("assignee", {}).get("login") if data.get("assignee") else None,
            closed_at=datetime.fromisoformat(data["closed_at"].replace("Z", "+00:00")) if data.get("closed_at") else None
        )
File System Repository Implementation:
# infrastructure/repositories/filesystem_workspace_repository.py
from datetime import datetime
from typing import List, Optional
from pathlib import Path
from domain.workspaces.repositories import WorkspaceRepository
from domain.workspaces.models import Workspace, WorkspaceState, TestFile
from domain.workspaces.exceptions import WorkspaceNotFoundError  # Assumed module; not shown in this plan
from infrastructure.exceptions import DataAccessError
import json
import shutil


class FilesystemWorkspaceRepository(WorkspaceRepository):
    """File system implementation of workspace repository."""

    def __init__(self, base_workspace_dir: Path):
        self.base_workspace_dir = base_workspace_dir
        self.base_workspace_dir.mkdir(parents=True, exist_ok=True)

    async def create_workspace(self, issue_number: int) -> Workspace:
        """Create a new workspace on the file system."""
        workspace_dir = self.base_workspace_dir / f"issue_{issue_number}"
        if workspace_dir.exists():
            raise DataAccessError(
                message=f"Workspace for issue #{issue_number} already exists",
                operation="create_workspace",
                context={"issue_number": issue_number, "path": str(workspace_dir)}
            )
        try:
            workspace_dir.mkdir(parents=True)

            # Create standard workspace files
            requirements_path = workspace_dir / "requirements.md"
            test_plan_path = workspace_dir / "test_plan.md"
            requirements_path.write_text(self._generate_requirements_template(issue_number))
            test_plan_path.write_text(self._generate_test_plan_template(issue_number))

            workspace = Workspace(
                issue_number=issue_number,
                directory_path=workspace_dir,
                state=WorkspaceState.CLEAN,
                created_at=datetime.utcnow(),
                test_files=[],
                requirements_path=requirements_path,
                test_plan_path=test_plan_path
            )

            # Save workspace metadata
            await self._save_workspace_metadata(workspace)
            return workspace
        except OSError as e:
            raise DataAccessError(
                message=f"Failed to create workspace for issue #{issue_number}",
                operation="create_workspace",
                context={"issue_number": issue_number, "error": str(e)}
            ) from e

    async def get_workspace(self, issue_number: int) -> Workspace:
        """Get workspace from file system."""
        workspace_dir = self.base_workspace_dir / f"issue_{issue_number}"
        if not workspace_dir.exists():
            raise WorkspaceNotFoundError(f"Workspace for issue #{issue_number} not found")
        return await self._load_workspace_metadata(workspace_dir)

    async def delete_workspace(self, issue_number: int) -> None:
        """Delete workspace from file system."""
        workspace_dir = self.base_workspace_dir / f"issue_{issue_number}"
        if not workspace_dir.exists():
            return  # Already deleted
        try:
            shutil.rmtree(workspace_dir)
        except OSError as e:
            raise DataAccessError(
                message=f"Failed to delete workspace for issue #{issue_number}",
                operation="delete_workspace",
                context={"issue_number": issue_number, "error": str(e)}
            ) from e

    async def _save_workspace_metadata(self, workspace: Workspace) -> None:
        """Save workspace metadata to JSON file."""
        metadata_file = workspace.directory_path / ".workspace_metadata.json"
        metadata = {
            "issue_number": workspace.issue_number,
            "state": workspace.state.value,
            "created_at": workspace.created_at.isoformat(),
            "test_files": [
                {
                    "filename": tf.filename,
                    "scenario": tf.scenario,
                    "status": tf.status,
                    "created_at": tf.created_at.isoformat()
                }
                for tf in workspace.test_files
            ]
        }
        metadata_file.write_text(json.dumps(metadata, indent=2))

    async def _load_workspace_metadata(self, workspace_dir: Path) -> Workspace:
        """Load workspace metadata from JSON file."""
        metadata_file = workspace_dir / ".workspace_metadata.json"
        if not metadata_file.exists():
            # Create minimal workspace from directory structure
            return Workspace(
                issue_number=int(workspace_dir.name.split("_")[1]),
                directory_path=workspace_dir,
                state=WorkspaceState.ACTIVE,  # Assume active if no metadata
                # Naive UTC, to stay comparable with the datetime.utcnow() values used elsewhere
                created_at=datetime.utcfromtimestamp(workspace_dir.stat().st_ctime),
                test_files=[],
                requirements_path=workspace_dir / "requirements.md" if (workspace_dir / "requirements.md").exists() else None,
                test_plan_path=workspace_dir / "test_plan.md" if (workspace_dir / "test_plan.md").exists() else None
            )

        metadata = json.loads(metadata_file.read_text())
        test_files = [
            TestFile(
                filename=tf["filename"],
                scenario=tf["scenario"],
                status=tf["status"],
                created_at=datetime.fromisoformat(tf["created_at"])
            )
            for tf in metadata.get("test_files", [])
        ]
        return Workspace(
            issue_number=metadata["issue_number"],
            directory_path=workspace_dir,
            state=WorkspaceState(metadata["state"]),
            created_at=datetime.fromisoformat(metadata["created_at"]),
            test_files=test_files,
            requirements_path=workspace_dir / "requirements.md" if (workspace_dir / "requirements.md").exists() else None,
            test_plan_path=workspace_dir / "test_plan.md" if (workspace_dir / "test_plan.md").exists() else None
        )
Task 2.2: Unit of Work Implementation
Unit of Work Pattern:
# infrastructure/unit_of_work.py
from typing import Optional
from abc import ABC, abstractmethod
from pathlib import Path
from domain.issues.repositories import IssueRepository, ProjectRepository
from domain.documents.repositories import DocumentRepository
from domain.workspaces.repositories import WorkspaceRepository
from infrastructure.connection_manager import ConnectionManager
from infrastructure.repositories.gitea_issue_repository import GiteaIssueRepository
from infrastructure.repositories.filesystem_workspace_repository import FilesystemWorkspaceRepository
# Assumed module paths: these two repositories are not shown in this plan
from infrastructure.repositories.gitea_project_repository import GiteaProjectRepository
from infrastructure.repositories.sqlite_document_repository import SqliteDocumentRepository


class UnitOfWork(ABC):
    """Abstract unit of work for coordinating transactions."""
    issues: IssueRepository
    projects: ProjectRepository
    documents: DocumentRepository
    workspaces: WorkspaceRepository

    @abstractmethod
    async def __aenter__(self):
        """Start transaction."""
        pass

    @abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Commit or rollback transaction."""
        pass

    @abstractmethod
    async def commit(self):
        """Commit transaction."""
        pass

    @abstractmethod
    async def rollback(self):
        """Rollback transaction."""
        pass


class SqliteUnitOfWork(UnitOfWork):
    """SQLite implementation of unit of work."""

    def __init__(self, database_path: str, workspace_dir: str, connection_manager: ConnectionManager):
        self.database_path = database_path
        self.workspace_dir = workspace_dir
        self.connection_manager = connection_manager
        self._connection = None
        self._transaction = None

    async def __aenter__(self):
        # Initialize repositories with shared connection
        self.issues = GiteaIssueRepository(self.connection_manager)
        self.projects = GiteaProjectRepository(self.connection_manager)
        self.documents = SqliteDocumentRepository(self.database_path)
        self.workspaces = FilesystemWorkspaceRepository(Path(self.workspace_dir))

        # Start database transaction
        self._connection = await self._get_database_connection()
        self._transaction = await self._connection.begin()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if exc_type is not None:
                await self.rollback()
            else:
                await self.commit()
        finally:
            # Always release the connection, even if commit or rollback fails
            if self._connection:
                await self._connection.close()
                self._connection = None

    async def commit(self):
        """Commit all changes. Safe to call again after an explicit commit."""
        if self._transaction:
            await self._transaction.commit()
            self._transaction = None

    async def rollback(self):
        """Rollback all changes. A no-op once the transaction has ended."""
        if self._transaction:
            await self._transaction.rollback()
            self._transaction = None
Deliverables:
- Repository implementations for all external systems
- Unit of Work pattern for transaction coordination
- Connection management and resource pooling
- Error handling and retry mechanisms
Risk Level: Medium (involves external system integration)
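The deliverables mention retry mechanisms without showing one. Below is a minimal sketch of retrying a flaky async operation with exponential backoff. `retry_async`, its parameters, and `TransientError` are hypothetical names introduced here; a real implementation would likely retry only on specific infrastructure errors such as `DataAccessError`.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.01,
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
) -> T:
    """Call `operation` up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


calls = {"count": 0}


async def flaky_fetch() -> str:
    """Fails twice, then succeeds, to exercise the retry loop."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "ok"


result = asyncio.run(retry_async(flaky_fetch))
```

Wrapping repository calls in a helper like this keeps retry policy in the infrastructure layer, so domain and application code never need to know that a retry happened.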
Phase 3: Application Services Layer (Week 3-4)
Task 3.1: Issue Application Service
Application Service Implementation:
# application/issue_application_service.py
from typing import List, Dict, Any
from domain.issues.models import Issue
from domain.issues.services import IssueStatusService, IssueValidationService
from infrastructure.unit_of_work import UnitOfWork
from dataclasses import dataclass


@dataclass
class IssueDetailsResult:
    """Result object for issue details query."""
    issue: Issue
    kanban_column: str
    priority_info: Dict[str, Any]
    project_context: Dict[str, Any]


class IssueApplicationService:
    """Application service for issue-related use cases."""

    def __init__(self, uow: UnitOfWork):
        self.uow = uow
        self.status_service = IssueStatusService()
        self.validation_service = IssueValidationService()

    async def get_issue_details(self, issue_number: int) -> IssueDetailsResult:
        """Get detailed issue information with business logic applied."""
        async with self.uow:
            # Data access through repositories
            issue = await self.uow.issues.get_issue(issue_number)
            project_info = await self.uow.projects.get_issue_project_info(issue_number)

            # Apply domain business logic
            kanban_column = self.status_service.determine_kanban_column(issue, project_info)
            priority_info = self.status_service.extract_priority_info(issue)

            return IssueDetailsResult(
                issue=issue,
                kanban_column=kanban_column,
                priority_info=priority_info,
                project_context=project_info
            )

    async def list_issues_by_state(self, state: str) -> List[Issue]:
        """List issues filtered by state."""
        async with self.uow:
            return await self.uow.issues.list_issues(state=state)

    async def create_issue(self, title: str, description: str, labels: List[str]) -> Issue:
        """Create a new issue with validation."""
        # Apply domain validation
        self.validation_service.validate_issue_creation(title, labels)

        async with self.uow:
            issue = await self.uow.issues.create_issue(title, description, labels)
            await self.uow.commit()
            return issue

    async def close_issue(self, issue_number: int) -> Issue:
        """Close an issue using domain business rules."""
        async with self.uow:
            issue = await self.uow.issues.get_issue(issue_number)

            # Apply domain business logic
            issue.close()

            await self.uow.issues.save_issue(issue)
            await self.uow.commit()
            return issue
Task 3.2: Document Application Service
Document Processing Application Service:
# application/document_application_service.py
from datetime import datetime
from typing import List, Dict, Any
from pathlib import Path
from domain.documents.models import Document, DocumentMetadata, DocumentType, ProcessingStatus
from domain.documents.services import DocumentProcessingService
from infrastructure.unit_of_work import UnitOfWork
from dataclasses import dataclass
import time


@dataclass
class DocumentIngestionResult:
    """Result object for document ingestion."""
    document_id: str
    processing_time: float
    ast_node_count: int
    cache_path: Path


class DocumentApplicationService:
    """Application service for document-related use cases."""

    def __init__(self, uow: UnitOfWork):
        self.uow = uow
        self.processing_service = DocumentProcessingService()

    async def ingest_file(self, file_path: Path) -> DocumentIngestionResult:
        """Ingest a file into the document system."""
        async with self.uow:
            # Read file content
            content = file_path.read_text(encoding='utf-8')

            # Create document metadata
            file_stat = file_path.stat()
            metadata = DocumentMetadata(
                filename=file_path.name,
                size=file_stat.st_size,
                created_at=datetime.fromtimestamp(file_stat.st_ctime),
                modified_at=datetime.fromtimestamp(file_stat.st_mtime),
                content_type=self._determine_content_type(file_path),
                encoding='utf-8'
            )

            # Create document entity
            document = Document(
                id=None,  # Will be assigned by repository
                metadata=metadata,
                content=content,
                ast_data=None,
                processing_status=ProcessingStatus.PENDING
            )

            # Apply domain validation
            self.processing_service.validate_document_content(content, metadata.content_type)

            # Start processing
            document.mark_processing_started()
            start_time = time.time()

            # Process AST (this could be delegated to a domain service)
            ast_data = await self._process_ast(content, metadata.content_type)
            processing_time = time.time() - start_time

            # Mark processing completed
            document.mark_processing_completed(ast_data, processing_time)

            # Store document
            document_id = await self.uow.documents.store_document(document)

            # Create cache
            cache_path = await self.uow.documents.create_cache(document_id, ast_data)
            await self.uow.commit()

            return DocumentIngestionResult(
                document_id=document_id,
                processing_time=processing_time,
                ast_node_count=self._count_ast_nodes(ast_data),
                cache_path=cache_path
            )

    async def search_documents(self, query: str) -> List[Document]:
        """Search documents by content."""
        async with self.uow:
            return await self.uow.documents.search_content(query)

    async def get_document_summary(self) -> Dict[str, Any]:
        """Get summary statistics for all documents."""
        async with self.uow:
            all_documents = await self.uow.documents.list_all_documents()
            total_documents = len(all_documents)
            total_size = sum(doc.metadata.size for doc in all_documents)
            avg_processing_time = sum(doc.processing_time or 0 for doc in all_documents) / total_documents if total_documents > 0 else 0

            status_counts = {}
            for doc in all_documents:
                status = doc.processing_status.value
                status_counts[status] = status_counts.get(status, 0) + 1

            return {
                "total_documents": total_documents,
                "total_size_bytes": total_size,
                "average_processing_time": avg_processing_time,
                "status_breakdown": status_counts,
                "content_type_breakdown": self._get_content_type_breakdown(all_documents)
            }

    def _determine_content_type(self, file_path: Path) -> DocumentType:
        """Determine content type from file extension."""
        suffix = file_path.suffix.lower()
        if suffix in ['.md', '.markdown']:
            return DocumentType.MARKDOWN
        elif suffix in ['.py', '.js', '.ts', '.java', '.cpp', '.c']:
            return DocumentType.CODE
        else:
            return DocumentType.TEXT

    async def _process_ast(self, content: str, content_type: DocumentType) -> Dict[str, Any]:
        """Process content into AST - this could be a domain service."""
        # This would delegate to appropriate parser based on content type
        if content_type == DocumentType.MARKDOWN:
            return await self._parse_markdown_ast(content)
        else:
            return {"type": "text", "content": content}

    def _count_ast_nodes(self, ast_data: Dict[str, Any]) -> int:
        """Count nodes in AST data."""
        if not ast_data:
            return 0
        count = 1  # Current node
        children = ast_data.get("children", [])
for child in children:
count += self._count_ast_nodes(child)
return count
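`get_document_summary` calls `self._get_content_type_breakdown`, which is not shown in the listing above. The following is a minimal sketch of what such a helper might look like, written as a free function so it runs on its own. The `Doc` and `Metadata` classes and the enum values are hypothetical stand-ins for the real domain models, which may differ.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Iterable


class DocumentType(Enum):
    """Stand-in for domain.documents.models.DocumentType (values are assumed)."""
    MARKDOWN = "markdown"
    CODE = "code"
    TEXT = "text"


@dataclass
class Metadata:
    """Hypothetical stand-in exposing only the field the helper reads."""
    content_type: DocumentType


@dataclass
class Doc:
    """Hypothetical stand-in for the Document entity."""
    metadata: Metadata


def get_content_type_breakdown(documents: Iterable[Doc]) -> Dict[str, int]:
    """Count documents per content type, keyed by the enum's string value."""
    return dict(Counter(doc.metadata.content_type.value for doc in documents))


docs = [
    Doc(Metadata(DocumentType.MARKDOWN)),
    Doc(Metadata(DocumentType.CODE)),
    Doc(Metadata(DocumentType.MARKDOWN)),
]
breakdown = get_content_type_breakdown(docs)
```

Keying by the enum's value keeps the summary JSON-serializable, which matches how `status_breakdown` is built from `processing_status.value`.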
Task 3.3: Workspace Application Service
Workspace Management Application Service:
# application/workspace_application_service.py
from typing import List, Dict, Any
from domain.workspaces.models import Workspace
from domain.workspaces.services import WorkspaceManagementService
from infrastructure.unit_of_work import UnitOfWork
from dataclasses import dataclass
@dataclass
class WorkspaceCreationResult:
"""Result object for workspace creation."""
workspace: Workspace
requirements_file_created: bool
test_plan_file_created: bool
class WorkspaceApplicationService:
"""Application service for workspace-related use cases."""
def __init__(self, uow: UnitOfWork):
self.uow = uow
self.management_service = WorkspaceManagementService()
async def create_issue_workspace(self, issue_number: int) -> WorkspaceCreationResult:
"""Create a new workspace for an issue."""
# Apply domain validation
self.management_service.validate_workspace_creation(issue_number)
async with self.uow:
workspace = await self.uow.workspaces.create_workspace(issue_number)
await self.uow.commit()
return WorkspaceCreationResult(
workspace=workspace,
requirements_file_created=workspace.requirements_path is not None,
test_plan_file_created=workspace.test_plan_path is not None
)
async def cleanup_workspace(self, issue_number: int) -> bool:
"""Clean up a workspace if eligible."""
async with self.uow:
workspace = await self.uow.workspaces.get_workspace(issue_number)
# Apply domain business rules
if not self.management_service.determine_cleanup_eligibility(workspace):
return False
await self.uow.workspaces.delete_workspace(issue_number)
await self.uow.commit()
return True
async def add_test_to_workspace(self, issue_number: int, test_scenario: str) -> Workspace:
"""Add a test file to a workspace."""
async with self.uow:
workspace = await self.uow.workspaces.get_workspace(issue_number)
# Apply domain business logic
test_filename = f"test_issue_{issue_number}_{test_scenario}.py"
workspace.add_test_file(test_filename, test_scenario)
await self.uow.workspaces.save_workspace(workspace)
await self.uow.commit()
return workspace
async def get_workspace_summary(self) -> Dict[str, Any]:
"""Get summary of all workspaces."""
async with self.uow:
all_workspaces = await self.uow.workspaces.list_all_workspaces()
# Apply domain business logic for summary calculation
return self.management_service.calculate_workspace_summary(all_workspaces)
Deliverables:
- Application services for all major use cases
- Use case orchestration with domain service coordination
- Proper error handling and transaction management
- Result objects for complex return data
Risk Level: Medium (coordination logic, transaction handling)
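The transaction handling named in the deliverables depends on how the unit of work behaves when a use case raises inside `async with self.uow:`. The sketch below illustrates one plausible contract: uncommitted work is discarded on exit and the exception propagates. `InMemoryUnitOfWork` and `ingest` are hypothetical names for illustration only and are not the project's `UnitOfWork`.

```python
import asyncio
from typing import List


class InMemoryUnitOfWork:
    """Hypothetical in-memory unit of work used only to show the contract."""

    def __init__(self) -> None:
        self.committed: List[str] = []
        self._pending: List[str] = []

    async def __aenter__(self) -> "InMemoryUnitOfWork":
        self._pending = []
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Discard anything that was not committed (a no-op after commit).
        await self.rollback()
        return False  # never swallow the caller's exception

    def add(self, item: str) -> None:
        self._pending.append(item)

    async def commit(self) -> None:
        self.committed.extend(self._pending)
        self._pending = []

    async def rollback(self) -> None:
        self._pending = []


async def ingest(uow: InMemoryUnitOfWork, name: str, fail: bool = False) -> None:
    """Use case shaped like the application services above."""
    async with uow:
        uow.add(name)
        if fail:
            raise ValueError("validation failed")
        await uow.commit()


async def demo() -> List[str]:
    uow = InMemoryUnitOfWork()
    await ingest(uow, "a.md")
    try:
        await ingest(uow, "b.md", fail=True)
    except ValueError:
        pass  # the failed ingest must leave no trace
    return uow.committed


result = asyncio.run(demo())
```

Only the first ingest is persisted; the second is rolled back because the exception is raised before `commit()`.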
Phase 4: Migration and Backward Compatibility (Week 4-5)
Task 4.1: Backward Compatibility Adapters
Legacy Service Adapters:
# adapters/legacy_issue_service_adapter.py
from typing import Dict, Any
from services.issue_service import IssueService as LegacyIssueService
from application.issue_application_service import IssueApplicationService
from infrastructure.unit_of_work import UnitOfWork
class LegacyIssueServiceAdapter(LegacyIssueService):
"""Adapter to maintain backward compatibility with existing IssueService API."""
def __init__(self, uow: UnitOfWork):
# Don't call super().__init__() to avoid legacy initialization
self.application_service = IssueApplicationService(uow)
def get_issue_details(self, issue_number: int) -> Dict[str, Any]:
"""Legacy method - converts async new API to sync old API."""
import asyncio
# Run async method in sync context.
# Caveat: asyncio.run() raises RuntimeError if an event loop is already running in this thread.
result = asyncio.run(self.application_service.get_issue_details(issue_number))
# Convert new result format to legacy format
return {
"number": result.issue.number,
"title": result.issue.title,
"state": result.issue.state.value,
"labels": [{"name": label.name} for label in result.issue.labels],
"kanban_column": result.kanban_column,
"priority": result.priority_info,
"project_info": result.project_context,
"created_at": result.issue.created_at.isoformat(),
"updated_at": result.issue.updated_at.isoformat()
}
def get_issue(self, issue_number: int) -> Dict[str, Any]:
"""Legacy method for basic issue retrieval."""
result = self.get_issue_details(issue_number)
# Return subset for basic get_issue method
return {
"number": result["number"],
"title": result["title"],
"state": result["state"],
"labels": result["labels"]
}
# Configuration for using adapter
# services/__init__.py
from config import get_unified_config
from infrastructure.unit_of_work import SqliteUnitOfWork
from infrastructure.connection_manager import ConnectionManager
from adapters.legacy_issue_service_adapter import LegacyIssueServiceAdapter
def get_issue_service():
"""Factory function for getting issue service (backward compatible)."""
config = get_unified_config()
# Check feature flag for new architecture
if config.use_new_architecture:
connection_manager = ConnectionManager(config)
uow = SqliteUnitOfWork(config.database_path, config.workspace_dir, connection_manager)
return LegacyIssueServiceAdapter(uow)
else:
# Fall back to legacy implementation
from services.issue_service import IssueService
return IssueService()
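The adapter bridges async to sync with `asyncio.run`, which fails when the caller is already inside a running event loop (for example, a legacy sync method invoked from async code). One possible workaround is sketched below; `run_sync` and `fetch_title` are hypothetical helpers and are not part of the codebase.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def run_sync(coro: Coroutine[Any, Any, T]) -> T:
    """Run a coroutine to completion from synchronous code."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)
    # A loop is already running here: execute the coroutine on a fresh
    # loop in a worker thread and block until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def fetch_title(issue_number: int) -> str:
    """Hypothetical async call standing in for the application service."""
    await asyncio.sleep(0)
    return f"Issue #{issue_number}"


# Called from plain synchronous code.
plain = run_sync(fetch_title(1))


async def caller() -> str:
    # Synchronous call made while an event loop is running.
    return run_sync(fetch_title(2))


nested = asyncio.run(caller())
```

The threaded branch blocks the calling loop until the coroutine finishes, so it suits a migration period rather than a long-term design. It also assumes the coroutine does not rely on resources bound to the caller's loop.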
Task 4.2: Feature Flag Configuration
Feature Flag System:
# config/feature_flags.py
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class FeatureFlags:
"""Feature flags for gradual migration."""
use_new_architecture: bool = False
use_domain_services: bool = False
use_repository_pattern: bool = False
use_unit_of_work: bool = False
@classmethod
def from_config(cls, config_dict: Dict[str, Any]) -> 'FeatureFlags':
"""Create feature flags from configuration."""
return cls(
use_new_architecture=config_dict.get('USE_NEW_ARCHITECTURE', False),
use_domain_services=config_dict.get('USE_DOMAIN_SERVICES', False),
use_repository_pattern=config_dict.get('USE_REPOSITORY_PATTERN', False),
use_unit_of_work=config_dict.get('USE_UNIT_OF_WORK', False)
)
# Integration with existing config
# config/manager.py (additions)
from config.feature_flags import FeatureFlags
@dataclass
class MarkitectConfig(BaseConfig):
# ... existing fields ...
# Feature flags for migration
use_new_architecture: bool = False
use_domain_services: bool = False
use_repository_pattern: bool = False
use_unit_of_work: bool = False
def get_feature_flags(self) -> FeatureFlags:
"""Get feature flags configuration."""
return FeatureFlags(
use_new_architecture=self.use_new_architecture,
use_domain_services=self.use_domain_services,
use_repository_pattern=self.use_repository_pattern,
use_unit_of_work=self.use_unit_of_work
)
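`FeatureFlags.from_config` passes through whatever `config_dict.get(...)` returns. If the values come from environment variables or a text config file, they arrive as strings, and a non-empty string such as `"false"` is truthy. A minimal sketch of stricter parsing follows; `parse_flag` and the accepted spellings are assumptions, not existing project code.

```python
from typing import Any, Mapping

_TRUE = {"1", "true", "yes", "on"}
_FALSE = {"0", "false", "no", "off", ""}


def parse_flag(config: Mapping[str, Any], key: str, default: bool = False) -> bool:
    """Read a boolean flag, accepting real bools and common string spellings."""
    if key not in config:
        return default
    raw = config[key]
    if isinstance(raw, bool):
        return raw
    text = str(raw).strip().lower()
    if text in _TRUE:
        return True
    if text in _FALSE:
        return False
    # Fail loudly instead of silently enabling a migration path.
    raise ValueError(f"Invalid boolean value for {key}: {raw!r}")


env_like = {"USE_NEW_ARCHITECTURE": "false", "USE_DOMAIN_SERVICES": "True"}
new_arch = parse_flag(env_like, "USE_NEW_ARCHITECTURE")
domain_services = parse_flag(env_like, "USE_DOMAIN_SERVICES")
unit_of_work = parse_flag(env_like, "USE_UNIT_OF_WORK")
```

With a helper like this, `from_config` could call `parse_flag(config_dict, 'USE_NEW_ARCHITECTURE')` in place of the bare `get`.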
Task 4.3: Gradual Migration Strategy
Migration Phases:
# migration/migration_manager.py
from typing import List, Dict, Any
from config.feature_flags import FeatureFlags
import logging
class MigrationManager:
"""Manages gradual migration to new architecture."""
def __init__(self, feature_flags: FeatureFlags):
self.feature_flags = feature_flags
self.logger = logging.getLogger(__name__)
def should_use_new_issue_service(self) -> bool:
"""Determine if new issue service should be used."""
return self.feature_flags.use_new_architecture and self.feature_flags.use_repository_pattern
def should_use_domain_services(self) -> bool:
"""Determine if domain services should be used."""
return self.feature_flags.use_domain_services
def log_migration_decision(self, component: str, use_new: bool, reason: str) -> None:
"""Log migration decisions for monitoring."""
self.logger.info(
f"Migration decision for {component}: "
f"{'NEW' if use_new else 'LEGACY'} architecture. "
f"Reason: {reason}"
)
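# Usage in service factories (same imports as services/__init__.py in Task 4.1)
from config import get_unified_config
from infrastructure.unit_of_work import SqliteUnitOfWork
from infrastructure.connection_manager import ConnectionManager
from adapters.legacy_issue_service_adapter import LegacyIssueServiceAdapter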
def create_issue_service():
"""Create issue service with migration logic."""
config = get_unified_config()
feature_flags = config.get_feature_flags()
migration_manager = MigrationManager(feature_flags)
if migration_manager.should_use_new_issue_service():
migration_manager.log_migration_decision(
"IssueService",
True,
"Feature flags enabled for new architecture"
)
# Use new architecture
connection_manager = ConnectionManager(config)
uow = SqliteUnitOfWork(config.database_path, config.workspace_dir, connection_manager)
return LegacyIssueServiceAdapter(uow)
else:
migration_manager.log_migration_decision(
"IssueService",
False,
"Feature flags not enabled or fallback required"
)
# Use legacy architecture
from services.issue_service import IssueService
return IssueService()
Deliverables:
- Backward compatibility adapters for all migrated services
- Feature flag system for gradual rollout
- Migration monitoring and logging
- Rollback mechanisms for failed migrations
Risk Level: High (involves changing existing behavior)
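The deliverables list rollback mechanisms, but the factories above only choose a path at construction time. One way to add a runtime fallback is a wrapper that tries the new path and reverts to the legacy one on failure. The sketch below is an illustration under that assumption; `FallbackIssueService` and the two fetch functions are hypothetical.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
IssueFetcher = Callable[[int], Dict[str, Any]]


class FallbackIssueService:
    """Hypothetical wrapper: try the new path, fall back to legacy on error."""

    def __init__(self, new_fetch: IssueFetcher, legacy_fetch: IssueFetcher) -> None:
        self._new_fetch = new_fetch
        self._legacy_fetch = legacy_fetch
        self.fallback_count = 0  # exposed so monitoring can track failures

    def get_issue_details(self, issue_number: int) -> Dict[str, Any]:
        try:
            return self._new_fetch(issue_number)
        except Exception:
            self.fallback_count += 1
            logger.exception(
                "New issue path failed for #%s; using legacy path", issue_number
            )
            return self._legacy_fetch(issue_number)


def broken_new_path(issue_number: int) -> Dict[str, Any]:
    """Simulates a failure in the new architecture."""
    raise ConnectionError("database unavailable")


def legacy_path(issue_number: int) -> Dict[str, Any]:
    """Simulates the legacy IssueService result."""
    return {"number": issue_number, "source": "legacy"}


service = FallbackIssueService(broken_new_path, legacy_path)
details = service.get_issue_details(123)
```

A fallback like this is only safe for read operations. For writes, retrying on the legacy path could apply a change twice, so disabling the feature flag is the more conservative rollback.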
Phase 5: Testing and Validation (Week 5-6)
Task 5.1: Domain Logic Testing
Pure Domain Testing:
# tests/unit/domain/test_issue_models.py
import pytest
from datetime import datetime
from domain.issues.models import Issue, Label, IssueState, LabelCategories
class TestIssue:
"""Test Issue domain model behavior."""
def test_issue_creation_with_valid_data(self):
# Arrange & Act
issue = Issue(
number=123,
title="Test Issue",
state=IssueState.OPEN,
labels=[Label("bug"), Label("priority:high")],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
# Assert
assert issue.number == 123
assert issue.title == "Test Issue"
assert issue.state == IssueState.OPEN
assert len(issue.labels) == 2
def test_categorize_labels_correctly_separates_types(self):
# Arrange
labels = [
Label("bug"), # type label
Label("priority:high"), # priority label
Label("status:in-progress"), # state label
Label("documentation"), # type label
Label("custom-label") # other label
]
issue = Issue(
number=1,
title="Test",
state=IssueState.OPEN,
labels=labels,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
# Act
categories = issue.categorize_labels()
# Assert
assert "bug" in categories.type_labels
assert "documentation" in categories.type_labels
assert "priority:high" in categories.priority_labels
assert "status:in-progress" in categories.state_labels
assert "custom-label" in categories.other_labels
def test_close_issue_changes_state_and_sets_closed_at(self):
# Arrange
issue = Issue(
number=1,
title="Test",
state=IssueState.OPEN,
labels=[],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
# Act
issue.close()
# Assert
assert issue.state == IssueState.CLOSED
assert issue.closed_at is not None
def test_close_already_closed_issue_raises_error(self):
# Arrange
issue = Issue(
number=1,
title="Test",
state=IssueState.CLOSED,
labels=[],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
closed_at=datetime.utcnow()
)
# Act & Assert
with pytest.raises(ValueError, match="Issue is already closed"):
issue.close()
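# tests/unit/domain/test_issue_services.py
import pytest
from datetime import datetime
from domain.issues.models import Issue, Label, IssueState
from domain.issues.services import IssueStatusService  # module path assumed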
class TestIssueStatusService:
"""Test business logic in issue status service."""
@pytest.fixture
def service(self):
return IssueStatusService()
def test_determine_kanban_column_for_closed_issue(self, service):
# Arrange
issue = Issue(
number=1,
title="Closed Issue",
state=IssueState.CLOSED,
labels=[],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
project_info = {"kanban_columns": ["Todo", "In Progress", "Review", "Done"]}
# Act
column = service.determine_kanban_column(issue, project_info)
# Assert
assert column == "Done"
@pytest.mark.parametrize("status_label,expected_column", [
("status:in-progress", "In Progress"),
("status:review", "Review"),
("status:blocked", "Blocked"),
])
def test_determine_kanban_column_based_on_status_labels(self, service, status_label, expected_column):
# Arrange
issue = Issue(
number=1,
title="Test Issue",
state=IssueState.OPEN,
labels=[Label(status_label)],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
project_info = {"kanban_columns": ["Todo", "In Progress", "Review", "Blocked", "Done"]}
# Act
column = service.determine_kanban_column(issue, project_info)
# Assert
assert column == expected_column
Task 5.2: Application Service Testing
Application Service Testing with Mocks:
# tests/unit/application/test_issue_application_service.py
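import pytest
from datetime import datetime
from unittest.mock import Mock, AsyncMock
from application.issue_application_service import IssueApplicationService
from domain.issues.models import Issue, Label, IssueState
from infrastructure.unit_of_work import UnitOfWork
# Run the async tests below under pytest-asyncio (not needed if asyncio_mode = auto is configured)
pytestmark = pytest.mark.asyncio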
class TestIssueApplicationService:
"""Test application service coordination logic."""
@pytest.fixture
def mock_uow(self):
uow = Mock(spec=UnitOfWork)
uow.issues = AsyncMock()
uow.projects = AsyncMock()
uow.commit = AsyncMock()
uow.rollback = AsyncMock()
uow.__aenter__ = AsyncMock(return_value=uow)
uow.__aexit__ = AsyncMock(return_value=None)
return uow
@pytest.fixture
def service(self, mock_uow):
return IssueApplicationService(mock_uow)
async def test_get_issue_details_coordinates_repositories_and_domain_services(self, service, mock_uow):
# Arrange
issue = Issue(
number=123,
title="Test Issue",
state=IssueState.OPEN,
labels=[Label("priority:high"), Label("status:in-progress")],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
project_info = {"kanban_columns": ["Todo", "In Progress", "Done"]}
mock_uow.issues.get_issue.return_value = issue
mock_uow.projects.get_issue_project_info.return_value = project_info
# Act
result = await service.get_issue_details(123)
# Assert
assert result.issue == issue
assert result.kanban_column == "In Progress" # Based on status:in-progress label
assert result.priority_info["level"] == "High" # Based on priority:high label
assert result.project_context == project_info
# Verify repository calls
mock_uow.issues.get_issue.assert_called_once_with(123)
mock_uow.projects.get_issue_project_info.assert_called_once_with(123)
async def test_create_issue_applies_validation_and_saves(self, service, mock_uow):
# Arrange
created_issue = Issue(
number=456,
title="New Issue",
state=IssueState.OPEN,
labels=[Label("bug")],
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
mock_uow.issues.create_issue.return_value = created_issue
# Act
result = await service.create_issue("New Issue", "Description", ["bug"])
# Assert
assert result == created_issue
mock_uow.issues.create_issue.assert_called_once_with("New Issue", "Description", ["bug"])
mock_uow.commit.assert_called_once()
async def test_create_issue_with_invalid_title_raises_validation_error(self, service, mock_uow):
# Act & Assert
with pytest.raises(ValueError, match="Issue title cannot be empty"):
await service.create_issue("", "Description", ["bug"])
# Verify no repository calls were made
mock_uow.issues.create_issue.assert_not_called()
mock_uow.commit.assert_not_called()
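The mocking pattern in these tests can be tried without the project code. The sketch below uses only `unittest.mock` and `asyncio`: `MagicMock` already provides `__aenter__` and `__aexit__` as `AsyncMock` objects, so the mock works in `async with`. `create_issue` is a hypothetical use case that mirrors the validate-then-save flow and is not the real `IssueApplicationService`.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_uow() -> MagicMock:
    """Build a unit-of-work mock usable with `async with`."""
    uow = MagicMock()
    uow.issues = AsyncMock()
    uow.commit = AsyncMock()
    uow.__aenter__.return_value = uow
    uow.__aexit__.return_value = False  # let exceptions propagate
    return uow


async def create_issue(uow, title: str):
    """Hypothetical use case: validate first, then save inside a transaction."""
    if not title.strip():
        raise ValueError("Issue title cannot be empty")
    async with uow:
        issue = await uow.issues.create_issue(title)
        await uow.commit()
        return issue


async def demo():
    ok = make_mock_uow()
    ok.issues.create_issue.return_value = {"number": 456, "title": "New Issue"}
    created = await create_issue(ok, "New Issue")

    bad = make_mock_uow()
    try:
        await create_issue(bad, "")
        rejected = False
    except ValueError:
        rejected = True
    return ok, created, bad, rejected


ok_uow, created, bad_uow, rejected = asyncio.run(demo())
```

The `assert_awaited_*` family checks that a coroutine was actually awaited, which is stricter than `assert_called_*` for async repositories.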
Task 5.3: Integration Testing
Integration Testing with Real Components:
# tests/integration/test_issue_workflow_integration.py
import pytest
from pathlib import Path
from application.issue_application_service import IssueApplicationService
from infrastructure.unit_of_work import SqliteUnitOfWork
from infrastructure.connection_manager import ConnectionManager
from config import MarkitectConfig
from domain.issues.models import IssueState
class TestIssueWorkflowIntegration:
"""Integration tests for complete issue workflows."""
@pytest.fixture
async def integrated_service(self, test_workspace):
# Create real configuration for testing
config = MarkitectConfig(
gitea_url="http://test-gitea.com",
repo_owner="test",
repo_name="repo",
database_path=str(test_workspace / "test.db"),
workspace_dir=str(test_workspace)
)
connection_manager = ConnectionManager(config)
uow = SqliteUnitOfWork(config.database_path, config.workspace_dir, connection_manager)
# Initialize database schema
await uow.initialize_schema()
yield IssueApplicationService(uow)
await uow.close()
async def test_complete_issue_lifecycle(self, integrated_service, aioresponses):
# Arrange - Mock external API responses
issue_data = {
"number": 123,
"title": "Integration Test Issue",
"state": "open",
"labels": [{"name": "bug"}, {"name": "priority:high"}],
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2025-01-01T00:00:00Z"
}
aioresponses.get(
"http://test-gitea.com/api/v1/repos/test/repo/issues/123",
payload=issue_data
)
project_info = {"kanban_columns": ["Todo", "In Progress", "Done"]}
aioresponses.get(
"http://test-gitea.com/api/v1/repos/test/repo",
payload=project_info
)
# Act - Get issue details
result = await integrated_service.get_issue_details(123)
# Assert - Verify complete workflow
assert result.issue.number == 123
assert result.issue.title == "Integration Test Issue"
assert result.kanban_column == "Todo" # Default for new issues
assert result.priority_info["level"] == "High"
# Act - Close the issue
closed_issue = await integrated_service.close_issue(123)
# Assert - Verify state change
assert closed_issue.state == IssueState.CLOSED
assert closed_issue.closed_at is not None
Deliverables:
- Comprehensive unit tests for all domain models and services
- Application service tests with proper mocking
- Integration tests with real components
- End-to-end workflow testing
Risk Level: Low (testing activities, no production impact)
Success Criteria and Benefits
Implementation Success Indicators:
Architecture Quality Metrics:
- Domain Purity: Domain models have no infrastructure dependencies
- Testability: >95% unit test coverage for domain layer
- Separation: Clear boundaries between layers with proper dependency direction
- Maintainability: Business logic changes require minimal infrastructure changes
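The domain purity metric can be checked automatically rather than by review. The sketch below uses the standard `ast` module to flag imports from outer layers in a module's source. The forbidden package names are assumptions based on the layout used in this gameplan, and `forbidden_imports` is a hypothetical helper.

```python
import ast
from typing import List, Sequence

# Top-level packages the domain layer should not import (assumed layout).
FORBIDDEN_PACKAGES = ("infrastructure", "application", "adapters")


def forbidden_imports(
    source: str, forbidden: Sequence[str] = FORBIDDEN_PACKAGES
) -> List[str]:
    """Return the sorted names of imported modules that belong to a forbidden package."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        else:
            continue
        for name in names:
            if name.split(".")[0] in forbidden:
                found.append(name)
    return sorted(found)


clean_source = (
    "from dataclasses import dataclass\n"
    "from domain.issues.models import Issue\n"
)
dirty_source = (
    "import subprocess\n"
    "from infrastructure.unit_of_work import UnitOfWork\n"
)

clean_result = forbidden_imports(clean_source)
dirty_result = forbidden_imports(dirty_source)
```

A unit test could apply this to every file under the domain package, for example by iterating `Path("domain").rglob("*.py")`, and assert that each result is empty.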
Performance Benefits:
- Faster Testing: Unit tests run in <10 seconds total (no external dependencies)
- Better Isolation: Integration failures don't break business logic tests
- Reduced Coupling: Changes to external APIs don't require business logic changes
Developer Experience:
- Clear Structure: Developers can easily locate business logic vs infrastructure code
- Easy Testing: New features can be test-driven with fast feedback loops
- Flexible Architecture: New business requirements can often be implemented by changing domain logic alone
Business Benefits:
Maintainability:
- Single Responsibility: Each class has one clear purpose
- Business Logic Clarity: Domain rules are explicit and easy to understand
- Change Isolation: Infrastructure changes don't affect business rules
Testability:
- Fast Feedback: Business logic can be tested without external systems
- Comprehensive Coverage: All business scenarios can be easily tested
- Reliable Tests: No flaky tests due to external dependencies
Flexibility:
- Technology Independence: Business logic can work with any infrastructure
- Easy Extensions: New features can be added as new domain or application services with minimal changes to existing code
- Migration Ready: Infrastructure can be changed without affecting domain logic
This gameplan separates domain logic from infrastructure in stages, keeping the system stable during migration while leaving business logic isolated, testable, and maintainable.