"""Workspace management for tddai."""

import json
import shutil
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any

from .config import get_config
from .exceptions import WorkspaceError


class WorkspaceStatus(Enum):
    """Status of workspace, as reported by ``WorkspaceManager.get_status``."""

    CLEAN = "clean"    # the workspace directory does not exist
    ACTIVE = "active"  # the workspace directory and the current-issue file both exist
    DIRTY = "dirty"    # the workspace directory exists but the current-issue file does not


@dataclass
class Workspace:
    """Represents a TDD workspace for an issue.

    All paths exposed by the properties are derived from ``workspace_dir``
    and ``issue_number``; nothing is created on disk by this class.
    """

    issue_number: int
    issue_title: str
    issue_body: str
    issue_state: str
    created_at: datetime
    workspace_dir: Path

    @property
    def issue_dir(self) -> Path:
        """Get the issue-specific directory (``<workspace_dir>/issue_<number>``)."""
        return self.workspace_dir / f"issue_{self.issue_number}"

    @property
    def tests_dir(self) -> Path:
        """Get the tests directory for this issue (``<issue_dir>/tests``)."""
        return self.issue_dir / "tests"

    @property
    def requirements_file(self) -> Path:
        """Get the requirements file path (``<issue_dir>/requirements.md``)."""
        return self.issue_dir / "requirements.md"

    @property
    def test_plan_file(self) -> Path:
        """Get the test plan file path (``<issue_dir>/test_plan.md``)."""
        return self.issue_dir / "test_plan.md"


