From 51aea5effbcc3a15587e5dce4b78efded9907291 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 25 Oct 2025 00:54:20 +0200 Subject: [PATCH] init: first extract of implementation --- README.md | 363 ++++++++++++++++++++-- __init__.py | 24 ++ backends/__init__.py | 11 + backends/gitea/__init__.py | 17 + backends/gitea/backend.py | 591 +++++++++++++++++++++++++++++++++++ backends/local/__init__.py | 19 ++ backends/local/backend.py | 618 +++++++++++++++++++++++++++++++++++++ backends/local/schema.sql | 189 ++++++++++++ cli/__init__.py | 20 ++ cli/backend_commands.py | 141 +++++++++ cli/commands.py | 417 +++++++++++++++++++++++++ cli/main.py | 117 +++++++ cli/sync_commands.py | 235 ++++++++++++++ cli/utils.py | 336 ++++++++++++++++++++ core/interfaces.py | 407 ++++++++++++++++++++++++ core/models.py | 341 ++++++++++++++++++++ core/repository.py | 454 +++++++++++++++++++++++++++ pyproject.toml | 164 ++++++++++ 18 files changed, 4441 insertions(+), 23 deletions(-) create mode 100644 __init__.py create mode 100644 backends/__init__.py create mode 100644 backends/gitea/__init__.py create mode 100644 backends/gitea/backend.py create mode 100644 backends/local/__init__.py create mode 100644 backends/local/backend.py create mode 100644 backends/local/schema.sql create mode 100644 cli/__init__.py create mode 100644 cli/backend_commands.py create mode 100644 cli/commands.py create mode 100644 cli/main.py create mode 100644 cli/sync_commands.py create mode 100644 cli/utils.py create mode 100644 core/interfaces.py create mode 100644 core/models.py create mode 100644 core/repository.py create mode 100644 pyproject.toml diff --git a/README.md b/README.md index 3ba6b1f..bdcb19a 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,340 @@ -# Issue Facade - Universal CLI for Issue Tracking - -A convenient command-line facade that provides a unified interface to the repository's main issue tracker, regardless of which backend system is actually being used. - -## Purpose - -The **Issue Facade** acts as a convenient CLI wrapper that automatically detects and interfaces with whatever issue tracking system is configured for the current repository. This means you get a consistent, intuitive command-line experience whether your project uses: - -- GitHub Issues -- GitLab Issues -- Gitea Issues -- JIRA -- Local SQLite storage -- Any other supported backend - -## Philosophy - -Rather than learning different commands and workflows for each issue tracking system, the Issue Facade provides: - -- **One CLI to rule them all**: Same commands work across all backends -- **Repository-aware**: Automatically detects the relevant issue tracker for your repo -- **Offline capability**: Local SQLite fallback when remote systems are unavailable -- **Seamless sync**: Keep local and remote issue trackers synchronized \ No newline at end of file +# Issue Facade - Universal CLI for Issue Tracking + +A convenient command-line facade that provides a unified interface to the repository's main issue tracker, regardless of which backend system is actually being used. + +## Purpose + +The **Issue Facade** acts as a convenient CLI wrapper that automatically detects and interfaces with whatever issue tracking system is configured for the current repository. This means you get a consistent, intuitive command-line experience whether your project uses: + +- GitHub Issues +- GitLab Issues +- Gitea Issues +- JIRA +- Local SQLite storage +- Any other supported backend + +## Philosophy + +Rather than learning different commands and workflows for each issue tracking system, the Issue Facade provides: + +- **One CLI to rule them all**: Same commands work across all backends +- **Repository-aware**: Automatically detects the relevant issue tracker for your repo +- **Offline capability**: Local SQLite fallback when remote systems are unavailable +- **Seamless sync**: Keep local and remote issue trackers synchronized + +## How It Works + +```bash +# The facade automatically detects your repository's issue tracker +cd /path/to/my-project +issue list # Lists issues from the repo's configured tracker + +cd /path/to/another-project +issue list # Lists issues from THIS repo's tracker + +# Same commands, different backends - transparent to the user +``` + +## Key Features + +### 🎯 **Repository Context Awareness** +The facade automatically detects: +- Git remotes (GitHub, GitLab, Gitea URLs) +- Configuration files (`.issue-config`, `pyproject.toml`, etc.) +- Environment variables +- Default fallbacks + +### 🖥️ **Unified CLI Experience** +```bash +# Core issue operations (same across all backends) +issue list # List issues +issue show 42 # Show issue details +issue create "Bug in parser" # Create new issue +issue edit 42 --add-label bug # Edit existing issue +issue close 42 --comment "Fixed" # Close with comment +issue comment 42 "Still broken" # Add comment + +# Advanced operations +issue list --assignee=me --state=open +issue search "memory leak" +issue stats # Show issue statistics +``` + +### 🔄 **Automatic Backend Detection** +```bash +# GitHub repository +cd my-github-project +issue list # → Automatically uses GitHub Issues API + +# Gitea repository +cd my-gitea-project +issue list # → Automatically uses Gitea Issues API + +# Offline/local work +cd any-project +issue backend set local +issue list # → Uses local SQLite storage +``` + +### 🌐 **Seamless Synchronization** +```bash +# Work offline, sync later +issue create "Bug found offline" --local +issue sync # Pushes to remote when online + +# Keep backups +issue sync pull # Download all remote issues locally +issue export backup.json # Export for archival +``` + +## Installation & Setup + +### 1. Install the Facade +```bash +pip install issue-facade +``` + +### 2. Automatic Configuration +The facade auto-detects your repository's issue tracker: + +```bash +cd your-repository +issue config detect # Auto-configure based on git remotes +issue list # Ready to use! +``` + +### 3. Manual Configuration (if needed) +```bash +# Configure specific backends +issue backend add github my-repo +issue backend add gitea company-repo +issue backend add local offline + +# Set repository-specific backend +issue config set-backend github # For current repository +``` + +## Repository Integration Examples + +### GitHub Repository +```bash +cd my-github-project +issue config detect +# → Automatically configures GitHub Issues via API +# → Uses .github/ISSUE_TEMPLATE/ for templates +# → Respects repository labels and milestones + +issue create "Security vulnerability" --template security +``` + +### Corporate Gitea +```bash +cd company-project +issue config detect +# → Detects Gitea instance from git remote +# → Prompts for access token (one-time setup) +# → Uses corporate labels and workflows + +issue list --milestone "Q4 Release" +``` + +### Offline Development +```bash +cd any-project +issue backend set local +# → Creates local SQLite database in .issue-facade/ +# → Full offline functionality +# → Sync to remote when connection available + +issue create "Performance issue" --offline +issue sync when-online +``` + +## Configuration + +### Per-Repository Configuration +Each repository can have its own configuration in `.issue-facade/config.json`: + +```json +{ + "backend": "github", + "github": { + "owner": "myorg", + "repo": "myproject", + "token_env": "GITHUB_TOKEN" + }, + "local": { + "db_path": ".issue-facade/issues.db", + "sync_enabled": true + }, + "templates": { + "bug": ".github/ISSUE_TEMPLATE/bug_report.md", + "feature": ".github/ISSUE_TEMPLATE/feature_request.md" + } +} +``` + +### Global Configuration +User-wide settings in `~/.config/issue-facade/config.json`: + +```json +{ + "default_backend": "local", + "github_token": "env:GITHUB_TOKEN", + "gitea_instances": { + "company": { + "url": "https://git.company.com", + "token": "env:GITEA_TOKEN" + } + }, + "offline_mode": false, + "sync_interval": "1h" +} +``` + +## Architecture + +### Facade Pattern Implementation +``` +Repository Working Directory +├── .git/ # Git repository +├── .issue-facade/ # Facade configuration +│ ├── config.json # Repository-specific config +│ ├── issues.db # Local SQLite cache/backup +│ └── templates/ # Issue templates +├── your-project-files... +└── issue # CLI command (context-aware) +``` + +### Backend Detection Logic +1. **Check repository configuration**: `.issue-facade/config.json` +2. **Analyze git remotes**: Detect GitHub/GitLab/Gitea URLs +3. **Look for platform files**: `.github/`, `.gitlab/`, etc. +4. **Check environment**: `GITHUB_TOKEN`, `GITLAB_TOKEN`, etc. +5. **Fall back to local**: SQLite storage if no remote detected + +### Multi-Repository Support +```bash +# Each repository maintains its own context +/projects/web-app/ → GitHub Issues +/projects/api-server/ → GitLab Issues +/projects/cli-tool/ → Gitea Issues +/projects/experiment/ → Local SQLite + +# Same commands work in all contexts +cd web-app && issue list # GitHub +cd api-server && issue list # GitLab +cd cli-tool && issue list # Gitea +cd experiment && issue list # Local +``` + +## Use Cases + +### 1. **Multi-Platform Developer** +You work with repositories across GitHub, GitLab, and company Gitea: +```bash +# Learn one CLI, use everywhere +issue list --assignee=me # Works on all platforms +issue create "Cross-platform bug" --label bug +``` + +### 2. **Offline Developer** +You need to track issues without constant internet: +```bash +issue create "Found while flying" --offline +issue list --local # View offline issues +issue sync # Upload when back online +``` + +### 3. **Repository Migration** +Moving from GitHub to GitLab: +```bash +issue export github-backup.json # Backup from GitHub +issue backend set gitlab # Switch to GitLab +issue import github-backup.json # Import to GitLab +``` + +### 4. **Cross-Repository Analytics** +Track issues across multiple repositories: +```bash +issue stats --all-repos # Statistics across all configured repos +issue search "security" --global # Search across all issue trackers +``` + +## Integration with Development Workflow + +### Git Hooks Integration +```bash +# .git/hooks/pre-commit +issue list --assignee=me --state=open > ISSUES.md +git add ISSUES.md +``` + +### CI/CD Integration +```bash +# In your CI pipeline +issue create "Build failed on commit $SHA" --label ci-failure +issue close-if-fixed $ISSUE_NUMBER +``` + +### IDE Integration +```bash +# VS Code, Vim, Emacs plugins can use the CLI +:IssueList # List issues in editor +:IssueCreate "Typo in function" # Create issue from editor +``` + +## Comparison with Native Tools + +| Feature | issue-facade | gh (GitHub CLI) | glab (GitLab CLI) | Platform Web UI | +|---------|--------------|-----------------|-------------------|-----------------| +| **Multi-platform** | ✅ All backends | ❌ GitHub only | ❌ GitLab only | ❌ Single platform | +| **Offline support** | ✅ Local SQLite | ❌ Online only | ❌ Online only | ❌ Online only | +| **Consistent CLI** | ✅ Same commands | ❌ GitHub-specific | ❌ GitLab-specific | ❌ Web interface | +| **Repository context** | ✅ Auto-detect | ✅ Git-aware | ✅ Git-aware | ❌ Manual navigation | +| **Cross-repo search** | ✅ Global search | ❌ Single repo | ❌ Single repo | ❌ Single repo | +| **Data portability** | ✅ Export/import | ❌ Platform-locked | ❌ Platform-locked | ❌ Platform-locked | + +## Future Roadmap + +### Version 1.0 (Current) +- [x] Core facade architecture +- [x] GitHub/GitLab/Gitea backend support +- [x] Local SQLite backend +- [x] Automatic repository detection +- [x] Basic synchronization + +### Version 1.1 +- [ ] Advanced sync (conflict resolution) +- [ ] Issue templates support +- [ ] Workflow automation hooks +- [ ] Plugin system for custom backends + +### Version 2.0 +- [ ] Web dashboard for multi-repo overview +- [ ] Advanced analytics and reporting +- [ ] Team collaboration features +- [ ] Integration with project management tools + +## Contributing + +The Issue Facade is designed to be: +- **Backend-agnostic**: Easy to add new issue tracking systems +- **Repository-aware**: Respects the conventions of each platform +- **Developer-friendly**: Consistent CLI across all environments + +To add a new backend: +1. Implement the `IssueBackend` interface +2. Add detection logic for the platform +3. Register the backend in the factory +4. Add CLI configuration support + +## Why "Facade"? + +The **Facade Pattern** perfectly describes this tool's purpose: + +> *"Provide a unified interface to a set of interfaces in a subsystem. Facade defines a higher-level interface that makes the subsystem easier to use."* + +Instead of learning different CLIs for GitHub (`gh`), GitLab (`glab`), JIRA, etc., the Issue Facade provides one consistent interface that works with all of them. It's the universal remote control for issue tracking systems. + +The facade doesn't replace the underlying issue trackers - it makes them easier to use consistently across different platforms and repositories. \ No newline at end of file diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..cab4613 --- /dev/null +++ b/__init__.py @@ -0,0 +1,24 @@ +""" +Universal Issue Tracking System + +A backend-agnostic issue tracking system that supports multiple backends +through a plugin architecture. Designed to be extracted into a standalone +repository for use across multiple projects. + +Features: +- Unified issue model across all backends +- Plugin-based backend architecture +- Local SQLite backend for offline work +- Bidirectional synchronization +- CLI-first interface +- Support for GitHub-style and other issue tracking systems + +Supported Backends: +- Local SQLite (for offline/standalone use) +- Gitea (GitHub-compatible API) +- Future: GitHub, GitLab, JIRA, Redmine, etc. +""" + +__version__ = "0.1.0" +__author__ = "MarkiTect Project" +__description__ = "Universal Issue Tracking System with Plugin Architecture" \ No newline at end of file diff --git a/backends/__init__.py b/backends/__init__.py new file mode 100644 index 0000000..429769c --- /dev/null +++ b/backends/__init__.py @@ -0,0 +1,11 @@ +""" +Issue Tracking Backend Plugins + +This package contains implementations for various issue tracking backends. +Each backend implements the IssueBackend interface to provide a consistent +API regardless of the underlying issue tracking system. + +Available Backends: +- local: SQLite-based local backend for offline use +- gitea: Gitea API backend for GitHub-compatible systems +""" \ No newline at end of file diff --git a/backends/gitea/__init__.py b/backends/gitea/__init__.py new file mode 100644 index 0000000..c02af71 --- /dev/null +++ b/backends/gitea/__init__.py @@ -0,0 +1,17 @@ +""" +Gitea Backend + +A backend implementation for Gitea issue tracking systems. +This backend provides integration with Gitea API for remote issue management. + +Features: +- Full Gitea API integration +- GitHub-compatible operations +- Remote synchronization +- Authentication support +- Rate limiting compliance +""" + +from .backend import GiteaBackend + +__all__ = ['GiteaBackend'] \ No newline at end of file diff --git a/backends/gitea/backend.py b/backends/gitea/backend.py new file mode 100644 index 0000000..c7ac47f --- /dev/null +++ b/backends/gitea/backend.py @@ -0,0 +1,591 @@ +""" +Gitea Backend Implementation + +Provides integration with Gitea API for remote issue tracking. +This backend adapts the Gitea API to our unified issue model. +""" + +import requests +import time +from datetime import datetime, timezone +from typing import List, Optional, Dict, Any +from urllib.parse import urljoin + +from ...core.interfaces import RemoteBackend, BackendCapabilities, IssueFilter, SyncableBackend +from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType + + +class GiteaAPIError(Exception): + """Gitea API specific errors.""" + pass + + +class GiteaRateLimitError(GiteaAPIError): + """Rate limit exceeded.""" + pass + + +class GiteaBackend(RemoteBackend, SyncableBackend): + """Gitea API backend for remote issue tracking.""" + + def __init__(self): + self.base_url: Optional[str] = None + self.token: Optional[str] = None + self.owner: Optional[str] = None + self.repo: Optional[str] = None + self.session = requests.Session() + self._capabilities = BackendCapabilities( + supports_milestones=True, + supports_assignees=True, + supports_comments=True, + supports_labels=True, + supports_search=True, + supports_bulk_operations=False, + supports_webhooks=True, + supports_real_time=False, + max_labels_per_issue=None, + max_assignees_per_issue=10 # Gitea typical limit + ) + + @property + def backend_type(self) -> str: + return "gitea" + + @property + def capabilities(self) -> BackendCapabilities: + return self._capabilities + + def connect(self, config: Dict[str, Any]) -> None: + """Connect to Gitea API.""" + self.base_url = config['base_url'].rstrip('/') + self.token = config['token'] + self.owner = config['owner'] + self.repo = config['repo'] + + # Setup session with authentication + self.session.headers.update({ + 'Authorization': f'token {self.token}', + 'Content-Type': 'application/json', + 'Accept': 'application/json' + }) + + # Test connection + if not self.test_connection(): + raise GiteaAPIError("Failed to connect to Gitea API") + + def disconnect(self) -> None: + """Disconnect from Gitea API.""" + self.session.close() + self.base_url = None + self.token = None + self.owner = None + self.repo = None + + def test_connection(self) -> bool: + """Test Gitea API connection.""" + try: + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}') + return response.status_code == 200 + except Exception: + return False + + def _api_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response: + """Make API request with error handling and rate limiting.""" + url = urljoin(f"{self.base_url}/api/v1", endpoint) + + try: + response = self.session.request(method, url, json=data, params=params) + + # Handle rate limiting + if response.status_code == 429: + retry_after = int(response.headers.get('Retry-After', 60)) + raise GiteaRateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds") + + response.raise_for_status() + return response + + except requests.exceptions.RequestException as e: + raise GiteaAPIError(f"API request failed: {e}") + + def _gitea_issue_to_unified(self, gitea_issue: Dict[str, Any]) -> Issue: + """Convert Gitea issue JSON to unified Issue model.""" + # Convert labels + labels = [] + for label_data in gitea_issue.get('labels', []): + labels.append(Label( + name=label_data['name'], + color=label_data['color'], + description=label_data.get('description', ''), + backend_id=str(label_data['id']) + )) + + # Convert assignees + assignees = [] + if gitea_issue.get('assignees'): + for assignee_data in gitea_issue['assignees']: + assignees.append(User( + id=str(assignee_data['id']), + username=assignee_data['login'], + display_name=assignee_data.get('full_name', ''), + email=assignee_data.get('email', ''), + avatar_url=assignee_data.get('avatar_url', ''), + backend_id=str(assignee_data['id']) + )) + + # Convert milestone + milestone = None + if gitea_issue.get('milestone'): + milestone_data = gitea_issue['milestone'] + milestone = Milestone( + id=str(milestone_data['id']), + title=milestone_data['title'], + description=milestone_data.get('description', ''), + state=milestone_data['state'], + due_date=datetime.fromisoformat(milestone_data['due_on'].replace('Z', '+00:00')) if milestone_data.get('due_on') else None, + created_at=datetime.fromisoformat(milestone_data['created_at'].replace('Z', '+00:00')) if milestone_data.get('created_at') else None, + updated_at=datetime.fromisoformat(milestone_data['updated_at'].replace('Z', '+00:00')) if milestone_data.get('updated_at') else None, + backend_id=str(milestone_data['id']) + ) + + # Determine state + if gitea_issue['state'] == 'closed': + state = IssueState.CLOSED + else: + # Check for status labels to determine more specific state + for label in labels: + if label.name == 'status:in_progress' or label.name == 'status:in-progress': + state = IssueState.IN_PROGRESS + break + elif label.name == 'status:blocked': + state = IssueState.BLOCKED + break + else: + state = IssueState.OPEN + + return Issue( + id=str(gitea_issue['id']), + number=gitea_issue['number'], + title=gitea_issue['title'], + description=gitea_issue.get('body', ''), + state=state, + created_at=datetime.fromisoformat(gitea_issue['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(gitea_issue['updated_at'].replace('Z', '+00:00')), + closed_at=datetime.fromisoformat(gitea_issue['closed_at'].replace('Z', '+00:00')) if gitea_issue.get('closed_at') else None, + labels=labels, + assignees=assignees, + milestone=milestone, + backend_id=str(gitea_issue['id']), + backend_type='gitea' + ) + + def _unified_issue_to_gitea(self, issue: Issue) -> Dict[str, Any]: + """Convert unified Issue to Gitea API format.""" + data = { + 'title': issue.title, + 'body': issue.description, + 'state': 'closed' if issue.state == IssueState.CLOSED else 'open' + } + + if issue.assignees: + data['assignees'] = [assignee.username for assignee in issue.assignees] + + if issue.milestone: + data['milestone'] = int(issue.milestone.backend_id) if issue.milestone.backend_id else None + + # Convert labels + if issue.labels: + data['labels'] = [label.name for label in issue.labels] + + return data + + # Issue CRUD Operations + def create_issue(self, issue: Issue) -> Issue: + """Create issue in Gitea.""" + data = self._unified_issue_to_gitea(issue) + + response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues', data=data) + gitea_issue = response.json() + + return self._gitea_issue_to_unified(gitea_issue) + + def get_issue(self, issue_id: str) -> Optional[Issue]: + """Get issue from Gitea by ID.""" + try: + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}') + gitea_issue = response.json() + return self._gitea_issue_to_unified(gitea_issue) + except GiteaAPIError: + return None + + def get_issue_by_number(self, number: int) -> Optional[Issue]: + """Get issue by number.""" + return self.get_issue(str(number)) + + def update_issue(self, issue: Issue) -> Issue: + """Update issue in Gitea.""" + data = self._unified_issue_to_gitea(issue) + + response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/{issue.backend_id}', data=data) + gitea_issue = response.json() + + return self._gitea_issue_to_unified(gitea_issue) + + def delete_issue(self, issue_id: str) -> bool: + """Delete issue - not supported by Gitea API.""" + # Gitea doesn't support deleting issues via API + # We could close it instead + try: + issue = self.get_issue(issue_id) + if issue: + issue.close() + self.update_issue(issue) + return True + except Exception: + pass + return False + + def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: + """List issues from Gitea.""" + params = { + 'state': 'all', # Get both open and closed + 'sort': 'updated', + 'order': 'desc' + } + + if filter_criteria: + if filter_criteria.state: + if filter_criteria.state == 'open': + params['state'] = 'open' + elif filter_criteria.state == 'closed': + params['state'] = 'closed' + + if filter_criteria.assignee: + params['assignee'] = filter_criteria.assignee + + if filter_criteria.milestone: + params['milestone'] = filter_criteria.milestone + + if filter_criteria.labels: + params['labels'] = ','.join(filter_criteria.labels) + + if filter_criteria.created_after: + params['since'] = filter_criteria.created_after.isoformat() + + if filter_criteria.limit: + params['limit'] = filter_criteria.limit + + if filter_criteria.offset: + params['page'] = (filter_criteria.offset // (filter_criteria.limit or 30)) + 1 + + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues', params=params) + gitea_issues = response.json() + + issues = [self._gitea_issue_to_unified(gitea_issue) for gitea_issue in gitea_issues] + + # Apply additional filtering that Gitea API doesn't support + if filter_criteria: + if filter_criteria.search: + search_term = filter_criteria.search.lower() + issues = [ + issue for issue in issues + if search_term in issue.title.lower() or search_term in issue.description.lower() + ] + + return issues + + def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: + """Search issues - limited Gitea API support.""" + # Gitea has limited search API, fallback to list with search filter + filter_criteria = IssueFilter(search=query, limit=limit) + return self.list_issues(filter_criteria) + + # Label Operations + def create_label(self, label: Label) -> Label: + """Create label in Gitea.""" + data = { + 'name': label.name, + 'color': label.color or '#000000', + 'description': label.description or '' + } + + response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/labels', data=data) + gitea_label = response.json() + + return Label( + name=gitea_label['name'], + color=gitea_label['color'], + description=gitea_label.get('description', ''), + backend_id=str(gitea_label['id']) + ) + + def get_labels(self) -> List[Label]: + """Get all labels from Gitea.""" + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/labels') + gitea_labels = response.json() + + return [Label( + name=label['name'], + color=label['color'], + description=label.get('description', ''), + backend_id=str(label['id']) + ) for label in gitea_labels] + + def update_label(self, label: Label) -> Label: + """Update label in Gitea.""" + data = { + 'name': label.name, + 'color': label.color or '#000000', + 'description': label.description or '' + } + + response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/labels/{label.name}', data=data) + gitea_label = response.json() + + return Label( + name=gitea_label['name'], + color=gitea_label['color'], + description=gitea_label.get('description', ''), + backend_id=str(gitea_label['id']) + ) + + def delete_label(self, label_name: str) -> bool: + """Delete label from Gitea.""" + try: + self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/labels/{label_name}') + return True + except GiteaAPIError: + return False + + # User Operations + def get_users(self) -> List[User]: + """Get repository collaborators.""" + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/collaborators') + gitea_users = response.json() + + return [User( + id=str(user['id']), + username=user['login'], + display_name=user.get('full_name', ''), + email=user.get('email', ''), + avatar_url=user.get('avatar_url', ''), + backend_id=str(user['id']) + ) for user in gitea_users] + + def get_user(self, user_id: str) -> Optional[User]: + """Get specific user.""" + try: + response = self._api_request('GET', f'/users/{user_id}') + user = response.json() + return User( + id=str(user['id']), + username=user['login'], + display_name=user.get('full_name', ''), + email=user.get('email', ''), + avatar_url=user.get('avatar_url', ''), + backend_id=str(user['id']) + ) + except GiteaAPIError: + return None + + def search_users(self, query: str) -> List[User]: + """Search users in Gitea.""" + params = {'q': query, 'limit': 50} + response = self._api_request('GET', '/users/search', params=params) + search_result = response.json() + + return [User( + id=str(user['id']), + username=user['login'], + display_name=user.get('full_name', ''), + email=user.get('email', ''), + avatar_url=user.get('avatar_url', ''), + backend_id=str(user['id']) + ) for user in search_result.get('data', [])] + + # Milestone Operations + def create_milestone(self, milestone: Milestone) -> Milestone: + """Create milestone in Gitea.""" + data = { + 'title': milestone.title, + 'description': milestone.description or '', + 'due_on': milestone.due_date.isoformat() if milestone.due_date else None + } + + response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/milestones', data=data) + gitea_milestone = response.json() + + return Milestone( + id=str(gitea_milestone['id']), + title=gitea_milestone['title'], + description=gitea_milestone.get('description', ''), + state=gitea_milestone['state'], + due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, + created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), + backend_id=str(gitea_milestone['id']) + ) + + def get_milestones(self) -> List[Milestone]: + """Get all milestones.""" + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/milestones') + gitea_milestones = response.json() + + return [Milestone( + id=str(m['id']), + title=m['title'], + description=m.get('description', ''), + state=m['state'], + due_date=datetime.fromisoformat(m['due_on'].replace('Z', '+00:00')) if m.get('due_on') else None, + created_at=datetime.fromisoformat(m['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(m['updated_at'].replace('Z', '+00:00')), + backend_id=str(m['id']) + ) for m in gitea_milestones] + + def update_milestone(self, milestone: Milestone) -> Milestone: + """Update milestone.""" + data = { + 'title': milestone.title, + 'description': milestone.description or '', + 'state': milestone.state, + 'due_on': milestone.due_date.isoformat() if milestone.due_date else None + } + + response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/milestones/{milestone.backend_id}', data=data) + gitea_milestone = response.json() + + return Milestone( + id=str(gitea_milestone['id']), + title=gitea_milestone['title'], + description=gitea_milestone.get('description', ''), + state=gitea_milestone['state'], + due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, + created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), + backend_id=str(gitea_milestone['id']) + ) + + def delete_milestone(self, milestone_id: str) -> bool: + """Delete milestone.""" + try: + self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/milestones/{milestone_id}') + return True + except GiteaAPIError: + return False + + # Comment Operations + def add_comment(self, issue_id: str, comment: Comment) -> Comment: + """Add comment to issue.""" + data = {'body': comment.body} + + response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments', data=data) + gitea_comment = response.json() + + # Convert author + author_data = gitea_comment['user'] + author = User( + id=str(author_data['id']), + username=author_data['login'], + display_name=author_data.get('full_name', ''), + email=author_data.get('email', ''), + avatar_url=author_data.get('avatar_url', ''), + backend_id=str(author_data['id']) + ) + + return Comment( + id=str(gitea_comment['id']), + body=gitea_comment['body'], + author=author, + created_at=datetime.fromisoformat(gitea_comment['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')), + backend_id=str(gitea_comment['id']) + ) + + def get_comments(self, issue_id: str) -> List[Comment]: + """Get comments for issue.""" + response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments') + gitea_comments = response.json() + + comments = [] + for gc in gitea_comments: + author_data = gc['user'] + author = User( + id=str(author_data['id']), + username=author_data['login'], + display_name=author_data.get('full_name', ''), + email=author_data.get('email', ''), + avatar_url=author_data.get('avatar_url', ''), + backend_id=str(author_data['id']) + ) + + comment = Comment( + id=str(gc['id']), + body=gc['body'], + author=author, + created_at=datetime.fromisoformat(gc['created_at'].replace('Z', '+00:00')), + updated_at=datetime.fromisoformat(gc['updated_at'].replace('Z', '+00:00')), + backend_id=str(gc['id']) + ) + comments.append(comment) + + return comments + + def update_comment(self, comment: Comment) -> Comment: + """Update comment.""" + data = {'body': comment.body} + + response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment.backend_id}', data=data) + gitea_comment = response.json() + + # Update comment object + comment.updated_at = datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')) + return comment + + def delete_comment(self, comment_id: str) -> bool: + """Delete comment.""" + try: + self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment_id}') + return True + except GiteaAPIError: + return False + + # Sync Support + def get_last_sync_timestamp(self) -> Optional[datetime]: + """Get last sync timestamp - stored in metadata.""" + # Could be stored in repository description or other metadata + return None + + def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: + """Get issues modified since timestamp.""" + filter_criteria = IssueFilter(updated_after=timestamp) + return self.list_issues(filter_criteria) + + # SyncableBackend Implementation + def prepare_for_sync(self) -> None: + """Prepare for sync operation.""" + # Could implement rate limiting preparation + pass + + def finalize_sync(self, success: bool) -> None: + """Finalize sync operation.""" + # Could log sync status + pass + + def get_sync_conflicts(self) -> List[Dict[str, Any]]: + """Get sync conflicts.""" + # Would compare timestamps and detect conflicts + return [] + + def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: + """Resolve sync conflict.""" + issue = self.get_issue(issue_id) + if not issue: + raise GiteaAPIError(f"Issue {issue_id} not found") + + if resolution == 'remote': + # Keep remote version (current issue) + return issue + elif resolution == 'local': + # This would require the local version to be provided + raise NotImplementedError("Local resolution requires local issue data") + else: + raise ValueError(f"Unknown resolution: {resolution}") \ No newline at end of file diff --git a/backends/local/__init__.py b/backends/local/__init__.py new file mode 100644 index 0000000..a972084 --- /dev/null +++ b/backends/local/__init__.py @@ -0,0 +1,19 @@ +""" +Local SQLite Backend + +A local, file-based issue tracking backend using SQLite for storage. +This backend provides complete offline functionality and serves as the +reference implementation for the backend interface. + +Features: +- Full CRUD operations +- SQLite database storage +- No external dependencies +- Offline operation +- Fast local search +- Backup and export capabilities +""" + +from .backend import LocalSQLiteBackend + +__all__ = ['LocalSQLiteBackend'] \ No newline at end of file diff --git a/backends/local/backend.py b/backends/local/backend.py new file mode 100644 index 0000000..b6c7d25 --- /dev/null +++ b/backends/local/backend.py @@ -0,0 +1,618 @@ +""" +Local SQLite Backend Implementation + +Provides a complete local issue tracking backend using SQLite for storage. +This implementation serves as the reference for the backend interface and +provides full offline functionality. +""" + +import sqlite3 +import json +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import List, Optional, Dict, Any + +from ...core.interfaces import LocalBackend, BackendCapabilities, IssueFilter, SyncableBackend +from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType + + +class LocalSQLiteBackend(LocalBackend, SyncableBackend): + """SQLite-based local backend for issue tracking.""" + + def __init__(self, db_path: Optional[str] = None): + self.db_path = db_path or "issues.db" + self.connection: Optional[sqlite3.Connection] = None + self._capabilities = BackendCapabilities( + supports_milestones=True, + supports_assignees=True, + supports_comments=True, + supports_labels=True, + supports_search=True, + supports_bulk_operations=True, + supports_webhooks=False, + supports_real_time=False, + max_labels_per_issue=None, + max_assignees_per_issue=None + ) + + @property + def backend_type(self) -> str: + return "local" + + @property + def capabilities(self) -> BackendCapabilities: + return self._capabilities + + def connect(self, config: Dict[str, Any]) -> None: + """Connect to SQLite database.""" + db_path = config.get('db_path', self.db_path) + self.db_path = db_path + + # Ensure directory exists + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + + self.connection = sqlite3.connect(db_path) + self.connection.row_factory = sqlite3.Row # Enable dict-like access + self.connection.execute("PRAGMA foreign_keys = ON") + + # Initialize schema + self._initialize_schema() + + def disconnect(self) -> None: + """Disconnect from database.""" + if self.connection: + self.connection.close() + self.connection = None + + def test_connection(self) -> bool: + """Test database connection.""" + if not self.connection: + return False + try: + self.connection.execute("SELECT 1") + return True + except sqlite3.Error: + return False + + def _initialize_schema(self) -> None: + """Initialize database schema.""" + schema_path = Path(__file__).parent / "schema.sql" + with open(schema_path, 'r') as f: + schema_sql = f.read() + + # Execute schema in parts (SQLite doesn't like multiple statements) + for statement in schema_sql.split(';'): + statement = statement.strip() + if statement: + self.connection.execute(statement) + self.connection.commit() + + def _get_next_issue_number(self) -> int: + """Get the next available issue number.""" + cursor = self.connection.execute("SELECT MAX(number) FROM issues") + result = cursor.fetchone() + return (result[0] or 0) + 1 + + def _issue_from_row(self, row: sqlite3.Row) -> Issue: + """Convert database row to Issue object.""" + # Get labels + cursor = self.connection.execute(""" + SELECT l.id, l.name, l.color, l.description, l.backend_id + FROM labels l + JOIN issue_labels il ON l.id = il.label_id + WHERE il.issue_id = ? + """, (row['id'],)) + label_rows = cursor.fetchall() + labels = [Label( + name=lr['name'], + color=lr['color'], + description=lr['description'], + backend_id=lr['backend_id'] + ) for lr in label_rows] + + # Get assignees + cursor = self.connection.execute(""" + SELECT u.id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id + FROM users u + JOIN issue_assignees ia ON u.id = ia.user_id + WHERE ia.issue_id = ? + """, (row['id'],)) + user_rows = cursor.fetchall() + assignees = [User( + id=ur['id'], + username=ur['username'], + display_name=ur['display_name'], + email=ur['email'], + avatar_url=ur['avatar_url'], + backend_id=ur['backend_id'] + ) for ur in user_rows] + + # Get milestone + milestone = None + if row['milestone_id']: + cursor = self.connection.execute(""" + SELECT id, title, description, state, due_date, created_at, updated_at, backend_id + FROM milestones WHERE id = ? + """, (row['milestone_id'],)) + m_row = cursor.fetchone() + if m_row: + milestone = Milestone( + id=m_row['id'], + title=m_row['title'], + description=m_row['description'], + state=m_row['state'], + due_date=datetime.fromisoformat(m_row['due_date']) if m_row['due_date'] else None, + created_at=datetime.fromisoformat(m_row['created_at']) if m_row['created_at'] else None, + updated_at=datetime.fromisoformat(m_row['updated_at']) if m_row['updated_at'] else None, + backend_id=m_row['backend_id'] + ) + + # Parse sync metadata + sync_metadata = {} + if row['sync_metadata']: + try: + sync_metadata = json.loads(row['sync_metadata']) + except json.JSONDecodeError: + pass + + return Issue( + id=row['id'], + number=row['number'], + title=row['title'], + description=row['description'], + state=IssueState.from_string(row['state']), + created_at=datetime.fromisoformat(row['created_at']), + updated_at=datetime.fromisoformat(row['updated_at']), + closed_at=datetime.fromisoformat(row['closed_at']) if row['closed_at'] else None, + labels=labels, + assignees=assignees, + milestone=milestone, + backend_id=row['backend_id'], + backend_type=row['backend_type'], + sync_metadata=sync_metadata + ) + + # Issue CRUD Operations + def create_issue(self, issue: Issue) -> Issue: + """Create a new issue.""" + if not issue.id: + issue.id = str(uuid.uuid4()) + + if not issue.number: + issue.number = self._get_next_issue_number() + + # Insert issue + self.connection.execute(""" + INSERT INTO issues (id, number, title, description, state, created_at, updated_at, + closed_at, milestone_id, backend_id, backend_type, sync_metadata) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + issue.id, + issue.number, + issue.title, + issue.description, + issue.state.value, + issue.created_at.isoformat(), + issue.updated_at.isoformat(), + issue.closed_at.isoformat() if issue.closed_at else None, + issue.milestone.id if issue.milestone else None, + issue.backend_id, + issue.backend_type or 'local', + json.dumps(issue.sync_metadata) if issue.sync_metadata else None + )) + + # Add labels + for label in issue.labels: + self._ensure_label_exists(label) + self.connection.execute(""" + INSERT OR IGNORE INTO issue_labels (issue_id, label_id) + VALUES (?, ?) + """, (issue.id, label.name)) # Using name as ID for simplicity + + # Add assignees + for user in issue.assignees: + self._ensure_user_exists(user) + self.connection.execute(""" + INSERT OR IGNORE INTO issue_assignees (issue_id, user_id) + VALUES (?, ?) + """, (issue.id, user.id)) + + self.connection.commit() + return issue + + def get_issue(self, issue_id: str) -> Optional[Issue]: + """Get issue by ID.""" + cursor = self.connection.execute(""" + SELECT * FROM issues WHERE id = ? OR backend_id = ? + """, (issue_id, issue_id)) + row = cursor.fetchone() + return self._issue_from_row(row) if row else None + + def get_issue_by_number(self, number: int) -> Optional[Issue]: + """Get issue by number.""" + cursor = self.connection.execute(""" + SELECT * FROM issues WHERE number = ? + """, (number,)) + row = cursor.fetchone() + return self._issue_from_row(row) if row else None + + def update_issue(self, issue: Issue) -> Issue: + """Update existing issue.""" + # Update main issue record + self.connection.execute(""" + UPDATE issues SET + title = ?, description = ?, state = ?, updated_at = ?, + closed_at = ?, milestone_id = ?, sync_metadata = ? + WHERE id = ? + """, ( + issue.title, + issue.description, + issue.state.value, + issue.updated_at.isoformat(), + issue.closed_at.isoformat() if issue.closed_at else None, + issue.milestone.id if issue.milestone else None, + json.dumps(issue.sync_metadata) if issue.sync_metadata else None, + issue.id + )) + + # Update labels (remove all and re-add) + self.connection.execute("DELETE FROM issue_labels WHERE issue_id = ?", (issue.id,)) + for label in issue.labels: + self._ensure_label_exists(label) + self.connection.execute(""" + INSERT INTO issue_labels (issue_id, label_id) VALUES (?, ?) + """, (issue.id, label.name)) + + # Update assignees (remove all and re-add) + self.connection.execute("DELETE FROM issue_assignees WHERE issue_id = ?", (issue.id,)) + for user in issue.assignees: + self._ensure_user_exists(user) + self.connection.execute(""" + INSERT INTO issue_assignees (issue_id, user_id) VALUES (?, ?) + """, (issue.id, user.id)) + + self.connection.commit() + return issue + + def delete_issue(self, issue_id: str) -> bool: + """Delete issue.""" + cursor = self.connection.execute("DELETE FROM issues WHERE id = ?", (issue_id,)) + self.connection.commit() + return cursor.rowcount > 0 + + def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: + """List issues with optional filtering.""" + query = "SELECT * FROM issues WHERE 1=1" + params = [] + + if filter_criteria: + if filter_criteria.state: + query += " AND state = ?" + params.append(filter_criteria.state) + + if filter_criteria.search: + query += " AND (title LIKE ? OR description LIKE ?)" + search_term = f"%{filter_criteria.search}%" + params.extend([search_term, search_term]) + + if filter_criteria.created_after: + query += " AND created_at >= ?" + params.append(filter_criteria.created_after.isoformat()) + + if filter_criteria.created_before: + query += " AND created_at <= ?" + params.append(filter_criteria.created_before.isoformat()) + + if filter_criteria.updated_after: + query += " AND updated_at >= ?" + params.append(filter_criteria.updated_after.isoformat()) + + if filter_criteria.updated_before: + query += " AND updated_at <= ?" + params.append(filter_criteria.updated_before.isoformat()) + + query += " ORDER BY updated_at DESC" + + if filter_criteria and filter_criteria.limit: + query += " LIMIT ?" + params.append(filter_criteria.limit) + if filter_criteria.offset: + query += " OFFSET ?" + params.append(filter_criteria.offset) + + cursor = self.connection.execute(query, params) + rows = cursor.fetchall() + return [self._issue_from_row(row) for row in rows] + + def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: + """Search issues using FTS if available, otherwise fallback to LIKE.""" + try: + # Try FTS search first + fts_query = """ + SELECT i.* FROM issues i + JOIN issue_search s ON i.id = s.issue_id + WHERE issue_search MATCH ? + ORDER BY rank + """ + params = [query] + if limit: + fts_query += " LIMIT ?" + params.append(limit) + + cursor = self.connection.execute(fts_query, params) + rows = cursor.fetchall() + return [self._issue_from_row(row) for row in rows] + + except sqlite3.OperationalError: + # Fallback to LIKE search + filter_criteria = IssueFilter(search=query, limit=limit) + return self.list_issues(filter_criteria) + + # Helper methods + def _ensure_label_exists(self, label: Label) -> None: + """Ensure label exists in database.""" + self.connection.execute(""" + INSERT OR IGNORE INTO labels (id, name, color, description, backend_id) + VALUES (?, ?, ?, ?, ?) + """, (label.name, label.name, label.color, label.description, label.backend_id)) + + def _ensure_user_exists(self, user: User) -> None: + """Ensure user exists in database.""" + self.connection.execute(""" + INSERT OR IGNORE INTO users (id, username, display_name, email, avatar_url, backend_id) + VALUES (?, ?, ?, ?, ?, ?) + """, (user.id, user.username, user.display_name, user.email, user.avatar_url, user.backend_id)) + + # Label Operations + def create_label(self, label: Label) -> Label: + """Create a new label.""" + label_id = label.name # Use name as ID + self.connection.execute(""" + INSERT INTO labels (id, name, color, description, backend_id) + VALUES (?, ?, ?, ?, ?) + """, (label_id, label.name, label.color, label.description, label.backend_id)) + self.connection.commit() + return label + + def get_labels(self) -> List[Label]: + """Get all labels.""" + cursor = self.connection.execute("SELECT * FROM labels ORDER BY name") + rows = cursor.fetchall() + return [Label( + name=row['name'], + color=row['color'], + description=row['description'], + backend_id=row['backend_id'] + ) for row in rows] + + def update_label(self, label: Label) -> Label: + """Update label.""" + self.connection.execute(""" + UPDATE labels SET color = ?, description = ? WHERE name = ? + """, (label.color, label.description, label.name)) + self.connection.commit() + return label + + def delete_label(self, label_name: str) -> bool: + """Delete label.""" + cursor = self.connection.execute("DELETE FROM labels WHERE name = ?", (label_name,)) + self.connection.commit() + return cursor.rowcount > 0 + + # User Operations + def get_users(self) -> List[User]: + """Get all users.""" + cursor = self.connection.execute("SELECT * FROM users ORDER BY username") + rows = cursor.fetchall() + return [User( + id=row['id'], + username=row['username'], + display_name=row['display_name'], + email=row['email'], + avatar_url=row['avatar_url'], + backend_id=row['backend_id'] + ) for row in rows] + + def get_user(self, user_id: str) -> Optional[User]: + """Get user by ID.""" + cursor = self.connection.execute("SELECT * FROM users WHERE id = ?", (user_id,)) + row = cursor.fetchone() + if row: + return User( + id=row['id'], + username=row['username'], + display_name=row['display_name'], + email=row['email'], + avatar_url=row['avatar_url'], + backend_id=row['backend_id'] + ) + return None + + def search_users(self, query: str) -> List[User]: + """Search users.""" + cursor = self.connection.execute(""" + SELECT * FROM users + WHERE username LIKE ? OR display_name LIKE ? OR email LIKE ? + ORDER BY username + """, (f"%{query}%", f"%{query}%", f"%{query}%")) + rows = cursor.fetchall() + return [User( + id=row['id'], + username=row['username'], + display_name=row['display_name'], + email=row['email'], + avatar_url=row['avatar_url'], + backend_id=row['backend_id'] + ) for row in rows] + + # Milestone Operations + def create_milestone(self, milestone: Milestone) -> Milestone: + """Create milestone.""" + if not milestone.id: + milestone.id = str(uuid.uuid4()) + + self.connection.execute(""" + INSERT INTO milestones (id, title, description, state, due_date, created_at, updated_at, backend_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + milestone.id, + milestone.title, + milestone.description, + milestone.state, + milestone.due_date.isoformat() if milestone.due_date else None, + milestone.created_at.isoformat() if milestone.created_at else datetime.now(timezone.utc).isoformat(), + milestone.updated_at.isoformat() if milestone.updated_at else datetime.now(timezone.utc).isoformat(), + milestone.backend_id + )) + self.connection.commit() + return milestone + + def get_milestones(self) -> List[Milestone]: + """Get all milestones.""" + cursor = self.connection.execute("SELECT * FROM milestones ORDER BY title") + rows = cursor.fetchall() + return [Milestone( + id=row['id'], + title=row['title'], + description=row['description'], + state=row['state'], + due_date=datetime.fromisoformat(row['due_date']) if row['due_date'] else None, + created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None, + updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None, + backend_id=row['backend_id'] + ) for row in rows] + + def update_milestone(self, milestone: Milestone) -> Milestone: + """Update milestone.""" + self.connection.execute(""" + UPDATE milestones SET title = ?, description = ?, state = ?, due_date = ?, updated_at = ? + WHERE id = ? + """, ( + milestone.title, + milestone.description, + milestone.state, + milestone.due_date.isoformat() if milestone.due_date else None, + datetime.now(timezone.utc).isoformat(), + milestone.id + )) + self.connection.commit() + return milestone + + def delete_milestone(self, milestone_id: str) -> bool: + """Delete milestone.""" + cursor = self.connection.execute("DELETE FROM milestones WHERE id = ?", (milestone_id,)) + self.connection.commit() + return cursor.rowcount > 0 + + # Comment Operations + def add_comment(self, issue_id: str, comment: Comment) -> Comment: + """Add comment to issue.""" + if not comment.id: + comment.id = str(uuid.uuid4()) + + self._ensure_user_exists(comment.author) + + self.connection.execute(""" + INSERT INTO comments (id, issue_id, author_id, body, created_at, updated_at, backend_id) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + comment.id, + issue_id, + comment.author.id, + comment.body, + comment.created_at.isoformat(), + comment.updated_at.isoformat() if comment.updated_at else None, + comment.backend_id + )) + self.connection.commit() + return comment + + def get_comments(self, issue_id: str) -> List[Comment]: + """Get comments for issue.""" + cursor = self.connection.execute(""" + SELECT c.*, u.id as user_id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id as user_backend_id + FROM comments c + JOIN users u ON c.author_id = u.id + WHERE c.issue_id = ? + ORDER BY c.created_at + """, (issue_id,)) + rows = cursor.fetchall() + + comments = [] + for row in rows: + author = User( + id=row['user_id'], + username=row['username'], + display_name=row['display_name'], + email=row['email'], + avatar_url=row['avatar_url'], + backend_id=row['user_backend_id'] + ) + comment = Comment( + id=row['id'], + body=row['body'], + author=author, + created_at=datetime.fromisoformat(row['created_at']), + updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None, + backend_id=row['backend_id'] + ) + comments.append(comment) + + return comments + + def update_comment(self, comment: Comment) -> Comment: + """Update comment.""" + self.connection.execute(""" + UPDATE comments SET body = ?, updated_at = ? WHERE id = ? + """, (comment.body, datetime.now(timezone.utc).isoformat(), comment.id)) + self.connection.commit() + return comment + + def delete_comment(self, comment_id: str) -> bool: + """Delete comment.""" + cursor = self.connection.execute("DELETE FROM comments WHERE id = ?", (comment_id,)) + self.connection.commit() + return cursor.rowcount > 0 + + # Sync Support + def get_last_sync_timestamp(self) -> Optional[datetime]: + """Get last sync timestamp.""" + cursor = self.connection.execute(""" + SELECT sync_timestamp FROM sync_history + WHERE success = 1 + ORDER BY sync_timestamp DESC + LIMIT 1 + """) + row = cursor.fetchone() + return datetime.fromisoformat(row[0]) if row else None + + def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: + """Get issues modified since timestamp.""" + filter_criteria = IssueFilter(updated_after=timestamp) + return self.list_issues(filter_criteria) + + # SyncableBackend Implementation + def prepare_for_sync(self) -> None: + """Prepare for sync operation.""" + # Could create backup or start transaction + pass + + def finalize_sync(self, success: bool) -> None: + """Finalize sync operation.""" + # Log sync operation + self.connection.execute(""" + INSERT INTO sync_history (backend_type, success, sync_timestamp) + VALUES (?, ?, ?) + """, ('sync', success, datetime.now(timezone.utc).isoformat())) + self.connection.commit() + + def get_sync_conflicts(self) -> List[Dict[str, Any]]: + """Get sync conflicts.""" + # For local backend, no conflicts since it's the source of truth + return [] + + def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: + """Resolve sync conflict.""" + # Local backend doesn't have conflicts + return self.get_issue(issue_id) \ No newline at end of file diff --git a/backends/local/schema.sql b/backends/local/schema.sql new file mode 100644 index 0000000..1dda808 --- /dev/null +++ b/backends/local/schema.sql @@ -0,0 +1,189 @@ +-- Local Issue Tracking Database Schema +-- SQLite schema for local issue storage with full referential integrity + +-- Enable foreign key constraints +PRAGMA foreign_keys = ON; + +-- Issues table - core issue data +CREATE TABLE IF NOT EXISTS issues ( + id TEXT PRIMARY KEY, + number INTEGER UNIQUE NOT NULL, + title TEXT NOT NULL, + description TEXT NOT NULL DEFAULT '', + state TEXT NOT NULL CHECK (state IN ('open', 'closed', 'in_progress', 'blocked')), + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + closed_at TIMESTAMP NULL, + milestone_id TEXT, + backend_id TEXT, + backend_type TEXT DEFAULT 'local', + sync_metadata TEXT, -- JSON for sync data + FOREIGN KEY (milestone_id) REFERENCES milestones(id) ON DELETE SET NULL +); + +-- Create index for issue number lookups +CREATE INDEX IF NOT EXISTS idx_issues_number ON issues(number); +CREATE INDEX IF NOT EXISTS idx_issues_state ON issues(state); +CREATE INDEX IF NOT EXISTS idx_issues_updated_at ON issues(updated_at); +CREATE INDEX IF NOT EXISTS idx_issues_backend_id ON issues(backend_id); + +-- Labels table +CREATE TABLE IF NOT EXISTS labels ( + id TEXT PRIMARY KEY, + name TEXT UNIQUE NOT NULL, + color TEXT, + description TEXT, + backend_id TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- Create index for label name lookups +CREATE INDEX IF NOT EXISTS idx_labels_name ON labels(name); + +-- Issue-Label many-to-many relationship +CREATE TABLE IF NOT EXISTS issue_labels ( + issue_id TEXT NOT NULL, + label_id TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (issue_id, label_id), + FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, + FOREIGN KEY (label_id) REFERENCES labels(id) ON DELETE CASCADE +); + +-- Users table +CREATE TABLE IF NOT EXISTS users ( + id TEXT PRIMARY KEY, + username TEXT UNIQUE NOT NULL, + display_name TEXT, + email TEXT, + avatar_url TEXT, + backend_id TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- Create index for username lookups +CREATE INDEX IF NOT EXISTS idx_users_username ON users(username); + +-- Issue-User assignment many-to-many relationship +CREATE TABLE IF NOT EXISTS issue_assignees ( + issue_id TEXT NOT NULL, + user_id TEXT NOT NULL, + assigned_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (issue_id, user_id), + FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE +); + +-- Milestones table +CREATE TABLE IF NOT EXISTS milestones ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + description TEXT, + state TEXT NOT NULL DEFAULT 'open' CHECK (state IN ('open', 'closed')), + due_date TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + backend_id TEXT +); + +-- Create index for milestone title lookups +CREATE INDEX IF NOT EXISTS idx_milestones_title ON milestones(title); + +-- Comments table +CREATE TABLE IF NOT EXISTS comments ( + id TEXT PRIMARY KEY, + issue_id TEXT NOT NULL, + author_id TEXT NOT NULL, + body TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP, + backend_id TEXT, + FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, + FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE CASCADE +); + +-- Create index for comment lookups +CREATE INDEX IF NOT EXISTS idx_comments_issue_id ON comments(issue_id); +CREATE INDEX IF NOT EXISTS idx_comments_created_at ON comments(created_at); + +-- Sync tracking table +CREATE TABLE IF NOT EXISTS sync_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + backend_type TEXT NOT NULL, + sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + success BOOLEAN NOT NULL, + issues_synced INTEGER DEFAULT 0, + errors_count INTEGER DEFAULT 0, + details TEXT -- JSON for sync details +); + +-- Configuration table for backend settings +CREATE TABLE IF NOT EXISTS config ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +-- Triggers to automatically update updated_at timestamps +CREATE TRIGGER IF NOT EXISTS update_issues_timestamp + AFTER UPDATE ON issues +BEGIN + UPDATE issues SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; +END; + +CREATE TRIGGER IF NOT EXISTS update_milestones_timestamp + AFTER UPDATE ON milestones +BEGIN + UPDATE milestones SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; +END; + +-- Views for common queries +CREATE VIEW IF NOT EXISTS issue_summary AS +SELECT + i.id, + i.number, + i.title, + i.state, + i.created_at, + i.updated_at, + i.closed_at, + m.title as milestone_title, + COUNT(c.id) as comment_count, + GROUP_CONCAT(l.name) as labels, + GROUP_CONCAT(u.username) as assignees +FROM issues i +LEFT JOIN milestones m ON i.milestone_id = m.id +LEFT JOIN comments c ON i.id = c.issue_id +LEFT JOIN issue_labels il ON i.id = il.issue_id +LEFT JOIN labels l ON il.label_id = l.id +LEFT JOIN issue_assignees ia ON i.id = ia.issue_id +LEFT JOIN users u ON ia.user_id = u.id +GROUP BY i.id, i.number, i.title, i.state, i.created_at, i.updated_at, i.closed_at, m.title; + +-- Full-text search setup (if SQLite supports FTS) +CREATE VIRTUAL TABLE IF NOT EXISTS issue_search USING fts5( + issue_id, + title, + description, + labels, + content='issues' +); + +-- Trigger to keep FTS index updated +CREATE TRIGGER IF NOT EXISTS issue_search_insert AFTER INSERT ON issues +BEGIN + INSERT INTO issue_search(issue_id, title, description) + VALUES (NEW.id, NEW.title, NEW.description); +END; + +CREATE TRIGGER IF NOT EXISTS issue_search_update AFTER UPDATE ON issues +BEGIN + UPDATE issue_search + SET title = NEW.title, description = NEW.description + WHERE issue_id = NEW.id; +END; + +CREATE TRIGGER IF NOT EXISTS issue_search_delete AFTER DELETE ON issues +BEGIN + DELETE FROM issue_search WHERE issue_id = OLD.id; +END; \ No newline at end of file diff --git a/cli/__init__.py b/cli/__init__.py new file mode 100644 index 0000000..fee3716 --- /dev/null +++ b/cli/__init__.py @@ -0,0 +1,20 @@ +""" +Command Line Interface for Universal Issue Tracking + +Provides a comprehensive CLI for managing issues across different backends. +The CLI is designed to be intuitive and follows common patterns from +tools like git, gh (GitHub CLI), and similar utilities. + +Commands: +- issue list: List issues +- issue show: Show issue details +- issue create: Create new issue +- issue edit: Edit existing issue +- issue close: Close issue +- issue reopen: Reopen issue +- issue comment: Add comment +- issue label: Manage labels +- issue assign: Manage assignments +- backend: Manage backends +- sync: Synchronization operations +""" \ No newline at end of file diff --git a/cli/backend_commands.py b/cli/backend_commands.py new file mode 100644 index 0000000..580b5d9 --- /dev/null +++ b/cli/backend_commands.py @@ -0,0 +1,141 @@ +""" +Backend Management CLI Commands + +Commands for configuring and managing issue tracking backends. +""" + +import click +from .utils import ( + load_backend_configs, save_backend_configs, format_backend_list, + test_backend_connection, validate_backend_type, echo_success, + echo_error, echo_warning, confirm_action +) + + +@click.group() +def backend_group(): + """Backend configuration and management.""" + pass + + +@backend_group.command('list') +def list_backends(): + """List configured backends.""" + configs = load_backend_configs() + click.echo(format_backend_list(configs)) + + +@backend_group.command('add') +@click.argument('name') +@click.argument('backend_type', type=click.Choice(['local', 'gitea'])) +@click.pass_context +def add_backend(ctx, name, backend_type): + """Add a new backend configuration.""" + configs = load_backend_configs() + + if name in configs: + if not confirm_action(f"Backend '{name}' already exists. Overwrite?"): + click.echo("Aborted") + return + + if backend_type == 'local': + db_path = click.prompt('Database path', default=f'~/.config/issue-tracker/{name}.db') + config = { + 'type': 'local', + 'db_path': str(db_path) + } + elif backend_type == 'gitea': + base_url = click.prompt('Gitea base URL (e.g., https://git.example.com)') + owner = click.prompt('Repository owner/organization') + repo = click.prompt('Repository name') + token = click.prompt('Access token', hide_input=True) + + config = { + 'type': 'gitea', + 'base_url': base_url.rstrip('/'), + 'owner': owner, + 'repo': repo, + 'token': token + } + + # Test connection + click.echo("Testing connection...") + if test_backend_connection(config): + echo_success("Connection successful!") + else: + echo_warning("Connection test failed, but configuration will be saved anyway.") + + # Save configuration + configs[name] = config + save_backend_configs(configs) + + echo_success(f"Backend '{name}' added successfully") + + # Set as default if it's the first one + if 'default' not in configs: + configs['default'] = name + save_backend_configs(configs) + echo_info(f"Set '{name}' as default backend") + + +@backend_group.command('remove') +@click.argument('name') +def remove_backend(name): + """Remove a backend configuration.""" + configs = load_backend_configs() + + if name not in configs: + echo_error(f"Backend '{name}' not found") + return + + if not confirm_action(f"Remove backend '{name}'?"): + click.echo("Aborted") + return + + del configs[name] + + # Update default if necessary + if configs.get('default') == name: + remaining_backends = [k for k in configs.keys() if k != 'default'] + if remaining_backends: + configs['default'] = remaining_backends[0] + echo_info(f"Set '{configs['default']}' as new default backend") + else: + del configs['default'] + + save_backend_configs(configs) + echo_success(f"Backend '{name}' removed") + + +@backend_group.command('test') +@click.argument('name') +def test_backend(name): + """Test backend connection.""" + configs = load_backend_configs() + + if name not in configs: + echo_error(f"Backend '{name}' not found") + return + + config = configs[name] + click.echo(f"Testing connection to '{name}'...") + + if test_backend_connection(config): + echo_success("Connection successful!") + else: + echo_error("Connection failed!") + + +@backend_group.command('set-default') +@click.argument('name') +def set_default_backend(name): + """Set default backend.""" + configs = load_backend_configs() + + if name not in configs: + echo_error(f"Backend '{name}' not found") + return + + configs['default'] = name + save_backend_configs(configs) + echo_success(f"Set '{name}' as default backend") \ No newline at end of file diff --git a/cli/commands.py b/cli/commands.py new file mode 100644 index 0000000..09ff08b --- /dev/null +++ b/cli/commands.py @@ -0,0 +1,417 @@ +""" +Issue Management CLI Commands + +Core commands for managing issues: create, list, show, edit, close, etc. +""" + +import click +from datetime import datetime, timezone +from typing import Optional + +from ..core.models import Issue, Label, User, IssueState, Priority, IssueType +from ..core.interfaces import IssueFilter +from .utils import get_backend, format_issue, format_issue_list, get_user_input + + +@click.group() +def issue_group(): + """Issue management commands.""" + pass + + +@issue_group.command('list') +@click.option('--state', type=click.Choice(['open', 'closed', 'all']), default='open', help='Issue state filter') +@click.option('--assignee', help='Filter by assignee') +@click.option('--label', multiple=True, help='Filter by labels') +@click.option('--milestone', help='Filter by milestone') +@click.option('--search', help='Search in title and description') +@click.option('--limit', type=int, default=30, help='Maximum number of issues to show') +@click.option('--format', 'output_format', type=click.Choice(['table', 'json', 'compact']), default='table', help='Output format') +@click.pass_context +def list_issues(ctx, state, assignee, label, milestone, search, limit, output_format): + """List issues with optional filtering.""" + backend = get_backend(ctx) + + # Build filter criteria + filter_criteria = IssueFilter( + state=None if state == 'all' else state, + assignee=assignee, + labels=list(label) if label else None, + milestone=milestone, + search=search, + limit=limit + ) + + try: + issues = backend.list_issues(filter_criteria) + + if not issues: + click.echo("No issues found.") + return + + if output_format == 'json': + import json + click.echo(json.dumps([issue.to_dict() for issue in issues], indent=2)) + elif output_format == 'compact': + for issue in issues: + labels_str = ', '.join(label.name for label in issue.labels[:3]) + if len(issue.labels) > 3: + labels_str += f' (+{len(issue.labels) - 3} more)' + + assignee_str = issue.primary_assignee.username if issue.primary_assignee else 'unassigned' + + click.echo(f"#{issue.number:4d} {issue.state.value:10s} {issue.title[:50]:50s} {assignee_str:15s} {labels_str}") + else: # table format + click.echo(format_issue_list(issues)) + + except Exception as e: + raise click.ClickException(f"Failed to list issues: {e}") + + +@issue_group.command('show') +@click.argument('issue_number', type=int) +@click.option('--comments', is_flag=True, help='Show comments') +@click.option('--format', 'output_format', type=click.Choice(['detailed', 'json', 'compact']), default='detailed', help='Output format') +@click.pass_context +def show_issue(ctx, issue_number, comments, output_format): + """Show detailed information about an issue.""" + backend = get_backend(ctx) + + try: + issue = backend.get_issue_by_number(issue_number) + if not issue: + raise click.ClickException(f"Issue #{issue_number} not found") + + if output_format == 'json': + import json + issue_dict = issue.to_dict() + if comments: + issue_dict['comments'] = [ + { + 'id': c.id, + 'body': c.body, + 'author': c.author.username, + 'created_at': c.created_at.isoformat() + } + for c in backend.get_comments(issue.id) + ] + click.echo(json.dumps(issue_dict, indent=2)) + else: + click.echo(format_issue(issue, show_comments=comments, backend=backend if comments else None)) + + except Exception as e: + raise click.ClickException(f"Failed to show issue: {e}") + + +@issue_group.command('create') +@click.argument('title') +@click.option('--description', '-d', help='Issue description') +@click.option('--label', '-l', multiple=True, help='Labels to add') +@click.option('--assignee', '-a', help='Assign to user') +@click.option('--milestone', '-m', help='Milestone') +@click.option('--priority', type=click.Choice(['low', 'medium', 'high', 'critical']), help='Issue priority') +@click.option('--type', 'issue_type', type=click.Choice(['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']), help='Issue type') +@click.option('--interactive', '-i', is_flag=True, help='Interactive mode') +@click.pass_context +def create_issue(ctx, title, description, label, assignee, milestone, priority, issue_type, interactive): + """Create a new issue.""" + backend = get_backend(ctx) + + try: + # Interactive mode + if interactive: + title = title or click.prompt('Title') + description = get_user_input('Description (optional)', multiline=True) + + # Show available labels + available_labels = backend.get_labels() + if available_labels: + click.echo(f"\nAvailable labels: {', '.join(l.name for l in available_labels)}") + label = click.prompt('Labels (comma-separated, optional)', default='').split(',') if click.prompt('Add labels?', type=bool, default=False) else [] + + # Show available users + available_users = backend.get_users() + if available_users: + click.echo(f"\nAvailable users: {', '.join(u.username for u in available_users)}") + assignee = click.prompt('Assignee (optional)', default='') or None + + # Show available milestones + available_milestones = backend.get_milestones() + if available_milestones: + click.echo(f"\nAvailable milestones: {', '.join(m.title for m in available_milestones)}") + milestone = click.prompt('Milestone (optional)', default='') or None + + # Build labels list + labels = [] + + # Add explicit labels + for label_name in label: + label_name = label_name.strip() + if label_name: + labels.append(Label(name=label_name)) + + # Add priority label + if priority: + labels.append(Label(name=f'priority:{priority}')) + + # Add type label + if issue_type: + labels.append(Label(name=issue_type)) + + # Build assignees list + assignees = [] + if assignee: + # Try to find user + users = backend.search_users(assignee) + if users: + assignees.append(users[0]) + else: + # Create a basic user object + assignees.append(User(id=assignee, username=assignee)) + + # Find milestone + milestone_obj = None + if milestone: + milestones = backend.get_milestones() + for m in milestones: + if m.title == milestone or m.id == milestone: + milestone_obj = m + break + + # Create issue + now = datetime.now(timezone.utc) + issue = Issue( + id="", # Will be set by backend + number=0, # Will be set by backend + title=title, + description=description or "", + state=IssueState.OPEN, + created_at=now, + updated_at=now, + labels=labels, + assignees=assignees, + milestone=milestone_obj + ) + + created_issue = backend.create_issue(issue) + click.echo(f"Created issue #{created_issue.number}: {created_issue.title}") + + if ctx.obj.get('verbose'): + click.echo(format_issue(created_issue)) + + except Exception as e: + raise click.ClickException(f"Failed to create issue: {e}") + + +@issue_group.command('edit') +@click.argument('issue_number', type=int) +@click.option('--title', help='New title') +@click.option('--description', help='New description') +@click.option('--add-label', multiple=True, help='Labels to add') +@click.option('--remove-label', multiple=True, help='Labels to remove') +@click.option('--assign', help='User to assign') +@click.option('--unassign', help='User to unassign') +@click.option('--milestone', help='Milestone to set') +@click.option('--interactive', '-i', is_flag=True, help='Interactive editing') +@click.pass_context +def edit_issue(ctx, issue_number, title, description, add_label, remove_label, assign, unassign, milestone, interactive): + """Edit an existing issue.""" + backend = get_backend(ctx) + + try: + issue = backend.get_issue_by_number(issue_number) + if not issue: + raise click.ClickException(f"Issue #{issue_number} not found") + + # Interactive mode + if interactive: + click.echo(f"Editing issue #{issue.number}: {issue.title}") + + new_title = click.prompt('Title', default=issue.title) + if new_title != issue.title: + title = new_title + + new_description = get_user_input('Description', default=issue.description, multiline=True) + if new_description != issue.description: + description = new_description + + # Apply changes + modified = False + + if title: + issue.title = title + modified = True + + if description is not None: + issue.description = description + modified = True + + # Add labels + for label_name in add_label: + issue.add_label(Label(name=label_name.strip())) + modified = True + + # Remove labels + for label_name in remove_label: + if issue.remove_label(label_name.strip()): + modified = True + + # Assign user + if assign: + users = backend.search_users(assign) + if users: + issue.add_assignee(users[0]) + modified = True + else: + issue.add_assignee(User(id=assign, username=assign)) + modified = True + + # Unassign user + if unassign: + if issue.remove_assignee(unassign): + modified = True + + # Set milestone + if milestone: + milestones = backend.get_milestones() + for m in milestones: + if m.title == milestone or m.id == milestone: + issue.milestone = m + modified = True + break + + if modified: + issue.updated_at = datetime.now(timezone.utc) + updated_issue = backend.update_issue(issue) + click.echo(f"Updated issue #{updated_issue.number}") + + if ctx.obj.get('verbose'): + click.echo(format_issue(updated_issue)) + else: + click.echo("No changes made") + + except Exception as e: + raise click.ClickException(f"Failed to edit issue: {e}") + + +@issue_group.command('close') +@click.argument('issue_number', type=int) +@click.option('--comment', '-c', help='Closing comment') +@click.pass_context +def close_issue(ctx, issue_number, comment): + """Close an issue.""" + backend = get_backend(ctx) + + try: + issue = backend.get_issue_by_number(issue_number) + if not issue: + raise click.ClickException(f"Issue #{issue_number} not found") + + if issue.state == IssueState.CLOSED: + click.echo(f"Issue #{issue_number} is already closed") + return + + # Close the issue + issue.close() + + # Add closing comment if provided + if comment: + from ..core.models import Comment + current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user + closing_comment = Comment( + id="", + body=comment, + author=current_user, + created_at=datetime.now(timezone.utc) + ) + backend.add_comment(issue.id, closing_comment) + + updated_issue = backend.update_issue(issue) + click.echo(f"Closed issue #{updated_issue.number}: {updated_issue.title}") + + except Exception as e: + raise click.ClickException(f"Failed to close issue: {e}") + + +@issue_group.command('reopen') +@click.argument('issue_number', type=int) +@click.option('--comment', '-c', help='Reopening comment') +@click.pass_context +def reopen_issue(ctx, issue_number, comment): + """Reopen a closed issue.""" + backend = get_backend(ctx) + + try: + issue = backend.get_issue_by_number(issue_number) + if not issue: + raise click.ClickException(f"Issue #{issue_number} not found") + + if issue.state != IssueState.CLOSED: + click.echo(f"Issue #{issue_number} is not closed (current state: {issue.state.value})") + return + + # Reopen the issue + issue.reopen() + + # Add reopening comment if provided + if comment: + from ..core.models import Comment + current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user + reopening_comment = Comment( + id="", + body=comment, + author=current_user, + created_at=datetime.now(timezone.utc) + ) + backend.add_comment(issue.id, reopening_comment) + + updated_issue = backend.update_issue(issue) + click.echo(f"Reopened issue #{updated_issue.number}: {updated_issue.title}") + + except Exception as e: + raise click.ClickException(f"Failed to reopen issue: {e}") + + +@issue_group.command('comment') +@click.argument('issue_number', type=int) +@click.argument('comment_text', required=False) +@click.option('--editor', is_flag=True, help='Open editor for comment') +@click.pass_context +def add_comment(ctx, issue_number, comment_text, editor): + """Add a comment to an issue.""" + backend = get_backend(ctx) + + try: + issue = backend.get_issue_by_number(issue_number) + if not issue: + raise click.ClickException(f"Issue #{issue_number} not found") + + # Get comment text + if editor: + comment_text = click.edit() or "" + elif not comment_text: + comment_text = get_user_input("Comment", multiline=True) + + if not comment_text.strip(): + click.echo("Empty comment, aborting") + return + + # Create comment + from ..core.models import Comment + current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user + comment = Comment( + id="", + body=comment_text.strip(), + author=current_user, + created_at=datetime.now(timezone.utc) + ) + + added_comment = backend.add_comment(issue.id, comment) + click.echo(f"Added comment to issue #{issue_number}") + + if ctx.obj.get('verbose'): + click.echo(f"\nComment by {added_comment.author.username} at {added_comment.created_at}:") + click.echo(added_comment.body) + + except Exception as e: + raise click.ClickException(f"Failed to add comment: {e}") \ No newline at end of file diff --git a/cli/main.py b/cli/main.py new file mode 100644 index 0000000..567e1a4 --- /dev/null +++ b/cli/main.py @@ -0,0 +1,117 @@ +""" +Main CLI Entry Point + +Universal Issue Tracking System CLI +""" + +import click +import sys +from pathlib import Path + +from .commands import issue_group +from .backend_commands import backend_group +from .sync_commands import sync_group + + +@click.group() +@click.version_option() +@click.option('--config', type=click.Path(), help='Configuration file path') +@click.option('--backend', help='Backend to use (local, gitea)') +@click.option('--verbose', '-v', is_flag=True, help='Verbose output') +@click.pass_context +def cli(ctx, config, backend, verbose): + """ + Universal Issue Tracking System + + A backend-agnostic issue tracking tool that works with local SQLite, + Gitea, GitHub, and other issue tracking systems. + + Examples: + issue list # List all issues + issue create "Bug in parser" # Create new issue + issue show 42 # Show issue #42 + issue close 42 # Close issue #42 + + backend add local ~/.issues # Add local backend + backend add gitea myrepo # Add Gitea backend + + sync pull gitea # Sync from Gitea + sync push gitea # Sync to Gitea + """ + # Ensure the object exists + ctx.ensure_object(dict) + + # Store global options in context + ctx.obj['config_path'] = config + ctx.obj['backend'] = backend + ctx.obj['verbose'] = verbose + + +# Register command groups +cli.add_command(issue_group, name='issue') +cli.add_command(backend_group, name='backend') +cli.add_command(sync_group, name='sync') + + +# Convenience aliases - direct issue commands +@cli.command('list') +@click.pass_context +def list_issues(ctx): + """List all issues (alias for 'issue list').""" + ctx.invoke(issue_group.get_command(ctx, 'list')) + + +@cli.command('show') +@click.argument('issue_number', type=int) +@click.pass_context +def show_issue(ctx, issue_number): + """Show issue details (alias for 'issue show').""" + ctx.invoke(issue_group.get_command(ctx, 'show'), issue_number=issue_number) + + +@cli.command('create') +@click.argument('title') +@click.option('--description', '-d', help='Issue description') +@click.option('--label', '-l', multiple=True, help='Labels to add') +@click.option('--assignee', '-a', help='Assign to user') +@click.option('--milestone', '-m', help='Milestone') +@click.pass_context +def create_issue(ctx, title, description, label, assignee, milestone): + """Create new issue (alias for 'issue create').""" + ctx.invoke( + issue_group.get_command(ctx, 'create'), + title=title, + description=description, + label=label, + assignee=assignee, + milestone=milestone + ) + + +@cli.command('close') +@click.argument('issue_number', type=int) +@click.option('--comment', '-c', help='Closing comment') +@click.pass_context +def close_issue(ctx, issue_number, comment): + """Close issue (alias for 'issue close').""" + ctx.invoke( + issue_group.get_command(ctx, 'close'), + issue_number=issue_number, + comment=comment + ) + + +def main(): + """Main entry point for the CLI.""" + try: + cli(obj={}) + except KeyboardInterrupt: + click.echo("\nAborted by user", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/cli/sync_commands.py b/cli/sync_commands.py new file mode 100644 index 0000000..11d8a54 --- /dev/null +++ b/cli/sync_commands.py @@ -0,0 +1,235 @@ +""" +Synchronization CLI Commands + +Commands for synchronizing issues between different backends. +""" + +import click +from datetime import datetime, timezone +from .utils import ( + load_backend_configs, get_backend, echo_success, echo_error, + echo_warning, echo_info, progress_bar, confirm_action +) +from ..core.interfaces import BackendFactory + + +@click.group() +def sync_group(): + """Issue synchronization between backends.""" + pass + + +@sync_group.command('status') +@click.argument('backend_name', required=False) +@click.pass_context +def sync_status(ctx, backend_name): + """Show sync status for backends.""" + configs = load_backend_configs() + + if backend_name: + backends_to_check = [backend_name] if backend_name in configs else [] + if not backends_to_check: + echo_error(f"Backend '{backend_name}' not found") + return + else: + backends_to_check = [k for k in configs.keys() if k != 'default'] + + for name in backends_to_check: + config = configs[name] + try: + backend = BackendFactory.create_backend(config['type']) + backend.connect(config) + + # Get basic stats + all_issues = backend.list_issues() + open_issues = [i for i in all_issues if i.state.value != 'closed'] + + click.echo(f"\n{name} ({config['type']}):") + click.echo(f" Total issues: {len(all_issues)}") + click.echo(f" Open issues: {len(open_issues)}") + click.echo(f" Closed issues: {len(all_issues) - len(open_issues)}") + + # Check last sync + if hasattr(backend, 'get_last_sync_timestamp'): + last_sync = backend.get_last_sync_timestamp() + if last_sync: + click.echo(f" Last sync: {last_sync}") + else: + click.echo(f" Last sync: never") + + backend.disconnect() + + except Exception as e: + echo_error(f" Error accessing {name}: {e}") + + +@sync_group.command('pull') +@click.argument('source_backend') +@click.option('--target', default='local', help='Target backend (default: local)') +@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') +@click.option('--force', is_flag=True, help='Force sync even with conflicts') +@click.pass_context +def sync_pull(ctx, source_backend, target, dry_run, force): + """Pull issues from source backend to target backend.""" + configs = load_backend_configs() + + if source_backend not in configs: + echo_error(f"Source backend '{source_backend}' not found") + return + + if target not in configs: + echo_error(f"Target backend '{target}' not found") + return + + if source_backend == target: + echo_error("Source and target backends cannot be the same") + return + + try: + # Connect to backends + source_config = configs[source_backend] + target_config = configs[target] + + source = BackendFactory.create_backend(source_config['type']) + source.connect(source_config) + + target = BackendFactory.create_backend(target_config['type']) + target.connect(target_config) + + echo_info(f"Syncing from {source_backend} to {target}") + + # Get issues from source + source_issues = source.list_issues() + echo_info(f"Found {len(source_issues)} issues in source") + + # Get existing issues in target + target_issues = target.list_issues() + target_numbers = {issue.number for issue in target_issues} + + # Determine what needs to be synced + new_issues = [] + updated_issues = [] + + for issue in source_issues: + if issue.number not in target_numbers: + new_issues.append(issue) + else: + # Check if update is needed (simplified check by updated_at) + target_issue = next((i for i in target_issues if i.number == issue.number), None) + if target_issue and issue.updated_at > target_issue.updated_at: + updated_issues.append(issue) + + echo_info(f"New issues to sync: {len(new_issues)}") + echo_info(f"Updated issues to sync: {len(updated_issues)}") + + if dry_run: + if new_issues: + click.echo("\nNew issues:") + for issue in new_issues: + click.echo(f" #{issue.number}: {issue.title}") + + if updated_issues: + click.echo("\nUpdated issues:") + for issue in updated_issues: + click.echo(f" #{issue.number}: {issue.title}") + + click.echo(f"\nDry run complete. {len(new_issues + updated_issues)} issues would be synced.") + return + + # Confirm sync + total_sync = len(new_issues) + len(updated_issues) + if total_sync == 0: + echo_success("No issues need syncing") + return + + if not force and not confirm_action(f"Sync {total_sync} issues?"): + click.echo("Aborted") + return + + # Perform sync + synced_count = 0 + errors = [] + + all_to_sync = new_issues + updated_issues + with progress_bar(all_to_sync, label="Syncing issues") as items: + for issue in items: + try: + # Clear backend-specific IDs for new backend + issue.backend_id = None + issue.backend_type = target_config['type'] + + if issue in new_issues: + target.create_issue(issue) + else: + # For updates, we need to find the target issue and update it + target_issue = target.get_issue_by_number(issue.number) + if target_issue: + # Copy relevant fields + target_issue.title = issue.title + target_issue.description = issue.description + target_issue.state = issue.state + target_issue.labels = issue.labels + target_issue.assignees = issue.assignees + target_issue.milestone = issue.milestone + target_issue.updated_at = issue.updated_at + target_issue.closed_at = issue.closed_at + target.update_issue(target_issue) + + synced_count += 1 + + except Exception as e: + errors.append(f"Issue #{issue.number}: {e}") + + # Report results + echo_success(f"Synced {synced_count} issues successfully") + + if errors: + echo_warning(f"{len(errors)} errors occurred:") + for error in errors[:5]: # Show first 5 errors + echo_error(f" {error}") + if len(errors) > 5: + echo_warning(f" ... and {len(errors) - 5} more errors") + + # Cleanup + source.disconnect() + target.disconnect() + + except Exception as e: + echo_error(f"Sync failed: {e}") + + +@sync_group.command('push') +@click.argument('target_backend') +@click.option('--source', default='local', help='Source backend (default: local)') +@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') +@click.option('--force', is_flag=True, help='Force sync even with conflicts') +@click.pass_context +def sync_push(ctx, target_backend, source, dry_run, force): + """Push issues from source backend to target backend.""" + # This is essentially the same as pull but with arguments swapped + ctx.invoke(sync_pull, source_backend=source, target=target_backend, dry_run=dry_run, force=force) + + +@sync_group.command('bidirectional') +@click.argument('backend1') +@click.argument('backend2') +@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') +@click.option('--force', is_flag=True, help='Force sync even with conflicts') +@click.pass_context +def sync_bidirectional(ctx, backend1, backend2, dry_run, force): + """Bidirectional sync between two backends.""" + echo_warning("Bidirectional sync is a complex operation that can cause conflicts.") + + if not force and not confirm_action("Continue with bidirectional sync?"): + click.echo("Aborted") + return + + # First sync backend1 -> backend2 + echo_info(f"Step 1: Syncing {backend1} -> {backend2}") + ctx.invoke(sync_pull, source_backend=backend1, target=backend2, dry_run=dry_run, force=True) + + # Then sync backend2 -> backend1 + echo_info(f"Step 2: Syncing {backend2} -> {backend1}") + ctx.invoke(sync_pull, source_backend=backend2, target=backend1, dry_run=dry_run, force=True) + + echo_success("Bidirectional sync completed") \ No newline at end of file diff --git a/cli/utils.py b/cli/utils.py new file mode 100644 index 0000000..32182cd --- /dev/null +++ b/cli/utils.py @@ -0,0 +1,336 @@ +""" +CLI Utility Functions + +Helper functions for the CLI commands including formatting, configuration, +backend management, and user interaction. +""" + +import click +import os +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional, List +import tempfile + +from ..core.interfaces import IssueBackend, BackendFactory +from ..core.models import Issue, Comment +from ..backends.local import LocalSQLiteBackend +from ..backends.gitea import GiteaBackend + + +# Register available backends +BackendFactory.register_backend('local', LocalSQLiteBackend) +BackendFactory.register_backend('gitea', GiteaBackend) + + +def get_config_dir() -> Path: + """Get configuration directory.""" + config_dir = Path.home() / '.config' / 'issue-tracker' + config_dir.mkdir(parents=True, exist_ok=True) + return config_dir + + +def get_backend_config_path() -> Path: + """Get backend configuration file path.""" + return get_config_dir() / 'backends.json' + + +def load_backend_configs() -> dict: + """Load backend configurations.""" + config_path = get_backend_config_path() + if not config_path.exists(): + return {} + + import json + try: + with open(config_path, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + + +def save_backend_configs(configs: dict) -> None: + """Save backend configurations.""" + config_path = get_backend_config_path() + import json + with open(config_path, 'w') as f: + json.dump(configs, f, indent=2) + + +def get_default_backend() -> str: + """Get the default backend name.""" + configs = load_backend_configs() + return configs.get('default', 'local') + + +def get_backend(ctx) -> IssueBackend: + """Get backend instance from context.""" + backend_name = ctx.obj.get('backend') or get_default_backend() + configs = load_backend_configs() + + if backend_name not in configs: + if backend_name == 'local': + # Auto-configure local backend + local_config = { + 'type': 'local', + 'db_path': str(get_config_dir() / 'issues.db') + } + configs['local'] = local_config + save_backend_configs(configs) + else: + raise click.ClickException(f"Backend '{backend_name}' not configured. Use 'backend add' to configure it.") + + backend_config = configs[backend_name] + backend_type = backend_config['type'] + + try: + backend = BackendFactory.create_backend(backend_type) + backend.connect(backend_config) + return backend + except Exception as e: + raise click.ClickException(f"Failed to connect to backend '{backend_name}': {e}") + + +def format_issue_list(issues: List[Issue]) -> str: + """Format list of issues as a table.""" + if not issues: + return "No issues found." + + # Calculate column widths + max_title_width = min(50, max(len(issue.title) for issue in issues)) + max_assignee_width = 15 + + # Header + lines = [] + header = f"{'#':<6} {'State':<12} {'Title':<{max_title_width}} {'Assignee':<{max_assignee_width}} Labels" + lines.append(header) + lines.append("-" * len(header)) + + # Issues + for issue in issues: + title = issue.title[:max_title_width] + if len(issue.title) > max_title_width: + title = title[:-3] + "..." + + assignee = issue.primary_assignee.username if issue.primary_assignee else "unassigned" + assignee = assignee[:max_assignee_width] + + labels = ", ".join(label.name for label in issue.labels[:3]) + if len(issue.labels) > 3: + labels += f" (+{len(issue.labels) - 3})" + + line = f"{issue.number:<6} {issue.state.value:<12} {title:<{max_title_width}} {assignee:<{max_assignee_width}} {labels}" + lines.append(line) + + return "\n".join(lines) + + +def format_issue(issue: Issue, show_comments: bool = False, backend: Optional[IssueBackend] = None) -> str: + """Format a single issue with details.""" + lines = [] + + # Header + lines.append(f"#{issue.number}: {issue.title}") + lines.append("=" * (len(f"#{issue.number}: {issue.title}"))) + lines.append("") + + # Basic info + lines.append(f"State: {issue.state.value}") + lines.append(f"Created: {format_datetime(issue.created_at)}") + lines.append(f"Updated: {format_datetime(issue.updated_at)}") + + if issue.closed_at: + lines.append(f"Closed: {format_datetime(issue.closed_at)}") + + # Assignees + if issue.assignees: + assignees_str = ", ".join(assignee.username for assignee in issue.assignees) + lines.append(f"Assignees: {assignees_str}") + else: + lines.append("Assignees: none") + + # Milestone + if issue.milestone: + lines.append(f"Milestone: {issue.milestone.title}") + + # Labels + if issue.labels: + labels_by_category = {} + for label in issue.labels: + category = label.category + if category not in labels_by_category: + labels_by_category[category] = [] + labels_by_category[category].append(label.name) + + for category, label_names in labels_by_category.items(): + lines.append(f"{category.title()} labels: {', '.join(label_names)}") + else: + lines.append("Labels: none") + + # Description + lines.append("") + lines.append("Description:") + lines.append("-" * 12) + if issue.description: + lines.append(issue.description) + else: + lines.append("(no description)") + + # Comments + if show_comments and backend: + comments = backend.get_comments(issue.id) + if comments: + lines.append("") + lines.append(f"Comments ({len(comments)}):") + lines.append("-" * 20) + + for comment in comments: + lines.append("") + lines.append(f"Comment by {comment.author.username} at {format_datetime(comment.created_at)}:") + lines.append(comment.body) + + return "\n".join(lines) + + +def format_datetime(dt: datetime) -> str: + """Format datetime for display.""" + if dt.tzinfo: + dt = dt.astimezone() + return dt.strftime("%Y-%m-%d %H:%M:%S") + + +def get_user_input(prompt: str, default: str = "", multiline: bool = False) -> str: + """Get user input with optional default and multiline support.""" + if multiline: + click.echo(f"{prompt} (Press Ctrl+D when done, Ctrl+C to cancel):") + if default: + click.echo(f"Current value:\n{default}\n") + + lines = [] + try: + while True: + try: + line = input() + lines.append(line) + except EOFError: + break + except KeyboardInterrupt: + raise click.Abort() + + return "\n".join(lines) if lines else default + else: + return click.prompt(prompt, default=default) + + +def validate_backend_type(backend_type: str) -> bool: + """Validate that a backend type is supported.""" + return backend_type in BackendFactory.get_available_backends() + + +def test_backend_connection(backend_config: dict) -> bool: + """Test if a backend configuration works.""" + try: + backend_type = backend_config['type'] + backend = BackendFactory.create_backend(backend_type) + backend.connect(backend_config) + result = backend.test_connection() + backend.disconnect() + return result + except Exception: + return False + + +def format_backend_list(configs: dict) -> str: + """Format backend configurations for display.""" + if not configs: + return "No backends configured." + + lines = [] + default_backend = configs.get('default', 'local') + + lines.append(f"{'Name':<15} {'Type':<10} {'Status':<10} Description") + lines.append("-" * 60) + + for name, config in configs.items(): + if name == 'default': + continue + + backend_type = config.get('type', 'unknown') + is_default = name == default_backend + + # Test connection + try: + status = "connected" if test_backend_connection(config) else "error" + except Exception: + status = "error" + + # Description + if backend_type == 'local': + desc = f"Local SQLite: {config.get('db_path', 'unknown')}" + elif backend_type == 'gitea': + desc = f"Gitea: {config.get('base_url', 'unknown')}/{config.get('owner', 'unknown')}/{config.get('repo', 'unknown')}" + else: + desc = f"{backend_type} backend" + + if is_default: + desc += " (default)" + + line = f"{name:<15} {backend_type:<10} {status:<10} {desc}" + lines.append(line) + + return "\n".join(lines) + + +def get_editor() -> str: + """Get the user's preferred editor.""" + return os.environ.get('EDITOR', 'nano') + + +def edit_text(initial_text: str = "") -> Optional[str]: + """Open text in editor and return edited content.""" + with tempfile.NamedTemporaryFile(mode='w+', suffix='.md', delete=False) as f: + f.write(initial_text) + temp_path = f.name + + try: + editor = get_editor() + os.system(f'{editor} {temp_path}') + + with open(temp_path, 'r') as f: + edited_text = f.read() + + return edited_text if edited_text != initial_text else None + + finally: + os.unlink(temp_path) + + +def confirm_action(message: str, default: bool = False) -> bool: + """Ask user for confirmation.""" + return click.confirm(message, default=default) + + +def progress_bar(items, label: str = "Processing"): + """Create a progress bar for iterating over items.""" + return click.progressbar(items, label=label) + + +def echo_success(message: str) -> None: + """Echo success message in green.""" + click.echo(click.style(message, fg='green')) + + +def echo_warning(message: str) -> None: + """Echo warning message in yellow.""" + click.echo(click.style(message, fg='yellow')) + + +def echo_error(message: str) -> None: + """Echo error message in red.""" + click.echo(click.style(message, fg='red')) + + +def echo_info(message: str) -> None: + """Echo info message in blue.""" + click.echo(click.style(message, fg='blue')) \ No newline at end of file diff --git a/core/interfaces.py b/core/interfaces.py new file mode 100644 index 0000000..98118f4 --- /dev/null +++ b/core/interfaces.py @@ -0,0 +1,407 @@ +""" +Backend Plugin Interfaces + +Defines the contracts that all issue tracking backend plugins must implement. +This enables a clean plugin architecture where new backends can be added +without modifying core code. +""" + +from abc import ABC, abstractmethod +from typing import List, Optional, Dict, Any, Iterator +from datetime import datetime + +from .models import Issue, Label, User, Milestone, Comment + + +class IssueFilter: + """Filter criteria for issue queries.""" + + def __init__( + self, + state: Optional[str] = None, + assignee: Optional[str] = None, + labels: Optional[List[str]] = None, + milestone: Optional[str] = None, + created_after: Optional[datetime] = None, + created_before: Optional[datetime] = None, + updated_after: Optional[datetime] = None, + updated_before: Optional[datetime] = None, + search: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = 0 + ): + self.state = state + self.assignee = assignee + self.labels = labels or [] + self.milestone = milestone + self.created_after = created_after + self.created_before = created_before + self.updated_after = updated_after + self.updated_before = updated_before + self.search = search + self.limit = limit + self.offset = offset + + +class BackendCapabilities: + """Describes what features a backend supports.""" + + def __init__( + self, + supports_milestones: bool = True, + supports_assignees: bool = True, + supports_comments: bool = True, + supports_labels: bool = True, + supports_search: bool = True, + supports_bulk_operations: bool = False, + supports_webhooks: bool = False, + supports_real_time: bool = False, + max_labels_per_issue: Optional[int] = None, + max_assignees_per_issue: Optional[int] = None + ): + self.supports_milestones = supports_milestones + self.supports_assignees = supports_assignees + self.supports_comments = supports_comments + self.supports_labels = supports_labels + self.supports_search = supports_search + self.supports_bulk_operations = supports_bulk_operations + self.supports_webhooks = supports_webhooks + self.supports_real_time = supports_real_time + self.max_labels_per_issue = max_labels_per_issue + self.max_assignees_per_issue = max_assignees_per_issue + + +class IssueBackend(ABC): + """ + Abstract base class for all issue tracking backends. + + Each backend plugin must implement this interface to provide + issue tracking functionality for a specific system. + """ + + @property + @abstractmethod + def backend_type(self) -> str: + """Return the backend type identifier (e.g., 'local', 'gitea', 'github').""" + pass + + @property + @abstractmethod + def capabilities(self) -> BackendCapabilities: + """Return the capabilities supported by this backend.""" + pass + + @abstractmethod + def connect(self, config: Dict[str, Any]) -> None: + """ + Connect to the backend using provided configuration. + + Args: + config: Backend-specific configuration (URLs, tokens, etc.) + """ + pass + + @abstractmethod + def disconnect(self) -> None: + """Disconnect from the backend and clean up resources.""" + pass + + @abstractmethod + def test_connection(self) -> bool: + """Test if the backend connection is working.""" + pass + + # Issue CRUD Operations + @abstractmethod + def create_issue(self, issue: Issue) -> Issue: + """ + Create a new issue in the backend. + + Args: + issue: Issue to create (id may be None for new issues) + + Returns: + Created issue with backend_id populated + """ + pass + + @abstractmethod + def get_issue(self, issue_id: str) -> Optional[Issue]: + """ + Retrieve an issue by its backend ID. + + Args: + issue_id: Backend-specific issue ID + + Returns: + Issue if found, None otherwise + """ + pass + + @abstractmethod + def get_issue_by_number(self, number: int) -> Optional[Issue]: + """ + Retrieve an issue by its human-readable number. + + Args: + number: Issue number + + Returns: + Issue if found, None otherwise + """ + pass + + @abstractmethod + def update_issue(self, issue: Issue) -> Issue: + """ + Update an existing issue in the backend. + + Args: + issue: Issue with modifications + + Returns: + Updated issue + """ + pass + + @abstractmethod + def delete_issue(self, issue_id: str) -> bool: + """ + Delete an issue from the backend. + + Args: + issue_id: Backend-specific issue ID + + Returns: + True if deleted successfully + """ + pass + + @abstractmethod + def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: + """ + List issues matching filter criteria. + + Args: + filter_criteria: Optional filter to apply + + Returns: + List of matching issues + """ + pass + + @abstractmethod + def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: + """ + Search issues using backend-specific query syntax. + + Args: + query: Search query + limit: Maximum number of results + + Returns: + List of matching issues + """ + pass + + # Label Operations + @abstractmethod + def create_label(self, label: Label) -> Label: + """Create a new label.""" + pass + + @abstractmethod + def get_labels(self) -> List[Label]: + """Get all available labels.""" + pass + + @abstractmethod + def update_label(self, label: Label) -> Label: + """Update an existing label.""" + pass + + @abstractmethod + def delete_label(self, label_name: str) -> bool: + """Delete a label.""" + pass + + # User Operations + @abstractmethod + def get_users(self) -> List[User]: + """Get all available users.""" + pass + + @abstractmethod + def get_user(self, user_id: str) -> Optional[User]: + """Get a specific user by ID.""" + pass + + @abstractmethod + def search_users(self, query: str) -> List[User]: + """Search for users.""" + pass + + # Milestone Operations + @abstractmethod + def create_milestone(self, milestone: Milestone) -> Milestone: + """Create a new milestone.""" + pass + + @abstractmethod + def get_milestones(self) -> List[Milestone]: + """Get all milestones.""" + pass + + @abstractmethod + def update_milestone(self, milestone: Milestone) -> Milestone: + """Update a milestone.""" + pass + + @abstractmethod + def delete_milestone(self, milestone_id: str) -> bool: + """Delete a milestone.""" + pass + + # Comment Operations + @abstractmethod + def add_comment(self, issue_id: str, comment: Comment) -> Comment: + """Add a comment to an issue.""" + pass + + @abstractmethod + def get_comments(self, issue_id: str) -> List[Comment]: + """Get all comments for an issue.""" + pass + + @abstractmethod + def update_comment(self, comment: Comment) -> Comment: + """Update a comment.""" + pass + + @abstractmethod + def delete_comment(self, comment_id: str) -> bool: + """Delete a comment.""" + pass + + # Bulk Operations (optional, depends on capabilities) + def bulk_update_issues(self, updates: List[Dict[str, Any]]) -> List[Issue]: + """ + Bulk update multiple issues. + + Args: + updates: List of update operations + + Returns: + List of updated issues + """ + if not self.capabilities.supports_bulk_operations: + raise NotImplementedError(f"{self.backend_type} backend does not support bulk operations") + + # Default implementation: update one by one + results = [] + for update in updates: + issue_id = update['id'] + issue = self.get_issue(issue_id) + if issue: + # Apply updates to issue + for key, value in update.items(): + if key != 'id' and hasattr(issue, key): + setattr(issue, key, value) + updated_issue = self.update_issue(issue) + results.append(updated_issue) + return results + + # Sync Support + def get_last_sync_timestamp(self) -> Optional[datetime]: + """ + Get the timestamp of the last successful sync. + Used for incremental synchronization. + """ + return None + + def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: + """ + Get issues modified since a specific timestamp. + Used for incremental synchronization. + """ + filter_criteria = IssueFilter(updated_after=timestamp) + return self.list_issues(filter_criteria) + + +class LocalBackend(IssueBackend): + """ + Marker interface for local backends. + + Local backends store data locally and can work offline. + They serve as the source of truth for synchronization. + """ + pass + + +class RemoteBackend(IssueBackend): + """ + Marker interface for remote backends. + + Remote backends connect to external issue tracking systems. + They participate in bidirectional synchronization. + """ + pass + + +class SyncableBackend(ABC): + """ + Interface for backends that support synchronization. + + Backends implementing this interface can participate in + bidirectional sync operations. + """ + + @abstractmethod + def prepare_for_sync(self) -> None: + """Prepare backend for sync operation (e.g., create backup).""" + pass + + @abstractmethod + def finalize_sync(self, success: bool) -> None: + """Finalize sync operation (e.g., commit or rollback).""" + pass + + @abstractmethod + def get_sync_conflicts(self) -> List[Dict[str, Any]]: + """Get issues that have sync conflicts.""" + pass + + @abstractmethod + def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: + """ + Resolve a sync conflict. + + Args: + issue_id: Issue with conflict + resolution: 'local' or 'remote' or 'merge' + """ + pass + + +class BackendFactory: + """Factory for creating backend instances.""" + + _backends: Dict[str, type] = {} + + @classmethod + def register_backend(cls, backend_type: str, backend_class: type) -> None: + """Register a backend implementation.""" + cls._backends[backend_type] = backend_class + + @classmethod + def create_backend(cls, backend_type: str) -> IssueBackend: + """Create a backend instance.""" + if backend_type not in cls._backends: + raise ValueError(f"Unknown backend type: {backend_type}") + + return cls._backends[backend_type]() + + @classmethod + def get_available_backends(cls) -> List[str]: + """Get list of available backend types.""" + return list(cls._backends.keys()) \ No newline at end of file diff --git a/core/models.py b/core/models.py new file mode 100644 index 0000000..fae45d5 --- /dev/null +++ b/core/models.py @@ -0,0 +1,341 @@ +""" +Core Issue Domain Models + +Unified, backend-agnostic issue models that serve as the single source of truth +for all issue tracking operations. These models combine the best features from +various issue tracking systems while maintaining clean domain logic. +""" + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import List, Optional, Dict, Any +from functools import cached_property + + +class IssueState(Enum): + """Universal issue state enumeration with backend mapping support.""" + OPEN = "open" + CLOSED = "closed" + IN_PROGRESS = "in_progress" + BLOCKED = "blocked" + + @classmethod + def from_string(cls, state_str: str) -> 'IssueState': + """Convert string to IssueState, with fallback handling.""" + state_map = { + 'open': cls.OPEN, + 'closed': cls.CLOSED, + 'in_progress': cls.IN_PROGRESS, + 'in-progress': cls.IN_PROGRESS, + 'progress': cls.IN_PROGRESS, + 'blocked': cls.BLOCKED, + } + return state_map.get(state_str.lower(), cls.OPEN) + + def to_backend_string(self, backend_type: str) -> str: + """Convert to backend-specific string representation.""" + if backend_type == 'gitea': + return 'open' if self in [self.OPEN, self.IN_PROGRESS, self.BLOCKED] else 'closed' + elif backend_type == 'github': + return self.value + else: + return self.value + + +class Priority(Enum): + """Universal priority levels.""" + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + CRITICAL = "critical" + + @classmethod + def from_label(cls, label_name: str) -> Optional['Priority']: + """Extract priority from label name.""" + if label_name.startswith('priority:'): + priority_str = label_name.replace('priority:', '') + try: + return cls(priority_str) + except ValueError: + return None + return None + + +class IssueType(Enum): + """Universal issue types.""" + BUG = "bug" + FEATURE = "feature" + ENHANCEMENT = "enhancement" + TASK = "task" + DOCUMENTATION = "documentation" + QUESTION = "question" + + @classmethod + def from_label(cls, label_name: str) -> Optional['IssueType']: + """Extract type from label name.""" + try: + return cls(label_name.lower()) + except ValueError: + return None + + +@dataclass(frozen=True) +class Label: + """Universal label model with backend mapping support.""" + name: str + color: Optional[str] = None + description: Optional[str] = None + backend_id: Optional[str] = None # Backend-specific ID for sync + + @cached_property + def category(self) -> str: + """Categorize label for efficient filtering.""" + if self.name.startswith('priority:'): + return 'priority' + elif self.name.startswith('status:'): + return 'status' + elif self.name.startswith('type:'): + return 'type' + elif self.name in ['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']: + return 'type' + else: + return 'other' + + @cached_property + def priority(self) -> Optional[Priority]: + """Extract priority if this is a priority label.""" + return Priority.from_label(self.name) + + @cached_property + def issue_type(self) -> Optional[IssueType]: + """Extract issue type if this is a type label.""" + return IssueType.from_label(self.name) + + +@dataclass(frozen=True) +class LabelCategories: + """Categorized labels for efficient access.""" + priority_labels: List[Label] + type_labels: List[Label] + status_labels: List[Label] + other_labels: List[Label] + + @cached_property + def priority(self) -> Optional[Priority]: + """Get the issue priority.""" + for label in self.priority_labels: + if label.priority: + return label.priority + return None + + @cached_property + def issue_type(self) -> Optional[IssueType]: + """Get the issue type.""" + for label in self.type_labels: + if label.issue_type: + return label.issue_type + return None + + +@dataclass +class User: + """Universal user model.""" + id: str # String ID to handle different backend ID types + username: str + display_name: Optional[str] = None + email: Optional[str] = None + avatar_url: Optional[str] = None + backend_id: Optional[str] = None # Backend-specific ID for sync + + +@dataclass +class Milestone: + """Universal milestone/project model.""" + id: str + title: str + description: Optional[str] = None + state: str = "open" # open, closed + due_date: Optional[datetime] = None + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + backend_id: Optional[str] = None + + +@dataclass +class Comment: + """Universal comment model.""" + id: str + body: str + author: User + created_at: datetime + updated_at: Optional[datetime] = None + backend_id: Optional[str] = None + + +@dataclass +class Issue: + """ + Universal Issue model - single source of truth. + + Combines the best features from domain and API models while maintaining + clean separation between core data and backend-specific details. + """ + # Core Issue Data + id: str # Universal ID (UUID for local, external ID for remotes) + number: int # Human-readable number + title: str + description: str + state: IssueState + + # Metadata + created_at: datetime + updated_at: datetime + closed_at: Optional[datetime] = None + + # Relationships + labels: List[Label] = field(default_factory=list) + assignees: List[User] = field(default_factory=list) + milestone: Optional[Milestone] = None + comments: List[Comment] = field(default_factory=list) + + # Backend Integration + backend_id: Optional[str] = None # Backend-specific ID + backend_type: Optional[str] = None # e.g., 'local', 'gitea', 'github' + sync_metadata: Dict[str, Any] = field(default_factory=dict) + + # Performance Optimization + _label_categories: Optional[LabelCategories] = field(default=None, init=False) + + @cached_property + def label_categories(self) -> LabelCategories: + """Efficiently categorize labels with caching.""" + if self._label_categories is None: + # Single-pass categorization for performance + priority_labels = [] + type_labels = [] + status_labels = [] + other_labels = [] + + for label in self.labels: + if label.category == 'priority': + priority_labels.append(label) + elif label.category == 'type': + type_labels.append(label) + elif label.category == 'status': + status_labels.append(label) + else: + other_labels.append(label) + + self._label_categories = LabelCategories( + priority_labels=priority_labels, + type_labels=type_labels, + status_labels=status_labels, + other_labels=other_labels + ) + return self._label_categories + + @property + def priority(self) -> Optional[Priority]: + """Get issue priority from labels.""" + return self.label_categories.priority + + @property + def issue_type(self) -> Optional[IssueType]: + """Get issue type from labels.""" + return self.label_categories.issue_type + + @property + def primary_assignee(self) -> Optional[User]: + """Get primary assignee (first one).""" + return self.assignees[0] if self.assignees else None + + def invalidate_cache(self) -> None: + """Invalidate cached properties when labels change.""" + if hasattr(self, '_label_categories'): + object.__setattr__(self, '_label_categories', None) + + # Domain Logic Methods + def close(self, closed_at: Optional[datetime] = None) -> None: + """Close the issue with business rule validation.""" + if self.state == IssueState.CLOSED: + raise ValueError(f"Issue #{self.number} is already closed") + + self.state = IssueState.CLOSED + self.closed_at = closed_at or datetime.now(timezone.utc) + self.updated_at = datetime.now(timezone.utc) + + def reopen(self) -> None: + """Reopen the issue with business rule validation.""" + if self.state != IssueState.CLOSED: + raise ValueError(f"Issue #{self.number} is not closed (current state: {self.state.value})") + + self.state = IssueState.OPEN + self.closed_at = None + self.updated_at = datetime.now(timezone.utc) + + def add_label(self, label: Label) -> None: + """Add a label to the issue.""" + if label not in self.labels: + self.labels.append(label) + self.invalidate_cache() + self.updated_at = datetime.now(timezone.utc) + + def remove_label(self, label_name: str) -> bool: + """Remove a label by name. Returns True if removed.""" + original_count = len(self.labels) + self.labels = [label for label in self.labels if label.name != label_name] + if len(self.labels) < original_count: + self.invalidate_cache() + self.updated_at = datetime.now(timezone.utc) + return True + return False + + def has_label(self, label_name: str) -> bool: + """Check if issue has a specific label.""" + return any(label.name == label_name for label in self.labels) + + def add_assignee(self, user: User) -> None: + """Add an assignee to the issue.""" + if user not in self.assignees: + self.assignees.append(user) + self.updated_at = datetime.now(timezone.utc) + + def remove_assignee(self, user_id: str) -> bool: + """Remove an assignee by ID. Returns True if removed.""" + original_count = len(self.assignees) + self.assignees = [user for user in self.assignees if user.id != user_id] + if len(self.assignees) < original_count: + self.updated_at = datetime.now(timezone.utc) + return True + return False + + def add_comment(self, comment: Comment) -> None: + """Add a comment to the issue.""" + self.comments.append(comment) + self.updated_at = datetime.now(timezone.utc) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'id': self.id, + 'number': self.number, + 'title': self.title, + 'description': self.description, + 'state': self.state.value, + 'created_at': self.created_at.isoformat(), + 'updated_at': self.updated_at.isoformat(), + 'closed_at': self.closed_at.isoformat() if self.closed_at else None, + 'labels': [{'name': l.name, 'color': l.color, 'description': l.description} for l in self.labels], + 'assignees': [{'id': u.id, 'username': u.username, 'display_name': u.display_name} for u in self.assignees], + 'milestone': {'id': self.milestone.id, 'title': self.milestone.title} if self.milestone else None, + 'backend_id': self.backend_id, + 'backend_type': self.backend_type, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'Issue': + """Create Issue from dictionary.""" + # This would be implemented with proper parsing + # Simplified version for now + raise NotImplementedError("from_dict implementation needed") \ No newline at end of file diff --git a/core/repository.py b/core/repository.py new file mode 100644 index 0000000..447a60e --- /dev/null +++ b/core/repository.py @@ -0,0 +1,454 @@ +""" +Repository Pattern Implementation + +Provides a high-level repository interface that abstracts backend operations +and adds features like caching, transaction support, and business logic. +""" + +from abc import ABC, abstractmethod +from typing import List, Optional, Dict, Any, Union +from datetime import datetime, timezone +import logging + +from .interfaces import IssueBackend, IssueFilter +from .models import Issue, Label, User, Milestone, Comment, IssueState + + +logger = logging.getLogger(__name__) + + +class IssueRepository: + """ + High-level repository for issue operations. + + Provides a clean interface for issue management with additional features + like caching, validation, and business rule enforcement. + """ + + def __init__(self, backend: IssueBackend, enable_caching: bool = True): + self.backend = backend + self.enable_caching = enable_caching + self._cache: Dict[str, Any] = {} + self._cache_timeout = 300 # 5 minutes + + def _cache_key(self, operation: str, *args) -> str: + """Generate cache key for operation.""" + return f"{operation}:{':'.join(str(arg) for arg in args)}" + + def _get_from_cache(self, key: str) -> Optional[Any]: + """Get value from cache if enabled and not expired.""" + if not self.enable_caching or key not in self._cache: + return None + + cached_item = self._cache[key] + if datetime.now(timezone.utc).timestamp() - cached_item['timestamp'] > self._cache_timeout: + del self._cache[key] + return None + + return cached_item['value'] + + def _set_cache(self, key: str, value: Any) -> None: + """Set value in cache if enabled.""" + if self.enable_caching: + self._cache[key] = { + 'value': value, + 'timestamp': datetime.now(timezone.utc).timestamp() + } + + def _invalidate_cache_pattern(self, pattern: str) -> None: + """Invalidate cache entries matching pattern.""" + if not self.enable_caching: + return + + keys_to_remove = [key for key in self._cache.keys() if pattern in key] + for key in keys_to_remove: + del self._cache[key] + + # Issue Operations + def create_issue( + self, + title: str, + description: str = "", + labels: Optional[List[str]] = None, + assignees: Optional[List[str]] = None, + milestone: Optional[str] = None, + issue_type: Optional[str] = None, + priority: Optional[str] = None + ) -> Issue: + """ + Create a new issue with business rule validation. + + Args: + title: Issue title (required) + description: Issue description + labels: List of label names + assignees: List of usernames to assign + milestone: Milestone title or ID + issue_type: Issue type (bug, feature, etc.) + priority: Priority level (low, medium, high, critical) + + Returns: + Created issue + """ + # Validation + if not title.strip(): + raise ValueError("Issue title cannot be empty") + + # Build labels + issue_labels = [] + if labels: + for label_name in labels: + issue_labels.append(Label(name=label_name.strip())) + + # Add type and priority as labels + if issue_type: + issue_labels.append(Label(name=issue_type)) + + if priority: + issue_labels.append(Label(name=f'priority:{priority}')) + + # Resolve assignees + issue_assignees = [] + if assignees: + for username in assignees: + users = self.backend.search_users(username) + if users: + issue_assignees.append(users[0]) + else: + # Create basic user if not found + issue_assignees.append(User(id=username, username=username)) + + # Resolve milestone + issue_milestone = None + if milestone: + milestones = self.backend.get_milestones() + for m in milestones: + if m.title == milestone or m.id == milestone: + issue_milestone = m + break + + # Create issue + now = datetime.now(timezone.utc) + issue = Issue( + id="", # Will be set by backend + number=0, # Will be set by backend + title=title.strip(), + description=description.strip(), + state=IssueState.OPEN, + created_at=now, + updated_at=now, + labels=issue_labels, + assignees=issue_assignees, + milestone=issue_milestone + ) + + created_issue = self.backend.create_issue(issue) + + # Invalidate relevant caches + self._invalidate_cache_pattern("list_issues") + self._invalidate_cache_pattern("search_issues") + + logger.info(f"Created issue #{created_issue.number}: {created_issue.title}") + return created_issue + + def get_issue(self, issue_id: Union[str, int]) -> Optional[Issue]: + """Get issue by ID or number.""" + if isinstance(issue_id, int): + return self.get_issue_by_number(issue_id) + + cache_key = self._cache_key("get_issue", issue_id) + cached_issue = self._get_from_cache(cache_key) + if cached_issue: + return cached_issue + + issue = self.backend.get_issue(str(issue_id)) + if issue: + self._set_cache(cache_key, issue) + + return issue + + def get_issue_by_number(self, number: int) -> Optional[Issue]: + """Get issue by number.""" + cache_key = self._cache_key("get_issue_by_number", number) + cached_issue = self._get_from_cache(cache_key) + if cached_issue: + return cached_issue + + issue = self.backend.get_issue_by_number(number) + if issue: + self._set_cache(cache_key, issue) + + return issue + + def update_issue(self, issue: Issue) -> Issue: + """Update issue with validation.""" + if not issue.title.strip(): + raise ValueError("Issue title cannot be empty") + + issue.updated_at = datetime.now(timezone.utc) + updated_issue = self.backend.update_issue(issue) + + # Invalidate caches + self._invalidate_cache_pattern("get_issue") + self._invalidate_cache_pattern("list_issues") + self._invalidate_cache_pattern("search_issues") + + logger.info(f"Updated issue #{updated_issue.number}: {updated_issue.title}") + return updated_issue + + def close_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: + """Close issue with optional comment.""" + issue = self.get_issue(issue_id) + if not issue: + raise ValueError(f"Issue {issue_id} not found") + + if issue.state == IssueState.CLOSED: + raise ValueError(f"Issue #{issue.number} is already closed") + + issue.close() + + # Add closing comment if provided + if comment: + self.add_comment(issue.id, comment) + + return self.update_issue(issue) + + def reopen_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: + """Reopen issue with optional comment.""" + issue = self.get_issue(issue_id) + if not issue: + raise ValueError(f"Issue {issue_id} not found") + + if issue.state != IssueState.CLOSED: + raise ValueError(f"Issue #{issue.number} is not closed") + + issue.reopen() + + # Add reopening comment if provided + if comment: + self.add_comment(issue.id, comment) + + return self.update_issue(issue) + + def list_issues( + self, + state: Optional[str] = None, + assignee: Optional[str] = None, + labels: Optional[List[str]] = None, + milestone: Optional[str] = None, + search: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None + ) -> List[Issue]: + """List issues with filtering and caching.""" + filter_criteria = IssueFilter( + state=state, + assignee=assignee, + labels=labels, + milestone=milestone, + search=search, + limit=limit, + offset=offset + ) + + # Create cache key from filter criteria + cache_key = self._cache_key( + "list_issues", + state or "all", + assignee or "any", + ",".join(labels) if labels else "any", + milestone or "any", + search or "any", + limit or 0, + offset or 0 + ) + + cached_issues = self._get_from_cache(cache_key) + if cached_issues: + return cached_issues + + issues = self.backend.list_issues(filter_criteria) + self._set_cache(cache_key, issues) + + return issues + + def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: + """Search issues with caching.""" + cache_key = self._cache_key("search_issues", query, limit or 0) + cached_issues = self._get_from_cache(cache_key) + if cached_issues: + return cached_issues + + issues = self.backend.search_issues(query, limit) + self._set_cache(cache_key, issues) + + return issues + + # Comment Operations + def add_comment(self, issue_id: str, comment_text: str, author_username: str = "cli-user") -> Comment: + """Add comment to issue.""" + if not comment_text.strip(): + raise ValueError("Comment text cannot be empty") + + # Try to find user + users = self.backend.search_users(author_username) + if users: + author = users[0] + else: + author = User(id=author_username, username=author_username) + + comment = Comment( + id="", + body=comment_text.strip(), + author=author, + created_at=datetime.now(timezone.utc) + ) + + added_comment = self.backend.add_comment(issue_id, comment) + + # Invalidate issue cache + self._invalidate_cache_pattern("get_issue") + + logger.info(f"Added comment to issue {issue_id}") + return added_comment + + def get_comments(self, issue_id: str) -> List[Comment]: + """Get comments for issue.""" + cache_key = self._cache_key("get_comments", issue_id) + cached_comments = self._get_from_cache(cache_key) + if cached_comments: + return cached_comments + + comments = self.backend.get_comments(issue_id) + self._set_cache(cache_key, comments) + + return comments + + # Label Operations + def get_or_create_label(self, name: str, color: Optional[str] = None, description: Optional[str] = None) -> Label: + """Get existing label or create new one.""" + existing_labels = self.backend.get_labels() + for label in existing_labels: + if label.name == name: + return label + + # Create new label + new_label = Label(name=name, color=color, description=description) + return self.backend.create_label(new_label) + + # Statistics and Analytics + def get_issue_stats(self) -> Dict[str, Any]: + """Get issue statistics.""" + cache_key = self._cache_key("get_issue_stats") + cached_stats = self._get_from_cache(cache_key) + if cached_stats: + return cached_stats + + all_issues = self.backend.list_issues() + + stats = { + 'total': len(all_issues), + 'open': len([i for i in all_issues if i.state != IssueState.CLOSED]), + 'closed': len([i for i in all_issues if i.state == IssueState.CLOSED]), + 'by_state': {}, + 'by_priority': {}, + 'by_type': {}, + 'by_assignee': {}, + 'recent_activity': 0 + } + + # Count by state + for issue in all_issues: + state = issue.state.value + stats['by_state'][state] = stats['by_state'].get(state, 0) + 1 + + # Count by priority and type + one_week_ago = datetime.now(timezone.utc).timestamp() - 604800 # 7 days + + for issue in all_issues: + # Priority + priority = issue.priority + if priority: + priority_name = priority.value + stats['by_priority'][priority_name] = stats['by_priority'].get(priority_name, 0) + 1 + + # Type + issue_type = issue.issue_type + if issue_type: + type_name = issue_type.value + stats['by_type'][type_name] = stats['by_type'].get(type_name, 0) + 1 + + # Assignee + if issue.assignees: + for assignee in issue.assignees: + username = assignee.username + stats['by_assignee'][username] = stats['by_assignee'].get(username, 0) + 1 + + # Recent activity + if issue.updated_at.timestamp() > one_week_ago: + stats['recent_activity'] += 1 + + self._set_cache(cache_key, stats) + return stats + + # Bulk Operations + def bulk_close_issues(self, issue_numbers: List[int], comment: Optional[str] = None) -> List[Issue]: + """Close multiple issues.""" + results = [] + for number in issue_numbers: + try: + closed_issue = self.close_issue(number, comment) + results.append(closed_issue) + except Exception as e: + logger.error(f"Failed to close issue #{number}: {e}") + + return results + + def bulk_add_label(self, issue_numbers: List[int], label_name: str) -> List[Issue]: + """Add label to multiple issues.""" + label = self.get_or_create_label(label_name) + results = [] + + for number in issue_numbers: + try: + issue = self.get_issue_by_number(number) + if issue: + issue.add_label(label) + updated_issue = self.update_issue(issue) + results.append(updated_issue) + except Exception as e: + logger.error(f"Failed to add label to issue #{number}: {e}") + + return results + + # Cache Management + def clear_cache(self) -> None: + """Clear all cached data.""" + self._cache.clear() + logger.info("Repository cache cleared") + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + if not self.enable_caching: + return {'enabled': False} + + now = datetime.now(timezone.utc).timestamp() + expired_count = 0 + for cached_item in self._cache.values(): + if now - cached_item['timestamp'] > self._cache_timeout: + expired_count += 1 + + return { + 'enabled': True, + 'total_entries': len(self._cache), + 'expired_entries': expired_count, + 'cache_timeout': self._cache_timeout + } + + # Context Manager Support + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if hasattr(self.backend, 'disconnect'): + self.backend.disconnect() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..72166b2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,164 @@ +[build-system] +requires = ["setuptools>=45", "wheel", "setuptools-scm[toml]>=6.2"] +build-backend = "setuptools.build_meta" + +[project] +name = "universal-issue-tracker" +description = "Backend-agnostic issue tracking system with plugin architecture" +readme = "README.md" +requires-python = ">=3.8" +license = {text = "MIT"} +authors = [ + {name = "MarkiTect Project", email = "noreply@example.com"}, +] +keywords = ["issue-tracking", "project-management", "cli", "gitea", "github", "jira"] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development :: Bug Tracking", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: Utilities", +] +dependencies = [ + "click>=8.0.0", + "requests>=2.25.0", + "python-dateutil>=2.8.0", +] +dynamic = ["version"] + +[project.optional-dependencies] +dev = [ + "pytest>=6.0", + "pytest-cov>=2.0", + "pytest-mock>=3.0", + "black>=22.0", + "isort>=5.0", + "flake8>=4.0", + "mypy>=0.900", + "pre-commit>=2.0", +] +docs = [ + "sphinx>=4.0", + "sphinx-rtd-theme>=1.0", + "sphinx-click>=3.0", +] +gitea = [ + "requests>=2.25.0", +] +github = [ + "pygithub>=1.55", +] +jira = [ + "jira>=3.0", +] + +[project.urls] +Homepage = "https://github.com/markitect/universal-issue-tracker" +Documentation = "https://universal-issue-tracker.readthedocs.io/" +Repository = "https://github.com/markitect/universal-issue-tracker.git" +"Bug Tracker" = "https://github.com/markitect/universal-issue-tracker/issues" + +[project.scripts] +issue = "issue_tracker.cli.main:main" +issue-tracker = "issue_tracker.cli.main:main" + +[tool.setuptools] +packages = ["issue_tracker"] + +[tool.setuptools.dynamic] +version = {attr = "issue_tracker.__version__"} + +[tool.setuptools.package-data] +issue_tracker = ["backends/local/schema.sql"] + +[tool.black] +line-length = 100 +target-version = ['py38'] +include = '\.pyi?$' +extend-exclude = ''' +/( + # directories + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" +line_length = 100 +known_first_party = ["issue_tracker"] + +[tool.mypy] +python_version = "3.8" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = true +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true +strict_equality = true + +[[tool.mypy.overrides]] +module = [ + "requests.*", + "click.*", +] +ignore_missing_imports = true + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py", "*_test.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +addopts = [ + "--strict-markers", + "--disable-warnings", + "--tb=short", +] +markers = [ + "unit: Unit tests", + "integration: Integration tests", + "slow: Slow tests", +] + +[tool.coverage.run] +source = ["issue_tracker"] +omit = [ + "*/tests/*", + "*/test_*", + "setup.py", +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if self.debug:", + "if settings.DEBUG", + "raise AssertionError", + "raise NotImplementedError", + "if 0:", + "if __name__ == .__main__.:", + "class .*\\bProtocol\\):", + "@(abc\\.)?abstractmethod", +] \ No newline at end of file