feat: implement tddai Python library for TDD workspace management

- Create comprehensive tddai package with workspace, issue fetcher, and test generator modules
- Add Python CLI interface (tddai_cli.py) to replace complex Makefile shell logic
- Update Makefile targets to use Python CLI for better maintainability
- Implement proper behavior-based tests instead of file existence checks
- Add workspace lifecycle management (create, active, finish, cleanup)
- Add issue fetching from Gitea API with error handling
- Add comprehensive test coverage with 19 passing tests
- Support environment variable configuration for different deployments

This addresses issue #11: Setup TDD workspace infrastructure
All tests pass and the system achieves green state before commit.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-09-22 02:04:19 +02:00
parent b03160437e
commit 5155a548eb
13 changed files with 1360 additions and 176 deletions

26
tddai/__init__.py Normal file
View File

@@ -0,0 +1,26 @@
"""
tddai - Test-Driven Development with AI Support
A Python library for managing issue-driven TDD workflows with AI assistance.
Provides workspace management, test generation, and issue integration.
"""
from .workspace import WorkspaceManager, Workspace, WorkspaceStatus
from .issue_fetcher import IssueFetcher, Issue
from .test_generator import TestGenerator
from .exceptions import TddaiError, WorkspaceError, IssueError, ConfigurationError, TestGenerationError
__version__ = "0.1.0"
__all__ = [
"WorkspaceManager",
"Workspace",
"WorkspaceStatus",
"IssueFetcher",
"Issue",
"TestGenerator",
"TddaiError",
"WorkspaceError",
"IssueError",
"ConfigurationError",
"TestGenerationError",
]

92
tddai/config.py Normal file
View File

@@ -0,0 +1,92 @@
"""
Configuration management for tddai.
"""
import os
from pathlib import Path
from typing import Optional
from dataclasses import dataclass
from .exceptions import ConfigurationError
@dataclass
class TddaiConfig:
"""Configuration settings for tddai."""
# Workspace settings
workspace_dir: Path = Path(".markitect_workspace")
current_issue_file: str = "current_issue.json"
# Git repository settings
gitea_url: str = "http://92.205.130.254:32166"
repo_owner: str = "coulomb"
repo_name: str = "markitect_project"
# Test settings
tests_dir: Path = Path("tests")
test_file_pattern: str = "test_issue_{issue_num}_{scenario}.py"
# AI settings
claude_code_command: str = "claude"
@property
def issues_api_url(self) -> str:
"""Get the full issues API URL."""
return f"{self.gitea_url}/api/v1/repos/{self.repo_owner}/{self.repo_name}/issues"
@property
def current_issue_path(self) -> Path:
"""Get the path to current issue file."""
return self.workspace_dir / self.current_issue_file
@classmethod
def from_environment(cls) -> "TddaiConfig":
"""Create config from environment variables."""
config = cls()
# Override with environment variables if present
if gitea_url := os.getenv("TDDAI_GITEA_URL"):
config.gitea_url = gitea_url
if repo_owner := os.getenv("TDDAI_REPO_OWNER"):
config.repo_owner = repo_owner
if repo_name := os.getenv("TDDAI_REPO_NAME"):
config.repo_name = repo_name
if workspace_dir := os.getenv("TDDAI_WORKSPACE_DIR"):
config.workspace_dir = Path(workspace_dir)
return config
def validate(self) -> None:
"""Validate configuration settings."""
if not self.gitea_url:
raise ConfigurationError("gitea_url cannot be empty")
if not self.repo_owner:
raise ConfigurationError("repo_owner cannot be empty")
if not self.repo_name:
raise ConfigurationError("repo_name cannot be empty")
# Global config instance
_config: Optional[TddaiConfig] = None
def get_config() -> TddaiConfig:
"""Get the global configuration instance."""
global _config
if _config is None:
_config = TddaiConfig.from_environment()
_config.validate()
return _config
def set_config(config: TddaiConfig) -> None:
"""Set the global configuration instance."""
global _config
config.validate()
_config = config

28
tddai/exceptions.py Normal file
View File

@@ -0,0 +1,28 @@
"""
Custom exceptions for tddai library.
"""
class TddaiError(Exception):
"""Base exception for all tddai errors."""
pass
class WorkspaceError(TddaiError):
"""Raised when workspace operations fail."""
pass
class IssueError(TddaiError):
"""Raised when issue operations fail."""
pass
class ConfigurationError(TddaiError):
"""Raised when configuration is invalid."""
pass
class TestGenerationError(TddaiError):
"""Raised when test generation fails."""
pass