class WorkspaceManager:
    """Manages TDD workspaces for issues.

    The manager reads three attributes from its config object:
    ``workspace_dir``, ``current_issue_path`` and ``tests_dir``.
    """

    def __init__(self, config=None):
        """Store the given config, falling back to ``get_config()`` when omitted."""
        self.config = config or get_config()

    def get_status(self) -> WorkspaceStatus:
        """Get current workspace status.

        Returns:
            ``CLEAN`` when the workspace directory does not exist, ``DIRTY``
            when it exists but the current-issue file is missing, and
            ``ACTIVE`` when both exist.
        """
        if not self.config.workspace_dir.exists():
            return WorkspaceStatus.CLEAN

        if not self.config.current_issue_path.exists():
            return WorkspaceStatus.DIRTY

        return WorkspaceStatus.ACTIVE

    def get_current_workspace(self) -> Optional[Workspace]:
        """Get the currently active workspace.

        Returns:
            The workspace described by the current-issue file, or ``None``
            when that file does not exist.

        Raises:
            WorkspaceError: If the file is not valid JSON, lacks a required
                key, or holds a value of an unexpected type or format.
        """
        if not self.config.current_issue_path.exists():
            return None

        try:
            with open(self.config.current_issue_path, 'r', encoding='utf-8') as f:
                issue_data = json.load(f)

            # Drop a trailing 'Z' and any fractional seconds so the timestamp
            # matches the fixed second-resolution format parsed below.
            timestamp = issue_data['created_at'].replace('Z', '').split('.')[0]

            return Workspace(
                issue_number=issue_data['number'],
                issue_title=issue_data['title'],
                issue_body=issue_data['body'],
                issue_state=issue_data['state'],
                created_at=datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S'),
                workspace_dir=self.config.workspace_dir
            )
        except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError) as e:
            # TypeError/AttributeError cover structurally wrong metadata
            # (e.g. a JSON list at the root, or a null 'created_at'), which
            # is just another form of unreadable workspace file.
            raise WorkspaceError(f"Failed to load current workspace: {e}") from e

    def create_workspace(self, issue_data: Dict[str, Any]) -> Workspace:
        """Create a new workspace for an issue.

        A dirty workspace (directory without a current-issue file) is removed
        first. The issue directory tree, ``requirements.md``, ``test_plan.md``
        and the current-issue file are then written.

        Args:
            issue_data: Issue fields. ``number``, ``title``, ``body`` and
                ``state`` are required; ``html_url`` and ``assignee`` are
                optional.

        Returns:
            The newly created workspace.

        Raises:
            WorkspaceError: If a workspace is already active.
            KeyError: If a required key is missing from ``issue_data``.
        """
        status = self.get_status()

        if status == WorkspaceStatus.ACTIVE:
            current = self.get_current_workspace()
            raise WorkspaceError(
                f"Workspace already active for issue #{current.issue_number}. "
                "Finish current workspace before starting a new one."
            )

        # Clean up any dirty workspace
        if status == WorkspaceStatus.DIRTY:
            self.cleanup_workspace()

        # Create workspace structure
        workspace = Workspace(
            issue_number=issue_data['number'],
            issue_title=issue_data['title'],
            issue_body=issue_data['body'],
            issue_state=issue_data['state'],
            created_at=datetime.now(),
            workspace_dir=self.config.workspace_dir
        )

        # Create directories. tests_dir is the deepest path, so parents=True
        # creates workspace_dir and issue_dir as well, plus any missing
        # ancestors of workspace_dir.
        workspace.tests_dir.mkdir(parents=True, exist_ok=True)

        # Create metadata files
        self._create_requirements_file(workspace, issue_data)
        self._create_test_plan_file(workspace, issue_data)
        self._save_current_issue(workspace, issue_data)

        return workspace

    def cleanup_workspace(self) -> None:
        """Clean up the current workspace.

        Recursively deletes the configured workspace directory if it exists.
        """
        if self.config.workspace_dir.exists():
            shutil.rmtree(self.config.workspace_dir)

    def finish_workspace(self) -> Optional[Workspace]:
        """Finish the current workspace and integrate tests.

        Copies every ``*.py`` file directly inside the workspace tests
        directory into the configured main tests directory (an existing file
        with the same name is overwritten), then removes the workspace.

        Returns:
            The workspace that was finished, or ``None`` when there is no
            current workspace.

        Raises:
            WorkspaceError: If the current-issue file cannot be loaded.
        """
        workspace = self.get_current_workspace()
        if not workspace:
            return None

        # Move tests to main tests directory
        main_tests_dir = self.config.tests_dir
        main_tests_dir.mkdir(parents=True, exist_ok=True)

        if workspace.tests_dir.exists():
            for test_file in workspace.tests_dir.glob("*.py"):
                dest_file = main_tests_dir / test_file.name
                shutil.copy2(test_file, dest_file)

        # Clean up workspace
        self.cleanup_workspace()

        return workspace

    def _create_requirements_file(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
        """Create requirements.md file for the issue.

        Args:
            workspace: Workspace whose number, title, body and creation time
                are rendered into the file.
            issue_data: Unused here; kept for a signature parallel to the
                other file-creation helpers.
        """
        content = f"""# Requirements for Issue #{workspace.issue_number}

## Title
{workspace.issue_title}

## Description
{workspace.issue_body}

## Acceptance Criteria
- [ ] Implementation meets the requirements described above
- [ ] All tests pass
- [ ] Code follows project conventions
- [ ] Documentation is updated if needed

## Notes
Created: {workspace.created_at.strftime('%Y-%m-%d %H:%M:%S')}
"""
        # The title and body are inserted verbatim and may contain non-ASCII
        # text, so do not rely on the platform default encoding.
        workspace.requirements_file.write_text(content, encoding='utf-8')

    def _create_test_plan_file(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
        """Create test_plan.md file for the issue.

        Args:
            workspace: Workspace whose number and title are rendered into the
                file.
            issue_data: Unused here; kept for a signature parallel to the
                other file-creation helpers.
        """
        content = f"""# Test Plan for Issue #{workspace.issue_number}

## Overview
This test plan outlines the testing strategy for implementing: {workspace.issue_title}

## Test Categories

### Unit Tests
- [ ] Core functionality tests
- [ ] Edge case handling
- [ ] Error condition tests

### Integration Tests
- [ ] Component integration
- [ ] API integration
- [ ] End-to-end scenarios

### Generated Tests
Tests generated for this workspace will be listed here as they are created.

## Test Execution
Run tests with: `pytest tests/test_issue_{workspace.issue_number}_*.py`

## Notes
- Follow TDD red-green-refactor cycle
- Each test should be focused and specific
- Tests should be readable and maintainable
"""
        # The title is inserted verbatim and may contain non-ASCII text.
        workspace.test_plan_file.write_text(content, encoding='utf-8')

    def _save_current_issue(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
        """Save current issue metadata.

        Writes a JSON object with the keys ``number``, ``title``, ``body``,
        ``state``, ``created_at`` (ISO format), ``url`` and ``assignee`` to
        the configured current-issue path.

        Args:
            workspace: Source of the core issue fields.
            issue_data: Source of the optional ``html_url`` and ``assignee``
                fields; both default to an empty string when absent.
        """
        # 'assignee' may be missing or None; either way fall back to ''.
        assignee = issue_data.get('assignee') or {}

        current_issue_data = {
            'number': workspace.issue_number,
            'title': workspace.issue_title,
            'body': workspace.issue_body,
            'state': workspace.issue_state,
            'created_at': workspace.created_at.isoformat(),
            'url': issue_data.get('html_url', ''),
            'assignee': assignee.get('login', '')
        }

        with open(self.config.current_issue_path, 'w', encoding='utf-8') as f:
            json.dump(current_issue_data, f, indent=2)

    def add_test_to_workspace(self, test_filename: str, test_content: str) -> None:
        """Add a test file to the current workspace.

        Args:
            test_filename: Bare file name (no directory components) to create
                inside the workspace tests directory.
            test_content: Text written to the file; an existing file with the
                same name is overwritten.

        Raises:
            WorkspaceError: If there is no active workspace, or if
                ``test_filename`` is empty or is not a bare file name.
        """
        workspace = self.get_current_workspace()
        if not workspace:
            raise WorkspaceError("No active workspace. Create a workspace first.")

        # Reject empty names, absolute paths and anything with directory
        # components so the file cannot land outside the workspace tests
        # directory.
        if (
            not test_filename
            or test_filename in ('.', '..')
            or Path(test_filename).name != test_filename
        ):
            raise WorkspaceError(
                f"Invalid test filename: {test_filename!r}. "
                "Expected a bare file name without directory components."
            )

        test_file_path = workspace.tests_dir / test_filename

        # Ensure tests directory exists
        workspace.tests_dir.mkdir(parents=True, exist_ok=True)

        # Write test content to file
        with open(test_file_path, 'w', encoding='utf-8') as f:
            f.write(test_content)

    def get_workspace_status(self) -> WorkspaceStatus:
        """Alias for get_status() for API compatibility."""
        return self.get_status()