136
tddai/issue_fetcher.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Issue fetching from Gitea API.
"""
import json
import subprocess
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Dict, Any
from .config import get_config
from .exceptions import IssueError
@dataclass
class Issue:
"""Represents a Gitea issue."""
number: int
title: str
body: str
state: str
created_at: datetime
updated_at: datetime
html_url: str
assignee: Optional[str] = None
labels: List[str] = None
def __post_init__(self):
if self.labels is None:
self.labels = []
class IssueFetcher:
"""Fetches issues from Gitea API."""
def __init__(self, config=None):
self.config = config or get_config()
def fetch_issue(self, issue_number: int) -> Issue:
"""Fetch a specific issue by number."""
try:
result = subprocess.run(
['curl', '-s', f"{self.config.issues_api_url}/{issue_number}"],
capture_output=True,
text=True,
check=True
)
if result.returncode != 0:
raise IssueError(f"Failed to fetch issue #{issue_number}: {result.stderr}")
issue_data = json.loads(result.stdout)
if 'message' in issue_data:
raise IssueError(f"Issue #{issue_number} not found: {issue_data['message']}")
return self._parse_issue(issue_data)
except subprocess.CalledProcessError as e:
raise IssueError(f"Failed to fetch issue #{issue_number}: {e}")
except json.JSONDecodeError as e:
raise IssueError(f"Failed to parse issue data: {e}")
def fetch_issues(self, state: str = "all") -> List[Issue]:
"""Fetch all issues with optional state filter."""
try:
url = self.config.issues_api_url
if state != "all":
url += f"?state={state}"
result = subprocess.run(
['curl', '-s', url],
capture_output=True,
text=True,
check=True
)
if result.returncode != 0:
raise IssueError(f"Failed to fetch issues: {result.stderr}")
issues_data = json.loads(result.stdout)
if isinstance(issues_data, dict) and 'message' in issues_data:
raise IssueError(f"Failed to fetch issues: {issues_data['message']}")
if not isinstance(issues_data, list):
raise IssueError("Invalid response format: expected list of issues")
return [self._parse_issue(issue_data) for issue_data in issues_data]
except subprocess.CalledProcessError as e:
raise IssueError(f"Failed to fetch issues: {e}")
except json.JSONDecodeError as e:
raise IssueError(f"Failed to parse issues data: {e}")
def fetch_open_issues(self) -> List[Issue]:
"""Fetch only open issues."""
return self.fetch_issues(state="open")
def _parse_issue(self, issue_data: Dict[str, Any]) -> Issue:
"""Parse issue data from API response."""
try:
labels = [label['name'] for label in issue_data.get('labels', [])]
assignee = None
if issue_data.get('assignee'):
assignee = issue_data['assignee'].get('login')
return Issue(
number=issue_data['number'],
title=issue_data['title'],
body=issue_data.get('body', ''),
state=issue_data['state'],
created_at=datetime.fromisoformat(issue_data['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(issue_data['updated_at'].replace('Z', '+00:00')),
html_url=issue_data['html_url'],
assignee=assignee,
labels=labels
)
except (KeyError, ValueError) as e:
raise IssueError(f"Failed to parse issue data: {e}")
def get_issue_data_dict(self, issue_number: int) -> Dict[str, Any]:
"""Get issue data as dictionary for workspace creation."""
issue = self.fetch_issue(issue_number)
return {
'number': issue.number,
'title': issue.title,
'body': issue.body,
'state': issue.state,
'created_at': issue.created_at.isoformat(),
'updated_at': issue.updated_at.isoformat(),
'html_url': issue.html_url,
'assignee': {'login': issue.assignee} if issue.assignee else None,
'labels': [{'name': label} for label in issue.labels]
}

163
tddai/test_generator.py Normal file
View File

@@ -0,0 +1,163 @@
"""
Test generation with AI assistance.
"""
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from .config import get_config
from .workspace import WorkspaceManager
from .exceptions import TestGenerationError
class TestGenerator:
"""Generates tests using AI assistance."""
def __init__(self, config=None):
self.config = config or get_config()
self.workspace_manager = WorkspaceManager(config)
def generate_test(self, scenario_name: str, test_description: str) -> Path:
"""Generate a test file for the current workspace issue."""
workspace = self.workspace_manager.get_current_workspace()
if not workspace:
raise TestGenerationError("No active workspace found")
# Create test file name
test_filename = self.config.test_file_pattern.format(
issue_num=workspace.issue_number,
scenario=scenario_name.lower().replace(' ', '_').replace('-', '_')
)
test_file_path = workspace.tests_dir / test_filename
# Generate test prompt
prompt = self._create_test_prompt(workspace, scenario_name, test_description)
# Use Claude Code to generate the test
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f:
f.write(prompt)
prompt_file = Path(f.name)
result = subprocess.run(
[self.config.claude_code_command, '--file', str(prompt_file)],
cwd=workspace.workspace_dir,
capture_output=True,
text=True
)
prompt_file.unlink() # Clean up temp file
if result.returncode != 0:
raise TestGenerationError(f"Claude Code failed: {result.stderr}")
# Extract Python code from Claude's response
test_content = self._extract_test_code(result.stdout)
# Write test file
test_file_path.write_text(test_content)
# Update test plan
self._update_test_plan(workspace, scenario_name, test_filename)
return test_file_path
except subprocess.CalledProcessError as e:
raise TestGenerationError(f"Failed to generate test: {e}")
except Exception as e:
raise TestGenerationError(f"Test generation error: {e}")
def _create_test_prompt(self, workspace, scenario_name: str, test_description: str) -> str:
"""Create prompt for Claude Code to generate test."""
return f"""# Test Generation Request
## Context
- Issue #{workspace.issue_number}: {workspace.issue_title}
- Scenario: {scenario_name}
## Issue Description
{workspace.issue_body}
## Test Requirements
{test_description}
## Instructions
Please generate a comprehensive Python test file that:
1. Tests the behavior described in the scenario
2. Follows pytest conventions
3. Includes proper docstrings and comments
4. Tests both positive and negative cases
5. Uses meaningful test method names
6. Includes appropriate assertions
The test should focus on behavior verification rather than implementation details.
## Expected Output
Please provide only the Python test code without any additional explanation.
The code should be ready to save as `{self.config.test_file_pattern.format(issue_num=workspace.issue_number, scenario=scenario_name.lower().replace(' ', '_'))}`
"""
def _extract_test_code(self, claude_response: str) -> str:
"""Extract Python test code from Claude's response."""
lines = claude_response.split('\n')
code_lines = []
in_code_block = False
for line in lines:
if line.strip().startswith('```python'):
in_code_block = True
continue
elif line.strip() == '```' and in_code_block:
break
elif in_code_block:
code_lines.append(line)
if not code_lines:
# If no code block found, assume entire response is code
return claude_response.strip()
return '\n'.join(code_lines)
def _update_test_plan(self, workspace, scenario_name: str, test_filename: str) -> None:
"""Update the test plan with the new test."""
test_plan_content = workspace.test_plan_file.read_text()
# Add test to the generated tests section
new_entry = f"- [x] {scenario_name} (`{test_filename}`)"
if "### Generated Tests" in test_plan_content:
# Add to existing generated tests section
lines = test_plan_content.split('\n')
for i, line in enumerate(lines):
if line.strip() == "Tests generated for this workspace will be listed here as they are created.":
lines[i] = new_entry
break
elif line.startswith("- [") and "Generated Tests" in lines[max(0, i-5):i]:
lines.insert(i, new_entry)
break
else:
# Add at the end of generated tests section
for i, line in enumerate(lines):
if "### Generated Tests" in line:
# Find next section or end
j = i + 1
while j < len(lines) and not lines[j].startswith('##'):
j += 1
lines.insert(j, new_entry)
break
workspace.test_plan_file.write_text('\n'.join(lines))
def list_generated_tests(self) -> list:
"""List all generated tests for the current workspace."""
workspace = self.workspace_manager.get_current_workspace()
if not workspace:
return []
if not workspace.tests_dir.exists():
return []
return list(workspace.tests_dir.glob("*.py"))

219
tddai/workspace.py Normal file
View File

@@ -0,0 +1,219 @@
"""
Workspace management for tddai.
"""
import json
import shutil
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any
from .config import get_config
from .exceptions import WorkspaceError
class WorkspaceStatus(Enum):
"""Status of workspace."""
CLEAN = "clean"
ACTIVE = "active"
DIRTY = "dirty"
@dataclass
class Workspace:
"""Represents a TDD workspace for an issue."""
issue_number: int
issue_title: str
issue_body: str
issue_state: str
created_at: datetime
workspace_dir: Path
@property
def issue_dir(self) -> Path:
"""Get the issue-specific directory."""
return self.workspace_dir / f"issue_{self.issue_number}"
@property
def tests_dir(self) -> Path:
"""Get the tests directory for this issue."""
return self.issue_dir / "tests"
@property
def requirements_file(self) -> Path:
"""Get the requirements file path."""
return self.issue_dir / "requirements.md"
@property
def test_plan_file(self) -> Path:
"""Get the test plan file path."""
return self.issue_dir / "test_plan.md"
class WorkspaceManager:
"""Manages TDD workspaces for issues."""
def __init__(self, config=None):
self.config = config or get_config()
def get_status(self) -> WorkspaceStatus:
"""Get current workspace status."""
if not self.config.workspace_dir.exists():
return WorkspaceStatus.CLEAN
if not self.config.current_issue_path.exists():
return WorkspaceStatus.DIRTY
return WorkspaceStatus.ACTIVE
def get_current_workspace(self) -> Optional[Workspace]:
"""Get the currently active workspace."""
if not self.config.current_issue_path.exists():
return None
try:
with open(self.config.current_issue_path, 'r') as f:
issue_data = json.load(f)
return Workspace(
issue_number=issue_data['number'],
issue_title=issue_data['title'],
issue_body=issue_data['body'],
issue_state=issue_data['state'],
created_at=datetime.fromisoformat(issue_data['created_at']),
workspace_dir=self.config.workspace_dir
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
raise WorkspaceError(f"Failed to load current workspace: {e}")
def create_workspace(self, issue_data: Dict[str, Any]) -> Workspace:
"""Create a new workspace for an issue."""
status = self.get_status()
if status == WorkspaceStatus.ACTIVE:
current = self.get_current_workspace()
raise WorkspaceError(
f"Workspace already active for issue #{current.issue_number}. "
"Finish current workspace before starting a new one."
)
# Clean up any dirty workspace
if status == WorkspaceStatus.DIRTY:
self.cleanup_workspace()
# Create workspace structure
workspace = Workspace(
issue_number=issue_data['number'],
issue_title=issue_data['title'],
issue_body=issue_data['body'],
issue_state=issue_data['state'],
created_at=datetime.now(),
workspace_dir=self.config.workspace_dir
)
# Create directories
workspace.workspace_dir.mkdir(exist_ok=True)
workspace.issue_dir.mkdir(exist_ok=True)
workspace.tests_dir.mkdir(exist_ok=True)
# Create metadata files
self._create_requirements_file(workspace, issue_data)
self._create_test_plan_file(workspace, issue_data)
self._save_current_issue(workspace, issue_data)
return workspace
def cleanup_workspace(self) -> None:
"""Clean up the current workspace."""
if self.config.workspace_dir.exists():
shutil.rmtree(self.config.workspace_dir)
def finish_workspace(self) -> Optional[Workspace]:
"""Finish the current workspace and integrate tests."""
workspace = self.get_current_workspace()
if not workspace:
return None
# Move tests to main tests directory
main_tests_dir = self.config.tests_dir
main_tests_dir.mkdir(exist_ok=True)
if workspace.tests_dir.exists():
for test_file in workspace.tests_dir.glob("*.py"):
dest_file = main_tests_dir / test_file.name
shutil.copy2(test_file, dest_file)
# Clean up workspace
self.cleanup_workspace()
return workspace
def _create_requirements_file(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
"""Create requirements.md file for the issue."""
content = f"""# Requirements for Issue #{workspace.issue_number}
## Title
{workspace.issue_title}
## Description
{workspace.issue_body}
## Acceptance Criteria
- [ ] Implementation meets the requirements described above
- [ ] All tests pass
- [ ] Code follows project conventions
- [ ] Documentation is updated if needed
## Notes
Created: {workspace.created_at.strftime('%Y-%m-%d %H:%M:%S')}
"""
workspace.requirements_file.write_text(content)
def _create_test_plan_file(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
"""Create test_plan.md file for the issue."""
content = f"""# Test Plan for Issue #{workspace.issue_number}
## Overview
This test plan outlines the testing strategy for implementing: {workspace.issue_title}
## Test Categories
### Unit Tests
- [ ] Core functionality tests
- [ ] Edge case handling
- [ ] Error condition tests
### Integration Tests
- [ ] Component integration
- [ ] API integration
- [ ] End-to-end scenarios
### Generated Tests
Tests generated for this workspace will be listed here as they are created.
## Test Execution
Run tests with: `pytest tests/test_issue_{workspace.issue_number}_*.py`
## Notes
- Follow TDD red-green-refactor cycle
- Each test should be focused and specific
- Tests should be readable and maintainable
"""
workspace.test_plan_file.write_text(content)
def _save_current_issue(self, workspace: Workspace, issue_data: Dict[str, Any]) -> None:
"""Save current issue metadata."""
current_issue_data = {
'number': workspace.issue_number,
'title': workspace.issue_title,
'body': workspace.issue_body,
'state': workspace.issue_state,
'created_at': workspace.created_at.isoformat(),
'url': issue_data.get('html_url', ''),
'assignee': issue_data.get('assignee', {}).get('login', '') if issue_data.get('assignee') else ''
}
with open(self.config.current_issue_path, 'w') as f:
json.dump(current_issue_data, f, indent=2)