From d402f3c75bd1192aca2800eb0e5947e4d05806f5 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 25 Oct 2025 01:19:21 +0200 Subject: [PATCH] feat: replace local issue-facade with standalone repository submodule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaced the local issue-facade implementation with a git submodule pointing to the standalone coulomb/issue-facade repository. This establishes clear separation between the MarkiTect core project and the universal issue management facade. Changes: - Removed local issue-facade-251025 directory - Added coulomb/issue-facade as git submodule at issue-facade/ - Updated .gitmodules with submodule configuration - Updated Claude Code permissions for submodule usage The issue-facade is now maintained as a separate project while remaining easily accessible as a submodule. This allows independent development and versioning of the universal issue tracking facade while maintaining integration with MarkiTect workflows. Submodule URL: http://92.205.130.254:32166/coulomb/issue-facade.git Current commit: 51aea5e (init: first extract of implementation) 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .gitmodules | 3 + issue-facade | 1 + issue-facade/README.md | 340 ------------- issue-facade/__init__.py | 24 - issue-facade/backends/__init__.py | 11 - issue-facade/backends/gitea/__init__.py | 17 - issue-facade/backends/gitea/backend.py | 591 ---------------------- issue-facade/backends/local/__init__.py | 19 - issue-facade/backends/local/backend.py | 618 ------------------------ issue-facade/backends/local/schema.sql | 189 -------- issue-facade/cli/__init__.py | 20 - issue-facade/cli/backend_commands.py | 141 ------ issue-facade/cli/commands.py | 417 ---------------- issue-facade/cli/main.py | 117 ----- issue-facade/cli/sync_commands.py | 235 --------- issue-facade/cli/utils.py | 336 ------------- issue-facade/core/interfaces.py | 407 ---------------- issue-facade/core/models.py | 341 ------------- issue-facade/core/repository.py | 454 ----------------- 19 files changed, 4 insertions(+), 4277 deletions(-) create mode 160000 issue-facade delete mode 100644 issue-facade/README.md delete mode 100644 issue-facade/__init__.py delete mode 100644 issue-facade/backends/__init__.py delete mode 100644 issue-facade/backends/gitea/__init__.py delete mode 100644 issue-facade/backends/gitea/backend.py delete mode 100644 issue-facade/backends/local/__init__.py delete mode 100644 issue-facade/backends/local/backend.py delete mode 100644 issue-facade/backends/local/schema.sql delete mode 100644 issue-facade/cli/__init__.py delete mode 100644 issue-facade/cli/backend_commands.py delete mode 100644 issue-facade/cli/commands.py delete mode 100644 issue-facade/cli/main.py delete mode 100644 issue-facade/cli/sync_commands.py delete mode 100644 issue-facade/cli/utils.py delete mode 100644 issue-facade/core/interfaces.py delete mode 100644 issue-facade/core/models.py delete mode 100644 issue-facade/core/repository.py diff --git a/.gitmodules b/.gitmodules index d2e0e58a..0f710637 100644 --- a/.gitmodules +++ b/.gitmodules @@ -2,3 +2,6 @@ path = wiki url = http://92.205.130.254:32166/coulomb/markitect_project.wiki.git branch = main +[submodule "issue-facade"] + path = issue-facade + url = http://92.205.130.254:32166/coulomb/issue-facade.git diff --git a/issue-facade b/issue-facade new file mode 160000 index 00000000..51aea5ef --- /dev/null +++ b/issue-facade @@ -0,0 +1 @@ +Subproject commit 51aea5effbcc3a15587e5dce4b78efded9907291 diff --git a/issue-facade/README.md b/issue-facade/README.md deleted file mode 100644 index bdcb19ac..00000000 --- a/issue-facade/README.md +++ /dev/null @@ -1,340 +0,0 @@ -# Issue Facade - Universal CLI for Issue Tracking - -A convenient command-line facade that provides a unified interface to the repository's main issue tracker, regardless of which backend system is actually being used. - -## Purpose - -The **Issue Facade** acts as a convenient CLI wrapper that automatically detects and interfaces with whatever issue tracking system is configured for the current repository. This means you get a consistent, intuitive command-line experience whether your project uses: - -- GitHub Issues -- GitLab Issues -- Gitea Issues -- JIRA -- Local SQLite storage -- Any other supported backend - -## Philosophy - -Rather than learning different commands and workflows for each issue tracking system, the Issue Facade provides: - -- **One CLI to rule them all**: Same commands work across all backends -- **Repository-aware**: Automatically detects the relevant issue tracker for your repo -- **Offline capability**: Local SQLite fallback when remote systems are unavailable -- **Seamless sync**: Keep local and remote issue trackers synchronized - -## How It Works - -```bash -# The facade automatically detects your repository's issue tracker -cd /path/to/my-project -issue list # Lists issues from the repo's configured tracker - -cd /path/to/another-project -issue list # Lists issues from THIS repo's tracker - -# Same commands, different backends - transparent to the user -``` - -## Key Features - -### 🎯 **Repository Context Awareness** -The facade automatically detects: -- Git remotes (GitHub, GitLab, Gitea URLs) -- Configuration files (`.issue-config`, `pyproject.toml`, etc.) -- Environment variables -- Default fallbacks - -### 🖥️ **Unified CLI Experience** -```bash -# Core issue operations (same across all backends) -issue list # List issues -issue show 42 # Show issue details -issue create "Bug in parser" # Create new issue -issue edit 42 --add-label bug # Edit existing issue -issue close 42 --comment "Fixed" # Close with comment -issue comment 42 "Still broken" # Add comment - -# Advanced operations -issue list --assignee=me --state=open -issue search "memory leak" -issue stats # Show issue statistics -``` - -### 🔄 **Automatic Backend Detection** -```bash -# GitHub repository -cd my-github-project -issue list # → Automatically uses GitHub Issues API - -# Gitea repository -cd my-gitea-project -issue list # → Automatically uses Gitea Issues API - -# Offline/local work -cd any-project -issue backend set local -issue list # → Uses local SQLite storage -``` - -### 🌐 **Seamless Synchronization** -```bash -# Work offline, sync later -issue create "Bug found offline" --local -issue sync # Pushes to remote when online - -# Keep backups -issue sync pull # Download all remote issues locally -issue export backup.json # Export for archival -``` - -## Installation & Setup - -### 1. Install the Facade -```bash -pip install issue-facade -``` - -### 2. Automatic Configuration -The facade auto-detects your repository's issue tracker: - -```bash -cd your-repository -issue config detect # Auto-configure based on git remotes -issue list # Ready to use! -``` - -### 3. Manual Configuration (if needed) -```bash -# Configure specific backends -issue backend add github my-repo -issue backend add gitea company-repo -issue backend add local offline - -# Set repository-specific backend -issue config set-backend github # For current repository -``` - -## Repository Integration Examples - -### GitHub Repository -```bash -cd my-github-project -issue config detect -# → Automatically configures GitHub Issues via API -# → Uses .github/ISSUE_TEMPLATE/ for templates -# → Respects repository labels and milestones - -issue create "Security vulnerability" --template security -``` - -### Corporate Gitea -```bash -cd company-project -issue config detect -# → Detects Gitea instance from git remote -# → Prompts for access token (one-time setup) -# → Uses corporate labels and workflows - -issue list --milestone "Q4 Release" -``` - -### Offline Development -```bash -cd any-project -issue backend set local -# → Creates local SQLite database in .issue-facade/ -# → Full offline functionality -# → Sync to remote when connection available - -issue create "Performance issue" --offline -issue sync when-online -``` - -## Configuration - -### Per-Repository Configuration -Each repository can have its own configuration in `.issue-facade/config.json`: - -```json -{ - "backend": "github", - "github": { - "owner": "myorg", - "repo": "myproject", - "token_env": "GITHUB_TOKEN" - }, - "local": { - "db_path": ".issue-facade/issues.db", - "sync_enabled": true - }, - "templates": { - "bug": ".github/ISSUE_TEMPLATE/bug_report.md", - "feature": ".github/ISSUE_TEMPLATE/feature_request.md" - } -} -``` - -### Global Configuration -User-wide settings in `~/.config/issue-facade/config.json`: - -```json -{ - "default_backend": "local", - "github_token": "env:GITHUB_TOKEN", - "gitea_instances": { - "company": { - "url": "https://git.company.com", - "token": "env:GITEA_TOKEN" - } - }, - "offline_mode": false, - "sync_interval": "1h" -} -``` - -## Architecture - -### Facade Pattern Implementation -``` -Repository Working Directory -├── .git/ # Git repository -├── .issue-facade/ # Facade configuration -│ ├── config.json # Repository-specific config -│ ├── issues.db # Local SQLite cache/backup -│ └── templates/ # Issue templates -├── your-project-files... -└── issue # CLI command (context-aware) -``` - -### Backend Detection Logic -1. **Check repository configuration**: `.issue-facade/config.json` -2. **Analyze git remotes**: Detect GitHub/GitLab/Gitea URLs -3. **Look for platform files**: `.github/`, `.gitlab/`, etc. -4. **Check environment**: `GITHUB_TOKEN`, `GITLAB_TOKEN`, etc. -5. **Fall back to local**: SQLite storage if no remote detected - -### Multi-Repository Support -```bash -# Each repository maintains its own context -/projects/web-app/ → GitHub Issues -/projects/api-server/ → GitLab Issues -/projects/cli-tool/ → Gitea Issues -/projects/experiment/ → Local SQLite - -# Same commands work in all contexts -cd web-app && issue list # GitHub -cd api-server && issue list # GitLab -cd cli-tool && issue list # Gitea -cd experiment && issue list # Local -``` - -## Use Cases - -### 1. **Multi-Platform Developer** -You work with repositories across GitHub, GitLab, and company Gitea: -```bash -# Learn one CLI, use everywhere -issue list --assignee=me # Works on all platforms -issue create "Cross-platform bug" --label bug -``` - -### 2. **Offline Developer** -You need to track issues without constant internet: -```bash -issue create "Found while flying" --offline -issue list --local # View offline issues -issue sync # Upload when back online -``` - -### 3. **Repository Migration** -Moving from GitHub to GitLab: -```bash -issue export github-backup.json # Backup from GitHub -issue backend set gitlab # Switch to GitLab -issue import github-backup.json # Import to GitLab -``` - -### 4. **Cross-Repository Analytics** -Track issues across multiple repositories: -```bash -issue stats --all-repos # Statistics across all configured repos -issue search "security" --global # Search across all issue trackers -``` - -## Integration with Development Workflow - -### Git Hooks Integration -```bash -# .git/hooks/pre-commit -issue list --assignee=me --state=open > ISSUES.md -git add ISSUES.md -``` - -### CI/CD Integration -```bash -# In your CI pipeline -issue create "Build failed on commit $SHA" --label ci-failure -issue close-if-fixed $ISSUE_NUMBER -``` - -### IDE Integration -```bash -# VS Code, Vim, Emacs plugins can use the CLI -:IssueList # List issues in editor -:IssueCreate "Typo in function" # Create issue from editor -``` - -## Comparison with Native Tools - -| Feature | issue-facade | gh (GitHub CLI) | glab (GitLab CLI) | Platform Web UI | -|---------|--------------|-----------------|-------------------|-----------------| -| **Multi-platform** | ✅ All backends | ❌ GitHub only | ❌ GitLab only | ❌ Single platform | -| **Offline support** | ✅ Local SQLite | ❌ Online only | ❌ Online only | ❌ Online only | -| **Consistent CLI** | ✅ Same commands | ❌ GitHub-specific | ❌ GitLab-specific | ❌ Web interface | -| **Repository context** | ✅ Auto-detect | ✅ Git-aware | ✅ Git-aware | ❌ Manual navigation | -| **Cross-repo search** | ✅ Global search | ❌ Single repo | ❌ Single repo | ❌ Single repo | -| **Data portability** | ✅ Export/import | ❌ Platform-locked | ❌ Platform-locked | ❌ Platform-locked | - -## Future Roadmap - -### Version 1.0 (Current) -- [x] Core facade architecture -- [x] GitHub/GitLab/Gitea backend support -- [x] Local SQLite backend -- [x] Automatic repository detection -- [x] Basic synchronization - -### Version 1.1 -- [ ] Advanced sync (conflict resolution) -- [ ] Issue templates support -- [ ] Workflow automation hooks -- [ ] Plugin system for custom backends - -### Version 2.0 -- [ ] Web dashboard for multi-repo overview -- [ ] Advanced analytics and reporting -- [ ] Team collaboration features -- [ ] Integration with project management tools - -## Contributing - -The Issue Facade is designed to be: -- **Backend-agnostic**: Easy to add new issue tracking systems -- **Repository-aware**: Respects the conventions of each platform -- **Developer-friendly**: Consistent CLI across all environments - -To add a new backend: -1. Implement the `IssueBackend` interface -2. Add detection logic for the platform -3. Register the backend in the factory -4. Add CLI configuration support - -## Why "Facade"? - -The **Facade Pattern** perfectly describes this tool's purpose: - -> *"Provide a unified interface to a set of interfaces in a subsystem. Facade defines a higher-level interface that makes the subsystem easier to use."* - -Instead of learning different CLIs for GitHub (`gh`), GitLab (`glab`), JIRA, etc., the Issue Facade provides one consistent interface that works with all of them. It's the universal remote control for issue tracking systems. - -The facade doesn't replace the underlying issue trackers - it makes them easier to use consistently across different platforms and repositories. \ No newline at end of file diff --git a/issue-facade/__init__.py b/issue-facade/__init__.py deleted file mode 100644 index cab4613f..00000000 --- a/issue-facade/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Universal Issue Tracking System - -A backend-agnostic issue tracking system that supports multiple backends -through a plugin architecture. Designed to be extracted into a standalone -repository for use across multiple projects. - -Features: -- Unified issue model across all backends -- Plugin-based backend architecture -- Local SQLite backend for offline work -- Bidirectional synchronization -- CLI-first interface -- Support for GitHub-style and other issue tracking systems - -Supported Backends: -- Local SQLite (for offline/standalone use) -- Gitea (GitHub-compatible API) -- Future: GitHub, GitLab, JIRA, Redmine, etc. -""" - -__version__ = "0.1.0" -__author__ = "MarkiTect Project" -__description__ = "Universal Issue Tracking System with Plugin Architecture" \ No newline at end of file diff --git a/issue-facade/backends/__init__.py b/issue-facade/backends/__init__.py deleted file mode 100644 index 429769c5..00000000 --- a/issue-facade/backends/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -Issue Tracking Backend Plugins - -This package contains implementations for various issue tracking backends. -Each backend implements the IssueBackend interface to provide a consistent -API regardless of the underlying issue tracking system. - -Available Backends: -- local: SQLite-based local backend for offline use -- gitea: Gitea API backend for GitHub-compatible systems -""" \ No newline at end of file diff --git a/issue-facade/backends/gitea/__init__.py b/issue-facade/backends/gitea/__init__.py deleted file mode 100644 index c02af71a..00000000 --- a/issue-facade/backends/gitea/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -""" -Gitea Backend - -A backend implementation for Gitea issue tracking systems. -This backend provides integration with Gitea API for remote issue management. - -Features: -- Full Gitea API integration -- GitHub-compatible operations -- Remote synchronization -- Authentication support -- Rate limiting compliance -""" - -from .backend import GiteaBackend - -__all__ = ['GiteaBackend'] \ No newline at end of file diff --git a/issue-facade/backends/gitea/backend.py b/issue-facade/backends/gitea/backend.py deleted file mode 100644 index c7ac47fa..00000000 --- a/issue-facade/backends/gitea/backend.py +++ /dev/null @@ -1,591 +0,0 @@ -""" -Gitea Backend Implementation - -Provides integration with Gitea API for remote issue tracking. -This backend adapts the Gitea API to our unified issue model. -""" - -import requests -import time -from datetime import datetime, timezone -from typing import List, Optional, Dict, Any -from urllib.parse import urljoin - -from ...core.interfaces import RemoteBackend, BackendCapabilities, IssueFilter, SyncableBackend -from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType - - -class GiteaAPIError(Exception): - """Gitea API specific errors.""" - pass - - -class GiteaRateLimitError(GiteaAPIError): - """Rate limit exceeded.""" - pass - - -class GiteaBackend(RemoteBackend, SyncableBackend): - """Gitea API backend for remote issue tracking.""" - - def __init__(self): - self.base_url: Optional[str] = None - self.token: Optional[str] = None - self.owner: Optional[str] = None - self.repo: Optional[str] = None - self.session = requests.Session() - self._capabilities = BackendCapabilities( - supports_milestones=True, - supports_assignees=True, - supports_comments=True, - supports_labels=True, - supports_search=True, - supports_bulk_operations=False, - supports_webhooks=True, - supports_real_time=False, - max_labels_per_issue=None, - max_assignees_per_issue=10 # Gitea typical limit - ) - - @property - def backend_type(self) -> str: - return "gitea" - - @property - def capabilities(self) -> BackendCapabilities: - return self._capabilities - - def connect(self, config: Dict[str, Any]) -> None: - """Connect to Gitea API.""" - self.base_url = config['base_url'].rstrip('/') - self.token = config['token'] - self.owner = config['owner'] - self.repo = config['repo'] - - # Setup session with authentication - self.session.headers.update({ - 'Authorization': f'token {self.token}', - 'Content-Type': 'application/json', - 'Accept': 'application/json' - }) - - # Test connection - if not self.test_connection(): - raise GiteaAPIError("Failed to connect to Gitea API") - - def disconnect(self) -> None: - """Disconnect from Gitea API.""" - self.session.close() - self.base_url = None - self.token = None - self.owner = None - self.repo = None - - def test_connection(self) -> bool: - """Test Gitea API connection.""" - try: - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}') - return response.status_code == 200 - except Exception: - return False - - def _api_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response: - """Make API request with error handling and rate limiting.""" - url = urljoin(f"{self.base_url}/api/v1", endpoint) - - try: - response = self.session.request(method, url, json=data, params=params) - - # Handle rate limiting - if response.status_code == 429: - retry_after = int(response.headers.get('Retry-After', 60)) - raise GiteaRateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds") - - response.raise_for_status() - return response - - except requests.exceptions.RequestException as e: - raise GiteaAPIError(f"API request failed: {e}") - - def _gitea_issue_to_unified(self, gitea_issue: Dict[str, Any]) -> Issue: - """Convert Gitea issue JSON to unified Issue model.""" - # Convert labels - labels = [] - for label_data in gitea_issue.get('labels', []): - labels.append(Label( - name=label_data['name'], - color=label_data['color'], - description=label_data.get('description', ''), - backend_id=str(label_data['id']) - )) - - # Convert assignees - assignees = [] - if gitea_issue.get('assignees'): - for assignee_data in gitea_issue['assignees']: - assignees.append(User( - id=str(assignee_data['id']), - username=assignee_data['login'], - display_name=assignee_data.get('full_name', ''), - email=assignee_data.get('email', ''), - avatar_url=assignee_data.get('avatar_url', ''), - backend_id=str(assignee_data['id']) - )) - - # Convert milestone - milestone = None - if gitea_issue.get('milestone'): - milestone_data = gitea_issue['milestone'] - milestone = Milestone( - id=str(milestone_data['id']), - title=milestone_data['title'], - description=milestone_data.get('description', ''), - state=milestone_data['state'], - due_date=datetime.fromisoformat(milestone_data['due_on'].replace('Z', '+00:00')) if milestone_data.get('due_on') else None, - created_at=datetime.fromisoformat(milestone_data['created_at'].replace('Z', '+00:00')) if milestone_data.get('created_at') else None, - updated_at=datetime.fromisoformat(milestone_data['updated_at'].replace('Z', '+00:00')) if milestone_data.get('updated_at') else None, - backend_id=str(milestone_data['id']) - ) - - # Determine state - if gitea_issue['state'] == 'closed': - state = IssueState.CLOSED - else: - # Check for status labels to determine more specific state - for label in labels: - if label.name == 'status:in_progress' or label.name == 'status:in-progress': - state = IssueState.IN_PROGRESS - break - elif label.name == 'status:blocked': - state = IssueState.BLOCKED - break - else: - state = IssueState.OPEN - - return Issue( - id=str(gitea_issue['id']), - number=gitea_issue['number'], - title=gitea_issue['title'], - description=gitea_issue.get('body', ''), - state=state, - created_at=datetime.fromisoformat(gitea_issue['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(gitea_issue['updated_at'].replace('Z', '+00:00')), - closed_at=datetime.fromisoformat(gitea_issue['closed_at'].replace('Z', '+00:00')) if gitea_issue.get('closed_at') else None, - labels=labels, - assignees=assignees, - milestone=milestone, - backend_id=str(gitea_issue['id']), - backend_type='gitea' - ) - - def _unified_issue_to_gitea(self, issue: Issue) -> Dict[str, Any]: - """Convert unified Issue to Gitea API format.""" - data = { - 'title': issue.title, - 'body': issue.description, - 'state': 'closed' if issue.state == IssueState.CLOSED else 'open' - } - - if issue.assignees: - data['assignees'] = [assignee.username for assignee in issue.assignees] - - if issue.milestone: - data['milestone'] = int(issue.milestone.backend_id) if issue.milestone.backend_id else None - - # Convert labels - if issue.labels: - data['labels'] = [label.name for label in issue.labels] - - return data - - # Issue CRUD Operations - def create_issue(self, issue: Issue) -> Issue: - """Create issue in Gitea.""" - data = self._unified_issue_to_gitea(issue) - - response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues', data=data) - gitea_issue = response.json() - - return self._gitea_issue_to_unified(gitea_issue) - - def get_issue(self, issue_id: str) -> Optional[Issue]: - """Get issue from Gitea by ID.""" - try: - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}') - gitea_issue = response.json() - return self._gitea_issue_to_unified(gitea_issue) - except GiteaAPIError: - return None - - def get_issue_by_number(self, number: int) -> Optional[Issue]: - """Get issue by number.""" - return self.get_issue(str(number)) - - def update_issue(self, issue: Issue) -> Issue: - """Update issue in Gitea.""" - data = self._unified_issue_to_gitea(issue) - - response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/{issue.backend_id}', data=data) - gitea_issue = response.json() - - return self._gitea_issue_to_unified(gitea_issue) - - def delete_issue(self, issue_id: str) -> bool: - """Delete issue - not supported by Gitea API.""" - # Gitea doesn't support deleting issues via API - # We could close it instead - try: - issue = self.get_issue(issue_id) - if issue: - issue.close() - self.update_issue(issue) - return True - except Exception: - pass - return False - - def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: - """List issues from Gitea.""" - params = { - 'state': 'all', # Get both open and closed - 'sort': 'updated', - 'order': 'desc' - } - - if filter_criteria: - if filter_criteria.state: - if filter_criteria.state == 'open': - params['state'] = 'open' - elif filter_criteria.state == 'closed': - params['state'] = 'closed' - - if filter_criteria.assignee: - params['assignee'] = filter_criteria.assignee - - if filter_criteria.milestone: - params['milestone'] = filter_criteria.milestone - - if filter_criteria.labels: - params['labels'] = ','.join(filter_criteria.labels) - - if filter_criteria.created_after: - params['since'] = filter_criteria.created_after.isoformat() - - if filter_criteria.limit: - params['limit'] = filter_criteria.limit - - if filter_criteria.offset: - params['page'] = (filter_criteria.offset // (filter_criteria.limit or 30)) + 1 - - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues', params=params) - gitea_issues = response.json() - - issues = [self._gitea_issue_to_unified(gitea_issue) for gitea_issue in gitea_issues] - - # Apply additional filtering that Gitea API doesn't support - if filter_criteria: - if filter_criteria.search: - search_term = filter_criteria.search.lower() - issues = [ - issue for issue in issues - if search_term in issue.title.lower() or search_term in issue.description.lower() - ] - - return issues - - def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: - """Search issues - limited Gitea API support.""" - # Gitea has limited search API, fallback to list with search filter - filter_criteria = IssueFilter(search=query, limit=limit) - return self.list_issues(filter_criteria) - - # Label Operations - def create_label(self, label: Label) -> Label: - """Create label in Gitea.""" - data = { - 'name': label.name, - 'color': label.color or '#000000', - 'description': label.description or '' - } - - response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/labels', data=data) - gitea_label = response.json() - - return Label( - name=gitea_label['name'], - color=gitea_label['color'], - description=gitea_label.get('description', ''), - backend_id=str(gitea_label['id']) - ) - - def get_labels(self) -> List[Label]: - """Get all labels from Gitea.""" - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/labels') - gitea_labels = response.json() - - return [Label( - name=label['name'], - color=label['color'], - description=label.get('description', ''), - backend_id=str(label['id']) - ) for label in gitea_labels] - - def update_label(self, label: Label) -> Label: - """Update label in Gitea.""" - data = { - 'name': label.name, - 'color': label.color or '#000000', - 'description': label.description or '' - } - - response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/labels/{label.name}', data=data) - gitea_label = response.json() - - return Label( - name=gitea_label['name'], - color=gitea_label['color'], - description=gitea_label.get('description', ''), - backend_id=str(gitea_label['id']) - ) - - def delete_label(self, label_name: str) -> bool: - """Delete label from Gitea.""" - try: - self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/labels/{label_name}') - return True - except GiteaAPIError: - return False - - # User Operations - def get_users(self) -> List[User]: - """Get repository collaborators.""" - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/collaborators') - gitea_users = response.json() - - return [User( - id=str(user['id']), - username=user['login'], - display_name=user.get('full_name', ''), - email=user.get('email', ''), - avatar_url=user.get('avatar_url', ''), - backend_id=str(user['id']) - ) for user in gitea_users] - - def get_user(self, user_id: str) -> Optional[User]: - """Get specific user.""" - try: - response = self._api_request('GET', f'/users/{user_id}') - user = response.json() - return User( - id=str(user['id']), - username=user['login'], - display_name=user.get('full_name', ''), - email=user.get('email', ''), - avatar_url=user.get('avatar_url', ''), - backend_id=str(user['id']) - ) - except GiteaAPIError: - return None - - def search_users(self, query: str) -> List[User]: - """Search users in Gitea.""" - params = {'q': query, 'limit': 50} - response = self._api_request('GET', '/users/search', params=params) - search_result = response.json() - - return [User( - id=str(user['id']), - username=user['login'], - display_name=user.get('full_name', ''), - email=user.get('email', ''), - avatar_url=user.get('avatar_url', ''), - backend_id=str(user['id']) - ) for user in search_result.get('data', [])] - - # Milestone Operations - def create_milestone(self, milestone: Milestone) -> Milestone: - """Create milestone in Gitea.""" - data = { - 'title': milestone.title, - 'description': milestone.description or '', - 'due_on': milestone.due_date.isoformat() if milestone.due_date else None - } - - response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/milestones', data=data) - gitea_milestone = response.json() - - return Milestone( - id=str(gitea_milestone['id']), - title=gitea_milestone['title'], - description=gitea_milestone.get('description', ''), - state=gitea_milestone['state'], - due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, - created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), - backend_id=str(gitea_milestone['id']) - ) - - def get_milestones(self) -> List[Milestone]: - """Get all milestones.""" - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/milestones') - gitea_milestones = response.json() - - return [Milestone( - id=str(m['id']), - title=m['title'], - description=m.get('description', ''), - state=m['state'], - due_date=datetime.fromisoformat(m['due_on'].replace('Z', '+00:00')) if m.get('due_on') else None, - created_at=datetime.fromisoformat(m['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(m['updated_at'].replace('Z', '+00:00')), - backend_id=str(m['id']) - ) for m in gitea_milestones] - - def update_milestone(self, milestone: Milestone) -> Milestone: - """Update milestone.""" - data = { - 'title': milestone.title, - 'description': milestone.description or '', - 'state': milestone.state, - 'due_on': milestone.due_date.isoformat() if milestone.due_date else None - } - - response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/milestones/{milestone.backend_id}', data=data) - gitea_milestone = response.json() - - return Milestone( - id=str(gitea_milestone['id']), - title=gitea_milestone['title'], - description=gitea_milestone.get('description', ''), - state=gitea_milestone['state'], - due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, - created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), - backend_id=str(gitea_milestone['id']) - ) - - def delete_milestone(self, milestone_id: str) -> bool: - """Delete milestone.""" - try: - self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/milestones/{milestone_id}') - return True - except GiteaAPIError: - return False - - # Comment Operations - def add_comment(self, issue_id: str, comment: Comment) -> Comment: - """Add comment to issue.""" - data = {'body': comment.body} - - response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments', data=data) - gitea_comment = response.json() - - # Convert author - author_data = gitea_comment['user'] - author = User( - id=str(author_data['id']), - username=author_data['login'], - display_name=author_data.get('full_name', ''), - email=author_data.get('email', ''), - avatar_url=author_data.get('avatar_url', ''), - backend_id=str(author_data['id']) - ) - - return Comment( - id=str(gitea_comment['id']), - body=gitea_comment['body'], - author=author, - created_at=datetime.fromisoformat(gitea_comment['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')), - backend_id=str(gitea_comment['id']) - ) - - def get_comments(self, issue_id: str) -> List[Comment]: - """Get comments for issue.""" - response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments') - gitea_comments = response.json() - - comments = [] - for gc in gitea_comments: - author_data = gc['user'] - author = User( - id=str(author_data['id']), - username=author_data['login'], - display_name=author_data.get('full_name', ''), - email=author_data.get('email', ''), - avatar_url=author_data.get('avatar_url', ''), - backend_id=str(author_data['id']) - ) - - comment = Comment( - id=str(gc['id']), - body=gc['body'], - author=author, - created_at=datetime.fromisoformat(gc['created_at'].replace('Z', '+00:00')), - updated_at=datetime.fromisoformat(gc['updated_at'].replace('Z', '+00:00')), - backend_id=str(gc['id']) - ) - comments.append(comment) - - return comments - - def update_comment(self, comment: Comment) -> Comment: - """Update comment.""" - data = {'body': comment.body} - - response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment.backend_id}', data=data) - gitea_comment = response.json() - - # Update comment object - comment.updated_at = datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')) - return comment - - def delete_comment(self, comment_id: str) -> bool: - """Delete comment.""" - try: - self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment_id}') - return True - except GiteaAPIError: - return False - - # Sync Support - def get_last_sync_timestamp(self) -> Optional[datetime]: - """Get last sync timestamp - stored in metadata.""" - # Could be stored in repository description or other metadata - return None - - def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: - """Get issues modified since timestamp.""" - filter_criteria = IssueFilter(updated_after=timestamp) - return self.list_issues(filter_criteria) - - # SyncableBackend Implementation - def prepare_for_sync(self) -> None: - """Prepare for sync operation.""" - # Could implement rate limiting preparation - pass - - def finalize_sync(self, success: bool) -> None: - """Finalize sync operation.""" - # Could log sync status - pass - - def get_sync_conflicts(self) -> List[Dict[str, Any]]: - """Get sync conflicts.""" - # Would compare timestamps and detect conflicts - return [] - - def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: - """Resolve sync conflict.""" - issue = self.get_issue(issue_id) - if not issue: - raise GiteaAPIError(f"Issue {issue_id} not found") - - if resolution == 'remote': - # Keep remote version (current issue) - return issue - elif resolution == 'local': - # This would require the local version to be provided - raise NotImplementedError("Local resolution requires local issue data") - else: - raise ValueError(f"Unknown resolution: {resolution}") \ No newline at end of file diff --git a/issue-facade/backends/local/__init__.py b/issue-facade/backends/local/__init__.py deleted file mode 100644 index a972084c..00000000 --- a/issue-facade/backends/local/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -""" -Local SQLite Backend - -A local, file-based issue tracking backend using SQLite for storage. -This backend provides complete offline functionality and serves as the -reference implementation for the backend interface. - -Features: -- Full CRUD operations -- SQLite database storage -- No external dependencies -- Offline operation -- Fast local search -- Backup and export capabilities -""" - -from .backend import LocalSQLiteBackend - -__all__ = ['LocalSQLiteBackend'] \ No newline at end of file diff --git a/issue-facade/backends/local/backend.py b/issue-facade/backends/local/backend.py deleted file mode 100644 index b6c7d253..00000000 --- a/issue-facade/backends/local/backend.py +++ /dev/null @@ -1,618 +0,0 @@ -""" -Local SQLite Backend Implementation - -Provides a complete local issue tracking backend using SQLite for storage. -This implementation serves as the reference for the backend interface and -provides full offline functionality. -""" - -import sqlite3 -import json -import uuid -from datetime import datetime, timezone -from pathlib import Path -from typing import List, Optional, Dict, Any - -from ...core.interfaces import LocalBackend, BackendCapabilities, IssueFilter, SyncableBackend -from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType - - -class LocalSQLiteBackend(LocalBackend, SyncableBackend): - """SQLite-based local backend for issue tracking.""" - - def __init__(self, db_path: Optional[str] = None): - self.db_path = db_path or "issues.db" - self.connection: Optional[sqlite3.Connection] = None - self._capabilities = BackendCapabilities( - supports_milestones=True, - supports_assignees=True, - supports_comments=True, - supports_labels=True, - supports_search=True, - supports_bulk_operations=True, - supports_webhooks=False, - supports_real_time=False, - max_labels_per_issue=None, - max_assignees_per_issue=None - ) - - @property - def backend_type(self) -> str: - return "local" - - @property - def capabilities(self) -> BackendCapabilities: - return self._capabilities - - def connect(self, config: Dict[str, Any]) -> None: - """Connect to SQLite database.""" - db_path = config.get('db_path', self.db_path) - self.db_path = db_path - - # Ensure directory exists - Path(db_path).parent.mkdir(parents=True, exist_ok=True) - - self.connection = sqlite3.connect(db_path) - self.connection.row_factory = sqlite3.Row # Enable dict-like access - self.connection.execute("PRAGMA foreign_keys = ON") - - # Initialize schema - self._initialize_schema() - - def disconnect(self) -> None: - """Disconnect from database.""" - if self.connection: - self.connection.close() - self.connection = None - - def test_connection(self) -> bool: - """Test database connection.""" - if not self.connection: - return False - try: - self.connection.execute("SELECT 1") - return True - except sqlite3.Error: - return False - - def _initialize_schema(self) -> None: - """Initialize database schema.""" - schema_path = Path(__file__).parent / "schema.sql" - with open(schema_path, 'r') as f: - schema_sql = f.read() - - # Execute schema in parts (SQLite doesn't like multiple statements) - for statement in schema_sql.split(';'): - statement = statement.strip() - if statement: - self.connection.execute(statement) - self.connection.commit() - - def _get_next_issue_number(self) -> int: - """Get the next available issue number.""" - cursor = self.connection.execute("SELECT MAX(number) FROM issues") - result = cursor.fetchone() - return (result[0] or 0) + 1 - - def _issue_from_row(self, row: sqlite3.Row) -> Issue: - """Convert database row to Issue object.""" - # Get labels - cursor = self.connection.execute(""" - SELECT l.id, l.name, l.color, l.description, l.backend_id - FROM labels l - JOIN issue_labels il ON l.id = il.label_id - WHERE il.issue_id = ? - """, (row['id'],)) - label_rows = cursor.fetchall() - labels = [Label( - name=lr['name'], - color=lr['color'], - description=lr['description'], - backend_id=lr['backend_id'] - ) for lr in label_rows] - - # Get assignees - cursor = self.connection.execute(""" - SELECT u.id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id - FROM users u - JOIN issue_assignees ia ON u.id = ia.user_id - WHERE ia.issue_id = ? - """, (row['id'],)) - user_rows = cursor.fetchall() - assignees = [User( - id=ur['id'], - username=ur['username'], - display_name=ur['display_name'], - email=ur['email'], - avatar_url=ur['avatar_url'], - backend_id=ur['backend_id'] - ) for ur in user_rows] - - # Get milestone - milestone = None - if row['milestone_id']: - cursor = self.connection.execute(""" - SELECT id, title, description, state, due_date, created_at, updated_at, backend_id - FROM milestones WHERE id = ? - """, (row['milestone_id'],)) - m_row = cursor.fetchone() - if m_row: - milestone = Milestone( - id=m_row['id'], - title=m_row['title'], - description=m_row['description'], - state=m_row['state'], - due_date=datetime.fromisoformat(m_row['due_date']) if m_row['due_date'] else None, - created_at=datetime.fromisoformat(m_row['created_at']) if m_row['created_at'] else None, - updated_at=datetime.fromisoformat(m_row['updated_at']) if m_row['updated_at'] else None, - backend_id=m_row['backend_id'] - ) - - # Parse sync metadata - sync_metadata = {} - if row['sync_metadata']: - try: - sync_metadata = json.loads(row['sync_metadata']) - except json.JSONDecodeError: - pass - - return Issue( - id=row['id'], - number=row['number'], - title=row['title'], - description=row['description'], - state=IssueState.from_string(row['state']), - created_at=datetime.fromisoformat(row['created_at']), - updated_at=datetime.fromisoformat(row['updated_at']), - closed_at=datetime.fromisoformat(row['closed_at']) if row['closed_at'] else None, - labels=labels, - assignees=assignees, - milestone=milestone, - backend_id=row['backend_id'], - backend_type=row['backend_type'], - sync_metadata=sync_metadata - ) - - # Issue CRUD Operations - def create_issue(self, issue: Issue) -> Issue: - """Create a new issue.""" - if not issue.id: - issue.id = str(uuid.uuid4()) - - if not issue.number: - issue.number = self._get_next_issue_number() - - # Insert issue - self.connection.execute(""" - INSERT INTO issues (id, number, title, description, state, created_at, updated_at, - closed_at, milestone_id, backend_id, backend_type, sync_metadata) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, ( - issue.id, - issue.number, - issue.title, - issue.description, - issue.state.value, - issue.created_at.isoformat(), - issue.updated_at.isoformat(), - issue.closed_at.isoformat() if issue.closed_at else None, - issue.milestone.id if issue.milestone else None, - issue.backend_id, - issue.backend_type or 'local', - json.dumps(issue.sync_metadata) if issue.sync_metadata else None - )) - - # Add labels - for label in issue.labels: - self._ensure_label_exists(label) - self.connection.execute(""" - INSERT OR IGNORE INTO issue_labels (issue_id, label_id) - VALUES (?, ?) - """, (issue.id, label.name)) # Using name as ID for simplicity - - # Add assignees - for user in issue.assignees: - self._ensure_user_exists(user) - self.connection.execute(""" - INSERT OR IGNORE INTO issue_assignees (issue_id, user_id) - VALUES (?, ?) - """, (issue.id, user.id)) - - self.connection.commit() - return issue - - def get_issue(self, issue_id: str) -> Optional[Issue]: - """Get issue by ID.""" - cursor = self.connection.execute(""" - SELECT * FROM issues WHERE id = ? OR backend_id = ? - """, (issue_id, issue_id)) - row = cursor.fetchone() - return self._issue_from_row(row) if row else None - - def get_issue_by_number(self, number: int) -> Optional[Issue]: - """Get issue by number.""" - cursor = self.connection.execute(""" - SELECT * FROM issues WHERE number = ? - """, (number,)) - row = cursor.fetchone() - return self._issue_from_row(row) if row else None - - def update_issue(self, issue: Issue) -> Issue: - """Update existing issue.""" - # Update main issue record - self.connection.execute(""" - UPDATE issues SET - title = ?, description = ?, state = ?, updated_at = ?, - closed_at = ?, milestone_id = ?, sync_metadata = ? - WHERE id = ? - """, ( - issue.title, - issue.description, - issue.state.value, - issue.updated_at.isoformat(), - issue.closed_at.isoformat() if issue.closed_at else None, - issue.milestone.id if issue.milestone else None, - json.dumps(issue.sync_metadata) if issue.sync_metadata else None, - issue.id - )) - - # Update labels (remove all and re-add) - self.connection.execute("DELETE FROM issue_labels WHERE issue_id = ?", (issue.id,)) - for label in issue.labels: - self._ensure_label_exists(label) - self.connection.execute(""" - INSERT INTO issue_labels (issue_id, label_id) VALUES (?, ?) - """, (issue.id, label.name)) - - # Update assignees (remove all and re-add) - self.connection.execute("DELETE FROM issue_assignees WHERE issue_id = ?", (issue.id,)) - for user in issue.assignees: - self._ensure_user_exists(user) - self.connection.execute(""" - INSERT INTO issue_assignees (issue_id, user_id) VALUES (?, ?) - """, (issue.id, user.id)) - - self.connection.commit() - return issue - - def delete_issue(self, issue_id: str) -> bool: - """Delete issue.""" - cursor = self.connection.execute("DELETE FROM issues WHERE id = ?", (issue_id,)) - self.connection.commit() - return cursor.rowcount > 0 - - def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: - """List issues with optional filtering.""" - query = "SELECT * FROM issues WHERE 1=1" - params = [] - - if filter_criteria: - if filter_criteria.state: - query += " AND state = ?" - params.append(filter_criteria.state) - - if filter_criteria.search: - query += " AND (title LIKE ? OR description LIKE ?)" - search_term = f"%{filter_criteria.search}%" - params.extend([search_term, search_term]) - - if filter_criteria.created_after: - query += " AND created_at >= ?" - params.append(filter_criteria.created_after.isoformat()) - - if filter_criteria.created_before: - query += " AND created_at <= ?" - params.append(filter_criteria.created_before.isoformat()) - - if filter_criteria.updated_after: - query += " AND updated_at >= ?" - params.append(filter_criteria.updated_after.isoformat()) - - if filter_criteria.updated_before: - query += " AND updated_at <= ?" - params.append(filter_criteria.updated_before.isoformat()) - - query += " ORDER BY updated_at DESC" - - if filter_criteria and filter_criteria.limit: - query += " LIMIT ?" - params.append(filter_criteria.limit) - if filter_criteria.offset: - query += " OFFSET ?" - params.append(filter_criteria.offset) - - cursor = self.connection.execute(query, params) - rows = cursor.fetchall() - return [self._issue_from_row(row) for row in rows] - - def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: - """Search issues using FTS if available, otherwise fallback to LIKE.""" - try: - # Try FTS search first - fts_query = """ - SELECT i.* FROM issues i - JOIN issue_search s ON i.id = s.issue_id - WHERE issue_search MATCH ? - ORDER BY rank - """ - params = [query] - if limit: - fts_query += " LIMIT ?" - params.append(limit) - - cursor = self.connection.execute(fts_query, params) - rows = cursor.fetchall() - return [self._issue_from_row(row) for row in rows] - - except sqlite3.OperationalError: - # Fallback to LIKE search - filter_criteria = IssueFilter(search=query, limit=limit) - return self.list_issues(filter_criteria) - - # Helper methods - def _ensure_label_exists(self, label: Label) -> None: - """Ensure label exists in database.""" - self.connection.execute(""" - INSERT OR IGNORE INTO labels (id, name, color, description, backend_id) - VALUES (?, ?, ?, ?, ?) - """, (label.name, label.name, label.color, label.description, label.backend_id)) - - def _ensure_user_exists(self, user: User) -> None: - """Ensure user exists in database.""" - self.connection.execute(""" - INSERT OR IGNORE INTO users (id, username, display_name, email, avatar_url, backend_id) - VALUES (?, ?, ?, ?, ?, ?) - """, (user.id, user.username, user.display_name, user.email, user.avatar_url, user.backend_id)) - - # Label Operations - def create_label(self, label: Label) -> Label: - """Create a new label.""" - label_id = label.name # Use name as ID - self.connection.execute(""" - INSERT INTO labels (id, name, color, description, backend_id) - VALUES (?, ?, ?, ?, ?) - """, (label_id, label.name, label.color, label.description, label.backend_id)) - self.connection.commit() - return label - - def get_labels(self) -> List[Label]: - """Get all labels.""" - cursor = self.connection.execute("SELECT * FROM labels ORDER BY name") - rows = cursor.fetchall() - return [Label( - name=row['name'], - color=row['color'], - description=row['description'], - backend_id=row['backend_id'] - ) for row in rows] - - def update_label(self, label: Label) -> Label: - """Update label.""" - self.connection.execute(""" - UPDATE labels SET color = ?, description = ? WHERE name = ? - """, (label.color, label.description, label.name)) - self.connection.commit() - return label - - def delete_label(self, label_name: str) -> bool: - """Delete label.""" - cursor = self.connection.execute("DELETE FROM labels WHERE name = ?", (label_name,)) - self.connection.commit() - return cursor.rowcount > 0 - - # User Operations - def get_users(self) -> List[User]: - """Get all users.""" - cursor = self.connection.execute("SELECT * FROM users ORDER BY username") - rows = cursor.fetchall() - return [User( - id=row['id'], - username=row['username'], - display_name=row['display_name'], - email=row['email'], - avatar_url=row['avatar_url'], - backend_id=row['backend_id'] - ) for row in rows] - - def get_user(self, user_id: str) -> Optional[User]: - """Get user by ID.""" - cursor = self.connection.execute("SELECT * FROM users WHERE id = ?", (user_id,)) - row = cursor.fetchone() - if row: - return User( - id=row['id'], - username=row['username'], - display_name=row['display_name'], - email=row['email'], - avatar_url=row['avatar_url'], - backend_id=row['backend_id'] - ) - return None - - def search_users(self, query: str) -> List[User]: - """Search users.""" - cursor = self.connection.execute(""" - SELECT * FROM users - WHERE username LIKE ? OR display_name LIKE ? OR email LIKE ? - ORDER BY username - """, (f"%{query}%", f"%{query}%", f"%{query}%")) - rows = cursor.fetchall() - return [User( - id=row['id'], - username=row['username'], - display_name=row['display_name'], - email=row['email'], - avatar_url=row['avatar_url'], - backend_id=row['backend_id'] - ) for row in rows] - - # Milestone Operations - def create_milestone(self, milestone: Milestone) -> Milestone: - """Create milestone.""" - if not milestone.id: - milestone.id = str(uuid.uuid4()) - - self.connection.execute(""" - INSERT INTO milestones (id, title, description, state, due_date, created_at, updated_at, backend_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) - """, ( - milestone.id, - milestone.title, - milestone.description, - milestone.state, - milestone.due_date.isoformat() if milestone.due_date else None, - milestone.created_at.isoformat() if milestone.created_at else datetime.now(timezone.utc).isoformat(), - milestone.updated_at.isoformat() if milestone.updated_at else datetime.now(timezone.utc).isoformat(), - milestone.backend_id - )) - self.connection.commit() - return milestone - - def get_milestones(self) -> List[Milestone]: - """Get all milestones.""" - cursor = self.connection.execute("SELECT * FROM milestones ORDER BY title") - rows = cursor.fetchall() - return [Milestone( - id=row['id'], - title=row['title'], - description=row['description'], - state=row['state'], - due_date=datetime.fromisoformat(row['due_date']) if row['due_date'] else None, - created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None, - updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None, - backend_id=row['backend_id'] - ) for row in rows] - - def update_milestone(self, milestone: Milestone) -> Milestone: - """Update milestone.""" - self.connection.execute(""" - UPDATE milestones SET title = ?, description = ?, state = ?, due_date = ?, updated_at = ? - WHERE id = ? - """, ( - milestone.title, - milestone.description, - milestone.state, - milestone.due_date.isoformat() if milestone.due_date else None, - datetime.now(timezone.utc).isoformat(), - milestone.id - )) - self.connection.commit() - return milestone - - def delete_milestone(self, milestone_id: str) -> bool: - """Delete milestone.""" - cursor = self.connection.execute("DELETE FROM milestones WHERE id = ?", (milestone_id,)) - self.connection.commit() - return cursor.rowcount > 0 - - # Comment Operations - def add_comment(self, issue_id: str, comment: Comment) -> Comment: - """Add comment to issue.""" - if not comment.id: - comment.id = str(uuid.uuid4()) - - self._ensure_user_exists(comment.author) - - self.connection.execute(""" - INSERT INTO comments (id, issue_id, author_id, body, created_at, updated_at, backend_id) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, ( - comment.id, - issue_id, - comment.author.id, - comment.body, - comment.created_at.isoformat(), - comment.updated_at.isoformat() if comment.updated_at else None, - comment.backend_id - )) - self.connection.commit() - return comment - - def get_comments(self, issue_id: str) -> List[Comment]: - """Get comments for issue.""" - cursor = self.connection.execute(""" - SELECT c.*, u.id as user_id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id as user_backend_id - FROM comments c - JOIN users u ON c.author_id = u.id - WHERE c.issue_id = ? - ORDER BY c.created_at - """, (issue_id,)) - rows = cursor.fetchall() - - comments = [] - for row in rows: - author = User( - id=row['user_id'], - username=row['username'], - display_name=row['display_name'], - email=row['email'], - avatar_url=row['avatar_url'], - backend_id=row['user_backend_id'] - ) - comment = Comment( - id=row['id'], - body=row['body'], - author=author, - created_at=datetime.fromisoformat(row['created_at']), - updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None, - backend_id=row['backend_id'] - ) - comments.append(comment) - - return comments - - def update_comment(self, comment: Comment) -> Comment: - """Update comment.""" - self.connection.execute(""" - UPDATE comments SET body = ?, updated_at = ? WHERE id = ? - """, (comment.body, datetime.now(timezone.utc).isoformat(), comment.id)) - self.connection.commit() - return comment - - def delete_comment(self, comment_id: str) -> bool: - """Delete comment.""" - cursor = self.connection.execute("DELETE FROM comments WHERE id = ?", (comment_id,)) - self.connection.commit() - return cursor.rowcount > 0 - - # Sync Support - def get_last_sync_timestamp(self) -> Optional[datetime]: - """Get last sync timestamp.""" - cursor = self.connection.execute(""" - SELECT sync_timestamp FROM sync_history - WHERE success = 1 - ORDER BY sync_timestamp DESC - LIMIT 1 - """) - row = cursor.fetchone() - return datetime.fromisoformat(row[0]) if row else None - - def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: - """Get issues modified since timestamp.""" - filter_criteria = IssueFilter(updated_after=timestamp) - return self.list_issues(filter_criteria) - - # SyncableBackend Implementation - def prepare_for_sync(self) -> None: - """Prepare for sync operation.""" - # Could create backup or start transaction - pass - - def finalize_sync(self, success: bool) -> None: - """Finalize sync operation.""" - # Log sync operation - self.connection.execute(""" - INSERT INTO sync_history (backend_type, success, sync_timestamp) - VALUES (?, ?, ?) - """, ('sync', success, datetime.now(timezone.utc).isoformat())) - self.connection.commit() - - def get_sync_conflicts(self) -> List[Dict[str, Any]]: - """Get sync conflicts.""" - # For local backend, no conflicts since it's the source of truth - return [] - - def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: - """Resolve sync conflict.""" - # Local backend doesn't have conflicts - return self.get_issue(issue_id) \ No newline at end of file diff --git a/issue-facade/backends/local/schema.sql b/issue-facade/backends/local/schema.sql deleted file mode 100644 index 1dda8088..00000000 --- a/issue-facade/backends/local/schema.sql +++ /dev/null @@ -1,189 +0,0 @@ --- Local Issue Tracking Database Schema --- SQLite schema for local issue storage with full referential integrity - --- Enable foreign key constraints -PRAGMA foreign_keys = ON; - --- Issues table - core issue data -CREATE TABLE IF NOT EXISTS issues ( - id TEXT PRIMARY KEY, - number INTEGER UNIQUE NOT NULL, - title TEXT NOT NULL, - description TEXT NOT NULL DEFAULT '', - state TEXT NOT NULL CHECK (state IN ('open', 'closed', 'in_progress', 'blocked')), - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - closed_at TIMESTAMP NULL, - milestone_id TEXT, - backend_id TEXT, - backend_type TEXT DEFAULT 'local', - sync_metadata TEXT, -- JSON for sync data - FOREIGN KEY (milestone_id) REFERENCES milestones(id) ON DELETE SET NULL -); - --- Create index for issue number lookups -CREATE INDEX IF NOT EXISTS idx_issues_number ON issues(number); -CREATE INDEX IF NOT EXISTS idx_issues_state ON issues(state); -CREATE INDEX IF NOT EXISTS idx_issues_updated_at ON issues(updated_at); -CREATE INDEX IF NOT EXISTS idx_issues_backend_id ON issues(backend_id); - --- Labels table -CREATE TABLE IF NOT EXISTS labels ( - id TEXT PRIMARY KEY, - name TEXT UNIQUE NOT NULL, - color TEXT, - description TEXT, - backend_id TEXT, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -); - --- Create index for label name lookups -CREATE INDEX IF NOT EXISTS idx_labels_name ON labels(name); - --- Issue-Label many-to-many relationship -CREATE TABLE IF NOT EXISTS issue_labels ( - issue_id TEXT NOT NULL, - label_id TEXT NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (issue_id, label_id), - FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, - FOREIGN KEY (label_id) REFERENCES labels(id) ON DELETE CASCADE -); - --- Users table -CREATE TABLE IF NOT EXISTS users ( - id TEXT PRIMARY KEY, - username TEXT UNIQUE NOT NULL, - display_name TEXT, - email TEXT, - avatar_url TEXT, - backend_id TEXT, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -); - --- Create index for username lookups -CREATE INDEX IF NOT EXISTS idx_users_username ON users(username); - --- Issue-User assignment many-to-many relationship -CREATE TABLE IF NOT EXISTS issue_assignees ( - issue_id TEXT NOT NULL, - user_id TEXT NOT NULL, - assigned_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (issue_id, user_id), - FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, - FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE -); - --- Milestones table -CREATE TABLE IF NOT EXISTS milestones ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - description TEXT, - state TEXT NOT NULL DEFAULT 'open' CHECK (state IN ('open', 'closed')), - due_date TIMESTAMP, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - backend_id TEXT -); - --- Create index for milestone title lookups -CREATE INDEX IF NOT EXISTS idx_milestones_title ON milestones(title); - --- Comments table -CREATE TABLE IF NOT EXISTS comments ( - id TEXT PRIMARY KEY, - issue_id TEXT NOT NULL, - author_id TEXT NOT NULL, - body TEXT NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP, - backend_id TEXT, - FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE, - FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE CASCADE -); - --- Create index for comment lookups -CREATE INDEX IF NOT EXISTS idx_comments_issue_id ON comments(issue_id); -CREATE INDEX IF NOT EXISTS idx_comments_created_at ON comments(created_at); - --- Sync tracking table -CREATE TABLE IF NOT EXISTS sync_history ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - backend_type TEXT NOT NULL, - sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - success BOOLEAN NOT NULL, - issues_synced INTEGER DEFAULT 0, - errors_count INTEGER DEFAULT 0, - details TEXT -- JSON for sync details -); - --- Configuration table for backend settings -CREATE TABLE IF NOT EXISTS config ( - key TEXT PRIMARY KEY, - value TEXT NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -); - --- Triggers to automatically update updated_at timestamps -CREATE TRIGGER IF NOT EXISTS update_issues_timestamp - AFTER UPDATE ON issues -BEGIN - UPDATE issues SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; -END; - -CREATE TRIGGER IF NOT EXISTS update_milestones_timestamp - AFTER UPDATE ON milestones -BEGIN - UPDATE milestones SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id; -END; - --- Views for common queries -CREATE VIEW IF NOT EXISTS issue_summary AS -SELECT - i.id, - i.number, - i.title, - i.state, - i.created_at, - i.updated_at, - i.closed_at, - m.title as milestone_title, - COUNT(c.id) as comment_count, - GROUP_CONCAT(l.name) as labels, - GROUP_CONCAT(u.username) as assignees -FROM issues i -LEFT JOIN milestones m ON i.milestone_id = m.id -LEFT JOIN comments c ON i.id = c.issue_id -LEFT JOIN issue_labels il ON i.id = il.issue_id -LEFT JOIN labels l ON il.label_id = l.id -LEFT JOIN issue_assignees ia ON i.id = ia.issue_id -LEFT JOIN users u ON ia.user_id = u.id -GROUP BY i.id, i.number, i.title, i.state, i.created_at, i.updated_at, i.closed_at, m.title; - --- Full-text search setup (if SQLite supports FTS) -CREATE VIRTUAL TABLE IF NOT EXISTS issue_search USING fts5( - issue_id, - title, - description, - labels, - content='issues' -); - --- Trigger to keep FTS index updated -CREATE TRIGGER IF NOT EXISTS issue_search_insert AFTER INSERT ON issues -BEGIN - INSERT INTO issue_search(issue_id, title, description) - VALUES (NEW.id, NEW.title, NEW.description); -END; - -CREATE TRIGGER IF NOT EXISTS issue_search_update AFTER UPDATE ON issues -BEGIN - UPDATE issue_search - SET title = NEW.title, description = NEW.description - WHERE issue_id = NEW.id; -END; - -CREATE TRIGGER IF NOT EXISTS issue_search_delete AFTER DELETE ON issues -BEGIN - DELETE FROM issue_search WHERE issue_id = OLD.id; -END; \ No newline at end of file diff --git a/issue-facade/cli/__init__.py b/issue-facade/cli/__init__.py deleted file mode 100644 index fee37168..00000000 --- a/issue-facade/cli/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Command Line Interface for Universal Issue Tracking - -Provides a comprehensive CLI for managing issues across different backends. -The CLI is designed to be intuitive and follows common patterns from -tools like git, gh (GitHub CLI), and similar utilities. - -Commands: -- issue list: List issues -- issue show: Show issue details -- issue create: Create new issue -- issue edit: Edit existing issue -- issue close: Close issue -- issue reopen: Reopen issue -- issue comment: Add comment -- issue label: Manage labels -- issue assign: Manage assignments -- backend: Manage backends -- sync: Synchronization operations -""" \ No newline at end of file diff --git a/issue-facade/cli/backend_commands.py b/issue-facade/cli/backend_commands.py deleted file mode 100644 index 580b5d9b..00000000 --- a/issue-facade/cli/backend_commands.py +++ /dev/null @@ -1,141 +0,0 @@ -""" -Backend Management CLI Commands - -Commands for configuring and managing issue tracking backends. -""" - -import click -from .utils import ( - load_backend_configs, save_backend_configs, format_backend_list, - test_backend_connection, validate_backend_type, echo_success, - echo_error, echo_warning, confirm_action -) - - -@click.group() -def backend_group(): - """Backend configuration and management.""" - pass - - -@backend_group.command('list') -def list_backends(): - """List configured backends.""" - configs = load_backend_configs() - click.echo(format_backend_list(configs)) - - -@backend_group.command('add') -@click.argument('name') -@click.argument('backend_type', type=click.Choice(['local', 'gitea'])) -@click.pass_context -def add_backend(ctx, name, backend_type): - """Add a new backend configuration.""" - configs = load_backend_configs() - - if name in configs: - if not confirm_action(f"Backend '{name}' already exists. Overwrite?"): - click.echo("Aborted") - return - - if backend_type == 'local': - db_path = click.prompt('Database path', default=f'~/.config/issue-tracker/{name}.db') - config = { - 'type': 'local', - 'db_path': str(db_path) - } - elif backend_type == 'gitea': - base_url = click.prompt('Gitea base URL (e.g., https://git.example.com)') - owner = click.prompt('Repository owner/organization') - repo = click.prompt('Repository name') - token = click.prompt('Access token', hide_input=True) - - config = { - 'type': 'gitea', - 'base_url': base_url.rstrip('/'), - 'owner': owner, - 'repo': repo, - 'token': token - } - - # Test connection - click.echo("Testing connection...") - if test_backend_connection(config): - echo_success("Connection successful!") - else: - echo_warning("Connection test failed, but configuration will be saved anyway.") - - # Save configuration - configs[name] = config - save_backend_configs(configs) - - echo_success(f"Backend '{name}' added successfully") - - # Set as default if it's the first one - if 'default' not in configs: - configs['default'] = name - save_backend_configs(configs) - echo_info(f"Set '{name}' as default backend") - - -@backend_group.command('remove') -@click.argument('name') -def remove_backend(name): - """Remove a backend configuration.""" - configs = load_backend_configs() - - if name not in configs: - echo_error(f"Backend '{name}' not found") - return - - if not confirm_action(f"Remove backend '{name}'?"): - click.echo("Aborted") - return - - del configs[name] - - # Update default if necessary - if configs.get('default') == name: - remaining_backends = [k for k in configs.keys() if k != 'default'] - if remaining_backends: - configs['default'] = remaining_backends[0] - echo_info(f"Set '{configs['default']}' as new default backend") - else: - del configs['default'] - - save_backend_configs(configs) - echo_success(f"Backend '{name}' removed") - - -@backend_group.command('test') -@click.argument('name') -def test_backend(name): - """Test backend connection.""" - configs = load_backend_configs() - - if name not in configs: - echo_error(f"Backend '{name}' not found") - return - - config = configs[name] - click.echo(f"Testing connection to '{name}'...") - - if test_backend_connection(config): - echo_success("Connection successful!") - else: - echo_error("Connection failed!") - - -@backend_group.command('set-default') -@click.argument('name') -def set_default_backend(name): - """Set default backend.""" - configs = load_backend_configs() - - if name not in configs: - echo_error(f"Backend '{name}' not found") - return - - configs['default'] = name - save_backend_configs(configs) - echo_success(f"Set '{name}' as default backend") \ No newline at end of file diff --git a/issue-facade/cli/commands.py b/issue-facade/cli/commands.py deleted file mode 100644 index 09ff08b2..00000000 --- a/issue-facade/cli/commands.py +++ /dev/null @@ -1,417 +0,0 @@ -""" -Issue Management CLI Commands - -Core commands for managing issues: create, list, show, edit, close, etc. -""" - -import click -from datetime import datetime, timezone -from typing import Optional - -from ..core.models import Issue, Label, User, IssueState, Priority, IssueType -from ..core.interfaces import IssueFilter -from .utils import get_backend, format_issue, format_issue_list, get_user_input - - -@click.group() -def issue_group(): - """Issue management commands.""" - pass - - -@issue_group.command('list') -@click.option('--state', type=click.Choice(['open', 'closed', 'all']), default='open', help='Issue state filter') -@click.option('--assignee', help='Filter by assignee') -@click.option('--label', multiple=True, help='Filter by labels') -@click.option('--milestone', help='Filter by milestone') -@click.option('--search', help='Search in title and description') -@click.option('--limit', type=int, default=30, help='Maximum number of issues to show') -@click.option('--format', 'output_format', type=click.Choice(['table', 'json', 'compact']), default='table', help='Output format') -@click.pass_context -def list_issues(ctx, state, assignee, label, milestone, search, limit, output_format): - """List issues with optional filtering.""" - backend = get_backend(ctx) - - # Build filter criteria - filter_criteria = IssueFilter( - state=None if state == 'all' else state, - assignee=assignee, - labels=list(label) if label else None, - milestone=milestone, - search=search, - limit=limit - ) - - try: - issues = backend.list_issues(filter_criteria) - - if not issues: - click.echo("No issues found.") - return - - if output_format == 'json': - import json - click.echo(json.dumps([issue.to_dict() for issue in issues], indent=2)) - elif output_format == 'compact': - for issue in issues: - labels_str = ', '.join(label.name for label in issue.labels[:3]) - if len(issue.labels) > 3: - labels_str += f' (+{len(issue.labels) - 3} more)' - - assignee_str = issue.primary_assignee.username if issue.primary_assignee else 'unassigned' - - click.echo(f"#{issue.number:4d} {issue.state.value:10s} {issue.title[:50]:50s} {assignee_str:15s} {labels_str}") - else: # table format - click.echo(format_issue_list(issues)) - - except Exception as e: - raise click.ClickException(f"Failed to list issues: {e}") - - -@issue_group.command('show') -@click.argument('issue_number', type=int) -@click.option('--comments', is_flag=True, help='Show comments') -@click.option('--format', 'output_format', type=click.Choice(['detailed', 'json', 'compact']), default='detailed', help='Output format') -@click.pass_context -def show_issue(ctx, issue_number, comments, output_format): - """Show detailed information about an issue.""" - backend = get_backend(ctx) - - try: - issue = backend.get_issue_by_number(issue_number) - if not issue: - raise click.ClickException(f"Issue #{issue_number} not found") - - if output_format == 'json': - import json - issue_dict = issue.to_dict() - if comments: - issue_dict['comments'] = [ - { - 'id': c.id, - 'body': c.body, - 'author': c.author.username, - 'created_at': c.created_at.isoformat() - } - for c in backend.get_comments(issue.id) - ] - click.echo(json.dumps(issue_dict, indent=2)) - else: - click.echo(format_issue(issue, show_comments=comments, backend=backend if comments else None)) - - except Exception as e: - raise click.ClickException(f"Failed to show issue: {e}") - - -@issue_group.command('create') -@click.argument('title') -@click.option('--description', '-d', help='Issue description') -@click.option('--label', '-l', multiple=True, help='Labels to add') -@click.option('--assignee', '-a', help='Assign to user') -@click.option('--milestone', '-m', help='Milestone') -@click.option('--priority', type=click.Choice(['low', 'medium', 'high', 'critical']), help='Issue priority') -@click.option('--type', 'issue_type', type=click.Choice(['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']), help='Issue type') -@click.option('--interactive', '-i', is_flag=True, help='Interactive mode') -@click.pass_context -def create_issue(ctx, title, description, label, assignee, milestone, priority, issue_type, interactive): - """Create a new issue.""" - backend = get_backend(ctx) - - try: - # Interactive mode - if interactive: - title = title or click.prompt('Title') - description = get_user_input('Description (optional)', multiline=True) - - # Show available labels - available_labels = backend.get_labels() - if available_labels: - click.echo(f"\nAvailable labels: {', '.join(l.name for l in available_labels)}") - label = click.prompt('Labels (comma-separated, optional)', default='').split(',') if click.prompt('Add labels?', type=bool, default=False) else [] - - # Show available users - available_users = backend.get_users() - if available_users: - click.echo(f"\nAvailable users: {', '.join(u.username for u in available_users)}") - assignee = click.prompt('Assignee (optional)', default='') or None - - # Show available milestones - available_milestones = backend.get_milestones() - if available_milestones: - click.echo(f"\nAvailable milestones: {', '.join(m.title for m in available_milestones)}") - milestone = click.prompt('Milestone (optional)', default='') or None - - # Build labels list - labels = [] - - # Add explicit labels - for label_name in label: - label_name = label_name.strip() - if label_name: - labels.append(Label(name=label_name)) - - # Add priority label - if priority: - labels.append(Label(name=f'priority:{priority}')) - - # Add type label - if issue_type: - labels.append(Label(name=issue_type)) - - # Build assignees list - assignees = [] - if assignee: - # Try to find user - users = backend.search_users(assignee) - if users: - assignees.append(users[0]) - else: - # Create a basic user object - assignees.append(User(id=assignee, username=assignee)) - - # Find milestone - milestone_obj = None - if milestone: - milestones = backend.get_milestones() - for m in milestones: - if m.title == milestone or m.id == milestone: - milestone_obj = m - break - - # Create issue - now = datetime.now(timezone.utc) - issue = Issue( - id="", # Will be set by backend - number=0, # Will be set by backend - title=title, - description=description or "", - state=IssueState.OPEN, - created_at=now, - updated_at=now, - labels=labels, - assignees=assignees, - milestone=milestone_obj - ) - - created_issue = backend.create_issue(issue) - click.echo(f"Created issue #{created_issue.number}: {created_issue.title}") - - if ctx.obj.get('verbose'): - click.echo(format_issue(created_issue)) - - except Exception as e: - raise click.ClickException(f"Failed to create issue: {e}") - - -@issue_group.command('edit') -@click.argument('issue_number', type=int) -@click.option('--title', help='New title') -@click.option('--description', help='New description') -@click.option('--add-label', multiple=True, help='Labels to add') -@click.option('--remove-label', multiple=True, help='Labels to remove') -@click.option('--assign', help='User to assign') -@click.option('--unassign', help='User to unassign') -@click.option('--milestone', help='Milestone to set') -@click.option('--interactive', '-i', is_flag=True, help='Interactive editing') -@click.pass_context -def edit_issue(ctx, issue_number, title, description, add_label, remove_label, assign, unassign, milestone, interactive): - """Edit an existing issue.""" - backend = get_backend(ctx) - - try: - issue = backend.get_issue_by_number(issue_number) - if not issue: - raise click.ClickException(f"Issue #{issue_number} not found") - - # Interactive mode - if interactive: - click.echo(f"Editing issue #{issue.number}: {issue.title}") - - new_title = click.prompt('Title', default=issue.title) - if new_title != issue.title: - title = new_title - - new_description = get_user_input('Description', default=issue.description, multiline=True) - if new_description != issue.description: - description = new_description - - # Apply changes - modified = False - - if title: - issue.title = title - modified = True - - if description is not None: - issue.description = description - modified = True - - # Add labels - for label_name in add_label: - issue.add_label(Label(name=label_name.strip())) - modified = True - - # Remove labels - for label_name in remove_label: - if issue.remove_label(label_name.strip()): - modified = True - - # Assign user - if assign: - users = backend.search_users(assign) - if users: - issue.add_assignee(users[0]) - modified = True - else: - issue.add_assignee(User(id=assign, username=assign)) - modified = True - - # Unassign user - if unassign: - if issue.remove_assignee(unassign): - modified = True - - # Set milestone - if milestone: - milestones = backend.get_milestones() - for m in milestones: - if m.title == milestone or m.id == milestone: - issue.milestone = m - modified = True - break - - if modified: - issue.updated_at = datetime.now(timezone.utc) - updated_issue = backend.update_issue(issue) - click.echo(f"Updated issue #{updated_issue.number}") - - if ctx.obj.get('verbose'): - click.echo(format_issue(updated_issue)) - else: - click.echo("No changes made") - - except Exception as e: - raise click.ClickException(f"Failed to edit issue: {e}") - - -@issue_group.command('close') -@click.argument('issue_number', type=int) -@click.option('--comment', '-c', help='Closing comment') -@click.pass_context -def close_issue(ctx, issue_number, comment): - """Close an issue.""" - backend = get_backend(ctx) - - try: - issue = backend.get_issue_by_number(issue_number) - if not issue: - raise click.ClickException(f"Issue #{issue_number} not found") - - if issue.state == IssueState.CLOSED: - click.echo(f"Issue #{issue_number} is already closed") - return - - # Close the issue - issue.close() - - # Add closing comment if provided - if comment: - from ..core.models import Comment - current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user - closing_comment = Comment( - id="", - body=comment, - author=current_user, - created_at=datetime.now(timezone.utc) - ) - backend.add_comment(issue.id, closing_comment) - - updated_issue = backend.update_issue(issue) - click.echo(f"Closed issue #{updated_issue.number}: {updated_issue.title}") - - except Exception as e: - raise click.ClickException(f"Failed to close issue: {e}") - - -@issue_group.command('reopen') -@click.argument('issue_number', type=int) -@click.option('--comment', '-c', help='Reopening comment') -@click.pass_context -def reopen_issue(ctx, issue_number, comment): - """Reopen a closed issue.""" - backend = get_backend(ctx) - - try: - issue = backend.get_issue_by_number(issue_number) - if not issue: - raise click.ClickException(f"Issue #{issue_number} not found") - - if issue.state != IssueState.CLOSED: - click.echo(f"Issue #{issue_number} is not closed (current state: {issue.state.value})") - return - - # Reopen the issue - issue.reopen() - - # Add reopening comment if provided - if comment: - from ..core.models import Comment - current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user - reopening_comment = Comment( - id="", - body=comment, - author=current_user, - created_at=datetime.now(timezone.utc) - ) - backend.add_comment(issue.id, reopening_comment) - - updated_issue = backend.update_issue(issue) - click.echo(f"Reopened issue #{updated_issue.number}: {updated_issue.title}") - - except Exception as e: - raise click.ClickException(f"Failed to reopen issue: {e}") - - -@issue_group.command('comment') -@click.argument('issue_number', type=int) -@click.argument('comment_text', required=False) -@click.option('--editor', is_flag=True, help='Open editor for comment') -@click.pass_context -def add_comment(ctx, issue_number, comment_text, editor): - """Add a comment to an issue.""" - backend = get_backend(ctx) - - try: - issue = backend.get_issue_by_number(issue_number) - if not issue: - raise click.ClickException(f"Issue #{issue_number} not found") - - # Get comment text - if editor: - comment_text = click.edit() or "" - elif not comment_text: - comment_text = get_user_input("Comment", multiline=True) - - if not comment_text.strip(): - click.echo("Empty comment, aborting") - return - - # Create comment - from ..core.models import Comment - current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user - comment = Comment( - id="", - body=comment_text.strip(), - author=current_user, - created_at=datetime.now(timezone.utc) - ) - - added_comment = backend.add_comment(issue.id, comment) - click.echo(f"Added comment to issue #{issue_number}") - - if ctx.obj.get('verbose'): - click.echo(f"\nComment by {added_comment.author.username} at {added_comment.created_at}:") - click.echo(added_comment.body) - - except Exception as e: - raise click.ClickException(f"Failed to add comment: {e}") \ No newline at end of file diff --git a/issue-facade/cli/main.py b/issue-facade/cli/main.py deleted file mode 100644 index 567e1a4f..00000000 --- a/issue-facade/cli/main.py +++ /dev/null @@ -1,117 +0,0 @@ -""" -Main CLI Entry Point - -Universal Issue Tracking System CLI -""" - -import click -import sys -from pathlib import Path - -from .commands import issue_group -from .backend_commands import backend_group -from .sync_commands import sync_group - - -@click.group() -@click.version_option() -@click.option('--config', type=click.Path(), help='Configuration file path') -@click.option('--backend', help='Backend to use (local, gitea)') -@click.option('--verbose', '-v', is_flag=True, help='Verbose output') -@click.pass_context -def cli(ctx, config, backend, verbose): - """ - Universal Issue Tracking System - - A backend-agnostic issue tracking tool that works with local SQLite, - Gitea, GitHub, and other issue tracking systems. - - Examples: - issue list # List all issues - issue create "Bug in parser" # Create new issue - issue show 42 # Show issue #42 - issue close 42 # Close issue #42 - - backend add local ~/.issues # Add local backend - backend add gitea myrepo # Add Gitea backend - - sync pull gitea # Sync from Gitea - sync push gitea # Sync to Gitea - """ - # Ensure the object exists - ctx.ensure_object(dict) - - # Store global options in context - ctx.obj['config_path'] = config - ctx.obj['backend'] = backend - ctx.obj['verbose'] = verbose - - -# Register command groups -cli.add_command(issue_group, name='issue') -cli.add_command(backend_group, name='backend') -cli.add_command(sync_group, name='sync') - - -# Convenience aliases - direct issue commands -@cli.command('list') -@click.pass_context -def list_issues(ctx): - """List all issues (alias for 'issue list').""" - ctx.invoke(issue_group.get_command(ctx, 'list')) - - -@cli.command('show') -@click.argument('issue_number', type=int) -@click.pass_context -def show_issue(ctx, issue_number): - """Show issue details (alias for 'issue show').""" - ctx.invoke(issue_group.get_command(ctx, 'show'), issue_number=issue_number) - - -@cli.command('create') -@click.argument('title') -@click.option('--description', '-d', help='Issue description') -@click.option('--label', '-l', multiple=True, help='Labels to add') -@click.option('--assignee', '-a', help='Assign to user') -@click.option('--milestone', '-m', help='Milestone') -@click.pass_context -def create_issue(ctx, title, description, label, assignee, milestone): - """Create new issue (alias for 'issue create').""" - ctx.invoke( - issue_group.get_command(ctx, 'create'), - title=title, - description=description, - label=label, - assignee=assignee, - milestone=milestone - ) - - -@cli.command('close') -@click.argument('issue_number', type=int) -@click.option('--comment', '-c', help='Closing comment') -@click.pass_context -def close_issue(ctx, issue_number, comment): - """Close issue (alias for 'issue close').""" - ctx.invoke( - issue_group.get_command(ctx, 'close'), - issue_number=issue_number, - comment=comment - ) - - -def main(): - """Main entry point for the CLI.""" - try: - cli(obj={}) - except KeyboardInterrupt: - click.echo("\nAborted by user", err=True) - sys.exit(1) - except Exception as e: - click.echo(f"Error: {e}", err=True) - sys.exit(1) - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/issue-facade/cli/sync_commands.py b/issue-facade/cli/sync_commands.py deleted file mode 100644 index 11d8a54f..00000000 --- a/issue-facade/cli/sync_commands.py +++ /dev/null @@ -1,235 +0,0 @@ -""" -Synchronization CLI Commands - -Commands for synchronizing issues between different backends. -""" - -import click -from datetime import datetime, timezone -from .utils import ( - load_backend_configs, get_backend, echo_success, echo_error, - echo_warning, echo_info, progress_bar, confirm_action -) -from ..core.interfaces import BackendFactory - - -@click.group() -def sync_group(): - """Issue synchronization between backends.""" - pass - - -@sync_group.command('status') -@click.argument('backend_name', required=False) -@click.pass_context -def sync_status(ctx, backend_name): - """Show sync status for backends.""" - configs = load_backend_configs() - - if backend_name: - backends_to_check = [backend_name] if backend_name in configs else [] - if not backends_to_check: - echo_error(f"Backend '{backend_name}' not found") - return - else: - backends_to_check = [k for k in configs.keys() if k != 'default'] - - for name in backends_to_check: - config = configs[name] - try: - backend = BackendFactory.create_backend(config['type']) - backend.connect(config) - - # Get basic stats - all_issues = backend.list_issues() - open_issues = [i for i in all_issues if i.state.value != 'closed'] - - click.echo(f"\n{name} ({config['type']}):") - click.echo(f" Total issues: {len(all_issues)}") - click.echo(f" Open issues: {len(open_issues)}") - click.echo(f" Closed issues: {len(all_issues) - len(open_issues)}") - - # Check last sync - if hasattr(backend, 'get_last_sync_timestamp'): - last_sync = backend.get_last_sync_timestamp() - if last_sync: - click.echo(f" Last sync: {last_sync}") - else: - click.echo(f" Last sync: never") - - backend.disconnect() - - except Exception as e: - echo_error(f" Error accessing {name}: {e}") - - -@sync_group.command('pull') -@click.argument('source_backend') -@click.option('--target', default='local', help='Target backend (default: local)') -@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') -@click.option('--force', is_flag=True, help='Force sync even with conflicts') -@click.pass_context -def sync_pull(ctx, source_backend, target, dry_run, force): - """Pull issues from source backend to target backend.""" - configs = load_backend_configs() - - if source_backend not in configs: - echo_error(f"Source backend '{source_backend}' not found") - return - - if target not in configs: - echo_error(f"Target backend '{target}' not found") - return - - if source_backend == target: - echo_error("Source and target backends cannot be the same") - return - - try: - # Connect to backends - source_config = configs[source_backend] - target_config = configs[target] - - source = BackendFactory.create_backend(source_config['type']) - source.connect(source_config) - - target = BackendFactory.create_backend(target_config['type']) - target.connect(target_config) - - echo_info(f"Syncing from {source_backend} to {target}") - - # Get issues from source - source_issues = source.list_issues() - echo_info(f"Found {len(source_issues)} issues in source") - - # Get existing issues in target - target_issues = target.list_issues() - target_numbers = {issue.number for issue in target_issues} - - # Determine what needs to be synced - new_issues = [] - updated_issues = [] - - for issue in source_issues: - if issue.number not in target_numbers: - new_issues.append(issue) - else: - # Check if update is needed (simplified check by updated_at) - target_issue = next((i for i in target_issues if i.number == issue.number), None) - if target_issue and issue.updated_at > target_issue.updated_at: - updated_issues.append(issue) - - echo_info(f"New issues to sync: {len(new_issues)}") - echo_info(f"Updated issues to sync: {len(updated_issues)}") - - if dry_run: - if new_issues: - click.echo("\nNew issues:") - for issue in new_issues: - click.echo(f" #{issue.number}: {issue.title}") - - if updated_issues: - click.echo("\nUpdated issues:") - for issue in updated_issues: - click.echo(f" #{issue.number}: {issue.title}") - - click.echo(f"\nDry run complete. {len(new_issues + updated_issues)} issues would be synced.") - return - - # Confirm sync - total_sync = len(new_issues) + len(updated_issues) - if total_sync == 0: - echo_success("No issues need syncing") - return - - if not force and not confirm_action(f"Sync {total_sync} issues?"): - click.echo("Aborted") - return - - # Perform sync - synced_count = 0 - errors = [] - - all_to_sync = new_issues + updated_issues - with progress_bar(all_to_sync, label="Syncing issues") as items: - for issue in items: - try: - # Clear backend-specific IDs for new backend - issue.backend_id = None - issue.backend_type = target_config['type'] - - if issue in new_issues: - target.create_issue(issue) - else: - # For updates, we need to find the target issue and update it - target_issue = target.get_issue_by_number(issue.number) - if target_issue: - # Copy relevant fields - target_issue.title = issue.title - target_issue.description = issue.description - target_issue.state = issue.state - target_issue.labels = issue.labels - target_issue.assignees = issue.assignees - target_issue.milestone = issue.milestone - target_issue.updated_at = issue.updated_at - target_issue.closed_at = issue.closed_at - target.update_issue(target_issue) - - synced_count += 1 - - except Exception as e: - errors.append(f"Issue #{issue.number}: {e}") - - # Report results - echo_success(f"Synced {synced_count} issues successfully") - - if errors: - echo_warning(f"{len(errors)} errors occurred:") - for error in errors[:5]: # Show first 5 errors - echo_error(f" {error}") - if len(errors) > 5: - echo_warning(f" ... and {len(errors) - 5} more errors") - - # Cleanup - source.disconnect() - target.disconnect() - - except Exception as e: - echo_error(f"Sync failed: {e}") - - -@sync_group.command('push') -@click.argument('target_backend') -@click.option('--source', default='local', help='Source backend (default: local)') -@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') -@click.option('--force', is_flag=True, help='Force sync even with conflicts') -@click.pass_context -def sync_push(ctx, target_backend, source, dry_run, force): - """Push issues from source backend to target backend.""" - # This is essentially the same as pull but with arguments swapped - ctx.invoke(sync_pull, source_backend=source, target=target_backend, dry_run=dry_run, force=force) - - -@sync_group.command('bidirectional') -@click.argument('backend1') -@click.argument('backend2') -@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes') -@click.option('--force', is_flag=True, help='Force sync even with conflicts') -@click.pass_context -def sync_bidirectional(ctx, backend1, backend2, dry_run, force): - """Bidirectional sync between two backends.""" - echo_warning("Bidirectional sync is a complex operation that can cause conflicts.") - - if not force and not confirm_action("Continue with bidirectional sync?"): - click.echo("Aborted") - return - - # First sync backend1 -> backend2 - echo_info(f"Step 1: Syncing {backend1} -> {backend2}") - ctx.invoke(sync_pull, source_backend=backend1, target=backend2, dry_run=dry_run, force=True) - - # Then sync backend2 -> backend1 - echo_info(f"Step 2: Syncing {backend2} -> {backend1}") - ctx.invoke(sync_pull, source_backend=backend2, target=backend1, dry_run=dry_run, force=True) - - echo_success("Bidirectional sync completed") \ No newline at end of file diff --git a/issue-facade/cli/utils.py b/issue-facade/cli/utils.py deleted file mode 100644 index 32182cd9..00000000 --- a/issue-facade/cli/utils.py +++ /dev/null @@ -1,336 +0,0 @@ -""" -CLI Utility Functions - -Helper functions for the CLI commands including formatting, configuration, -backend management, and user interaction. -""" - -import click -import os -import sys -from datetime import datetime -from pathlib import Path -from typing import Optional, List -import tempfile - -from ..core.interfaces import IssueBackend, BackendFactory -from ..core.models import Issue, Comment -from ..backends.local import LocalSQLiteBackend -from ..backends.gitea import GiteaBackend - - -# Register available backends -BackendFactory.register_backend('local', LocalSQLiteBackend) -BackendFactory.register_backend('gitea', GiteaBackend) - - -def get_config_dir() -> Path: - """Get configuration directory.""" - config_dir = Path.home() / '.config' / 'issue-tracker' - config_dir.mkdir(parents=True, exist_ok=True) - return config_dir - - -def get_backend_config_path() -> Path: - """Get backend configuration file path.""" - return get_config_dir() / 'backends.json' - - -def load_backend_configs() -> dict: - """Load backend configurations.""" - config_path = get_backend_config_path() - if not config_path.exists(): - return {} - - import json - try: - with open(config_path, 'r') as f: - return json.load(f) - except (json.JSONDecodeError, IOError): - return {} - - -def save_backend_configs(configs: dict) -> None: - """Save backend configurations.""" - config_path = get_backend_config_path() - import json - with open(config_path, 'w') as f: - json.dump(configs, f, indent=2) - - -def get_default_backend() -> str: - """Get the default backend name.""" - configs = load_backend_configs() - return configs.get('default', 'local') - - -def get_backend(ctx) -> IssueBackend: - """Get backend instance from context.""" - backend_name = ctx.obj.get('backend') or get_default_backend() - configs = load_backend_configs() - - if backend_name not in configs: - if backend_name == 'local': - # Auto-configure local backend - local_config = { - 'type': 'local', - 'db_path': str(get_config_dir() / 'issues.db') - } - configs['local'] = local_config - save_backend_configs(configs) - else: - raise click.ClickException(f"Backend '{backend_name}' not configured. Use 'backend add' to configure it.") - - backend_config = configs[backend_name] - backend_type = backend_config['type'] - - try: - backend = BackendFactory.create_backend(backend_type) - backend.connect(backend_config) - return backend - except Exception as e: - raise click.ClickException(f"Failed to connect to backend '{backend_name}': {e}") - - -def format_issue_list(issues: List[Issue]) -> str: - """Format list of issues as a table.""" - if not issues: - return "No issues found." - - # Calculate column widths - max_title_width = min(50, max(len(issue.title) for issue in issues)) - max_assignee_width = 15 - - # Header - lines = [] - header = f"{'#':<6} {'State':<12} {'Title':<{max_title_width}} {'Assignee':<{max_assignee_width}} Labels" - lines.append(header) - lines.append("-" * len(header)) - - # Issues - for issue in issues: - title = issue.title[:max_title_width] - if len(issue.title) > max_title_width: - title = title[:-3] + "..." - - assignee = issue.primary_assignee.username if issue.primary_assignee else "unassigned" - assignee = assignee[:max_assignee_width] - - labels = ", ".join(label.name for label in issue.labels[:3]) - if len(issue.labels) > 3: - labels += f" (+{len(issue.labels) - 3})" - - line = f"{issue.number:<6} {issue.state.value:<12} {title:<{max_title_width}} {assignee:<{max_assignee_width}} {labels}" - lines.append(line) - - return "\n".join(lines) - - -def format_issue(issue: Issue, show_comments: bool = False, backend: Optional[IssueBackend] = None) -> str: - """Format a single issue with details.""" - lines = [] - - # Header - lines.append(f"#{issue.number}: {issue.title}") - lines.append("=" * (len(f"#{issue.number}: {issue.title}"))) - lines.append("") - - # Basic info - lines.append(f"State: {issue.state.value}") - lines.append(f"Created: {format_datetime(issue.created_at)}") - lines.append(f"Updated: {format_datetime(issue.updated_at)}") - - if issue.closed_at: - lines.append(f"Closed: {format_datetime(issue.closed_at)}") - - # Assignees - if issue.assignees: - assignees_str = ", ".join(assignee.username for assignee in issue.assignees) - lines.append(f"Assignees: {assignees_str}") - else: - lines.append("Assignees: none") - - # Milestone - if issue.milestone: - lines.append(f"Milestone: {issue.milestone.title}") - - # Labels - if issue.labels: - labels_by_category = {} - for label in issue.labels: - category = label.category - if category not in labels_by_category: - labels_by_category[category] = [] - labels_by_category[category].append(label.name) - - for category, label_names in labels_by_category.items(): - lines.append(f"{category.title()} labels: {', '.join(label_names)}") - else: - lines.append("Labels: none") - - # Description - lines.append("") - lines.append("Description:") - lines.append("-" * 12) - if issue.description: - lines.append(issue.description) - else: - lines.append("(no description)") - - # Comments - if show_comments and backend: - comments = backend.get_comments(issue.id) - if comments: - lines.append("") - lines.append(f"Comments ({len(comments)}):") - lines.append("-" * 20) - - for comment in comments: - lines.append("") - lines.append(f"Comment by {comment.author.username} at {format_datetime(comment.created_at)}:") - lines.append(comment.body) - - return "\n".join(lines) - - -def format_datetime(dt: datetime) -> str: - """Format datetime for display.""" - if dt.tzinfo: - dt = dt.astimezone() - return dt.strftime("%Y-%m-%d %H:%M:%S") - - -def get_user_input(prompt: str, default: str = "", multiline: bool = False) -> str: - """Get user input with optional default and multiline support.""" - if multiline: - click.echo(f"{prompt} (Press Ctrl+D when done, Ctrl+C to cancel):") - if default: - click.echo(f"Current value:\n{default}\n") - - lines = [] - try: - while True: - try: - line = input() - lines.append(line) - except EOFError: - break - except KeyboardInterrupt: - raise click.Abort() - - return "\n".join(lines) if lines else default - else: - return click.prompt(prompt, default=default) - - -def validate_backend_type(backend_type: str) -> bool: - """Validate that a backend type is supported.""" - return backend_type in BackendFactory.get_available_backends() - - -def test_backend_connection(backend_config: dict) -> bool: - """Test if a backend configuration works.""" - try: - backend_type = backend_config['type'] - backend = BackendFactory.create_backend(backend_type) - backend.connect(backend_config) - result = backend.test_connection() - backend.disconnect() - return result - except Exception: - return False - - -def format_backend_list(configs: dict) -> str: - """Format backend configurations for display.""" - if not configs: - return "No backends configured." - - lines = [] - default_backend = configs.get('default', 'local') - - lines.append(f"{'Name':<15} {'Type':<10} {'Status':<10} Description") - lines.append("-" * 60) - - for name, config in configs.items(): - if name == 'default': - continue - - backend_type = config.get('type', 'unknown') - is_default = name == default_backend - - # Test connection - try: - status = "connected" if test_backend_connection(config) else "error" - except Exception: - status = "error" - - # Description - if backend_type == 'local': - desc = f"Local SQLite: {config.get('db_path', 'unknown')}" - elif backend_type == 'gitea': - desc = f"Gitea: {config.get('base_url', 'unknown')}/{config.get('owner', 'unknown')}/{config.get('repo', 'unknown')}" - else: - desc = f"{backend_type} backend" - - if is_default: - desc += " (default)" - - line = f"{name:<15} {backend_type:<10} {status:<10} {desc}" - lines.append(line) - - return "\n".join(lines) - - -def get_editor() -> str: - """Get the user's preferred editor.""" - return os.environ.get('EDITOR', 'nano') - - -def edit_text(initial_text: str = "") -> Optional[str]: - """Open text in editor and return edited content.""" - with tempfile.NamedTemporaryFile(mode='w+', suffix='.md', delete=False) as f: - f.write(initial_text) - temp_path = f.name - - try: - editor = get_editor() - os.system(f'{editor} {temp_path}') - - with open(temp_path, 'r') as f: - edited_text = f.read() - - return edited_text if edited_text != initial_text else None - - finally: - os.unlink(temp_path) - - -def confirm_action(message: str, default: bool = False) -> bool: - """Ask user for confirmation.""" - return click.confirm(message, default=default) - - -def progress_bar(items, label: str = "Processing"): - """Create a progress bar for iterating over items.""" - return click.progressbar(items, label=label) - - -def echo_success(message: str) -> None: - """Echo success message in green.""" - click.echo(click.style(message, fg='green')) - - -def echo_warning(message: str) -> None: - """Echo warning message in yellow.""" - click.echo(click.style(message, fg='yellow')) - - -def echo_error(message: str) -> None: - """Echo error message in red.""" - click.echo(click.style(message, fg='red')) - - -def echo_info(message: str) -> None: - """Echo info message in blue.""" - click.echo(click.style(message, fg='blue')) \ No newline at end of file diff --git a/issue-facade/core/interfaces.py b/issue-facade/core/interfaces.py deleted file mode 100644 index 98118f42..00000000 --- a/issue-facade/core/interfaces.py +++ /dev/null @@ -1,407 +0,0 @@ -""" -Backend Plugin Interfaces - -Defines the contracts that all issue tracking backend plugins must implement. -This enables a clean plugin architecture where new backends can be added -without modifying core code. -""" - -from abc import ABC, abstractmethod -from typing import List, Optional, Dict, Any, Iterator -from datetime import datetime - -from .models import Issue, Label, User, Milestone, Comment - - -class IssueFilter: - """Filter criteria for issue queries.""" - - def __init__( - self, - state: Optional[str] = None, - assignee: Optional[str] = None, - labels: Optional[List[str]] = None, - milestone: Optional[str] = None, - created_after: Optional[datetime] = None, - created_before: Optional[datetime] = None, - updated_after: Optional[datetime] = None, - updated_before: Optional[datetime] = None, - search: Optional[str] = None, - limit: Optional[int] = None, - offset: Optional[int] = 0 - ): - self.state = state - self.assignee = assignee - self.labels = labels or [] - self.milestone = milestone - self.created_after = created_after - self.created_before = created_before - self.updated_after = updated_after - self.updated_before = updated_before - self.search = search - self.limit = limit - self.offset = offset - - -class BackendCapabilities: - """Describes what features a backend supports.""" - - def __init__( - self, - supports_milestones: bool = True, - supports_assignees: bool = True, - supports_comments: bool = True, - supports_labels: bool = True, - supports_search: bool = True, - supports_bulk_operations: bool = False, - supports_webhooks: bool = False, - supports_real_time: bool = False, - max_labels_per_issue: Optional[int] = None, - max_assignees_per_issue: Optional[int] = None - ): - self.supports_milestones = supports_milestones - self.supports_assignees = supports_assignees - self.supports_comments = supports_comments - self.supports_labels = supports_labels - self.supports_search = supports_search - self.supports_bulk_operations = supports_bulk_operations - self.supports_webhooks = supports_webhooks - self.supports_real_time = supports_real_time - self.max_labels_per_issue = max_labels_per_issue - self.max_assignees_per_issue = max_assignees_per_issue - - -class IssueBackend(ABC): - """ - Abstract base class for all issue tracking backends. - - Each backend plugin must implement this interface to provide - issue tracking functionality for a specific system. - """ - - @property - @abstractmethod - def backend_type(self) -> str: - """Return the backend type identifier (e.g., 'local', 'gitea', 'github').""" - pass - - @property - @abstractmethod - def capabilities(self) -> BackendCapabilities: - """Return the capabilities supported by this backend.""" - pass - - @abstractmethod - def connect(self, config: Dict[str, Any]) -> None: - """ - Connect to the backend using provided configuration. - - Args: - config: Backend-specific configuration (URLs, tokens, etc.) - """ - pass - - @abstractmethod - def disconnect(self) -> None: - """Disconnect from the backend and clean up resources.""" - pass - - @abstractmethod - def test_connection(self) -> bool: - """Test if the backend connection is working.""" - pass - - # Issue CRUD Operations - @abstractmethod - def create_issue(self, issue: Issue) -> Issue: - """ - Create a new issue in the backend. - - Args: - issue: Issue to create (id may be None for new issues) - - Returns: - Created issue with backend_id populated - """ - pass - - @abstractmethod - def get_issue(self, issue_id: str) -> Optional[Issue]: - """ - Retrieve an issue by its backend ID. - - Args: - issue_id: Backend-specific issue ID - - Returns: - Issue if found, None otherwise - """ - pass - - @abstractmethod - def get_issue_by_number(self, number: int) -> Optional[Issue]: - """ - Retrieve an issue by its human-readable number. - - Args: - number: Issue number - - Returns: - Issue if found, None otherwise - """ - pass - - @abstractmethod - def update_issue(self, issue: Issue) -> Issue: - """ - Update an existing issue in the backend. - - Args: - issue: Issue with modifications - - Returns: - Updated issue - """ - pass - - @abstractmethod - def delete_issue(self, issue_id: str) -> bool: - """ - Delete an issue from the backend. - - Args: - issue_id: Backend-specific issue ID - - Returns: - True if deleted successfully - """ - pass - - @abstractmethod - def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: - """ - List issues matching filter criteria. - - Args: - filter_criteria: Optional filter to apply - - Returns: - List of matching issues - """ - pass - - @abstractmethod - def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: - """ - Search issues using backend-specific query syntax. - - Args: - query: Search query - limit: Maximum number of results - - Returns: - List of matching issues - """ - pass - - # Label Operations - @abstractmethod - def create_label(self, label: Label) -> Label: - """Create a new label.""" - pass - - @abstractmethod - def get_labels(self) -> List[Label]: - """Get all available labels.""" - pass - - @abstractmethod - def update_label(self, label: Label) -> Label: - """Update an existing label.""" - pass - - @abstractmethod - def delete_label(self, label_name: str) -> bool: - """Delete a label.""" - pass - - # User Operations - @abstractmethod - def get_users(self) -> List[User]: - """Get all available users.""" - pass - - @abstractmethod - def get_user(self, user_id: str) -> Optional[User]: - """Get a specific user by ID.""" - pass - - @abstractmethod - def search_users(self, query: str) -> List[User]: - """Search for users.""" - pass - - # Milestone Operations - @abstractmethod - def create_milestone(self, milestone: Milestone) -> Milestone: - """Create a new milestone.""" - pass - - @abstractmethod - def get_milestones(self) -> List[Milestone]: - """Get all milestones.""" - pass - - @abstractmethod - def update_milestone(self, milestone: Milestone) -> Milestone: - """Update a milestone.""" - pass - - @abstractmethod - def delete_milestone(self, milestone_id: str) -> bool: - """Delete a milestone.""" - pass - - # Comment Operations - @abstractmethod - def add_comment(self, issue_id: str, comment: Comment) -> Comment: - """Add a comment to an issue.""" - pass - - @abstractmethod - def get_comments(self, issue_id: str) -> List[Comment]: - """Get all comments for an issue.""" - pass - - @abstractmethod - def update_comment(self, comment: Comment) -> Comment: - """Update a comment.""" - pass - - @abstractmethod - def delete_comment(self, comment_id: str) -> bool: - """Delete a comment.""" - pass - - # Bulk Operations (optional, depends on capabilities) - def bulk_update_issues(self, updates: List[Dict[str, Any]]) -> List[Issue]: - """ - Bulk update multiple issues. - - Args: - updates: List of update operations - - Returns: - List of updated issues - """ - if not self.capabilities.supports_bulk_operations: - raise NotImplementedError(f"{self.backend_type} backend does not support bulk operations") - - # Default implementation: update one by one - results = [] - for update in updates: - issue_id = update['id'] - issue = self.get_issue(issue_id) - if issue: - # Apply updates to issue - for key, value in update.items(): - if key != 'id' and hasattr(issue, key): - setattr(issue, key, value) - updated_issue = self.update_issue(issue) - results.append(updated_issue) - return results - - # Sync Support - def get_last_sync_timestamp(self) -> Optional[datetime]: - """ - Get the timestamp of the last successful sync. - Used for incremental synchronization. - """ - return None - - def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: - """ - Get issues modified since a specific timestamp. - Used for incremental synchronization. - """ - filter_criteria = IssueFilter(updated_after=timestamp) - return self.list_issues(filter_criteria) - - -class LocalBackend(IssueBackend): - """ - Marker interface for local backends. - - Local backends store data locally and can work offline. - They serve as the source of truth for synchronization. - """ - pass - - -class RemoteBackend(IssueBackend): - """ - Marker interface for remote backends. - - Remote backends connect to external issue tracking systems. - They participate in bidirectional synchronization. - """ - pass - - -class SyncableBackend(ABC): - """ - Interface for backends that support synchronization. - - Backends implementing this interface can participate in - bidirectional sync operations. - """ - - @abstractmethod - def prepare_for_sync(self) -> None: - """Prepare backend for sync operation (e.g., create backup).""" - pass - - @abstractmethod - def finalize_sync(self, success: bool) -> None: - """Finalize sync operation (e.g., commit or rollback).""" - pass - - @abstractmethod - def get_sync_conflicts(self) -> List[Dict[str, Any]]: - """Get issues that have sync conflicts.""" - pass - - @abstractmethod - def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: - """ - Resolve a sync conflict. - - Args: - issue_id: Issue with conflict - resolution: 'local' or 'remote' or 'merge' - """ - pass - - -class BackendFactory: - """Factory for creating backend instances.""" - - _backends: Dict[str, type] = {} - - @classmethod - def register_backend(cls, backend_type: str, backend_class: type) -> None: - """Register a backend implementation.""" - cls._backends[backend_type] = backend_class - - @classmethod - def create_backend(cls, backend_type: str) -> IssueBackend: - """Create a backend instance.""" - if backend_type not in cls._backends: - raise ValueError(f"Unknown backend type: {backend_type}") - - return cls._backends[backend_type]() - - @classmethod - def get_available_backends(cls) -> List[str]: - """Get list of available backend types.""" - return list(cls._backends.keys()) \ No newline at end of file diff --git a/issue-facade/core/models.py b/issue-facade/core/models.py deleted file mode 100644 index fae45d57..00000000 --- a/issue-facade/core/models.py +++ /dev/null @@ -1,341 +0,0 @@ -""" -Core Issue Domain Models - -Unified, backend-agnostic issue models that serve as the single source of truth -for all issue tracking operations. These models combine the best features from -various issue tracking systems while maintaining clean domain logic. -""" - -from dataclasses import dataclass, field -from datetime import datetime, timezone -from enum import Enum -from typing import List, Optional, Dict, Any -from functools import cached_property - - -class IssueState(Enum): - """Universal issue state enumeration with backend mapping support.""" - OPEN = "open" - CLOSED = "closed" - IN_PROGRESS = "in_progress" - BLOCKED = "blocked" - - @classmethod - def from_string(cls, state_str: str) -> 'IssueState': - """Convert string to IssueState, with fallback handling.""" - state_map = { - 'open': cls.OPEN, - 'closed': cls.CLOSED, - 'in_progress': cls.IN_PROGRESS, - 'in-progress': cls.IN_PROGRESS, - 'progress': cls.IN_PROGRESS, - 'blocked': cls.BLOCKED, - } - return state_map.get(state_str.lower(), cls.OPEN) - - def to_backend_string(self, backend_type: str) -> str: - """Convert to backend-specific string representation.""" - if backend_type == 'gitea': - return 'open' if self in [self.OPEN, self.IN_PROGRESS, self.BLOCKED] else 'closed' - elif backend_type == 'github': - return self.value - else: - return self.value - - -class Priority(Enum): - """Universal priority levels.""" - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - CRITICAL = "critical" - - @classmethod - def from_label(cls, label_name: str) -> Optional['Priority']: - """Extract priority from label name.""" - if label_name.startswith('priority:'): - priority_str = label_name.replace('priority:', '') - try: - return cls(priority_str) - except ValueError: - return None - return None - - -class IssueType(Enum): - """Universal issue types.""" - BUG = "bug" - FEATURE = "feature" - ENHANCEMENT = "enhancement" - TASK = "task" - DOCUMENTATION = "documentation" - QUESTION = "question" - - @classmethod - def from_label(cls, label_name: str) -> Optional['IssueType']: - """Extract type from label name.""" - try: - return cls(label_name.lower()) - except ValueError: - return None - - -@dataclass(frozen=True) -class Label: - """Universal label model with backend mapping support.""" - name: str - color: Optional[str] = None - description: Optional[str] = None - backend_id: Optional[str] = None # Backend-specific ID for sync - - @cached_property - def category(self) -> str: - """Categorize label for efficient filtering.""" - if self.name.startswith('priority:'): - return 'priority' - elif self.name.startswith('status:'): - return 'status' - elif self.name.startswith('type:'): - return 'type' - elif self.name in ['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']: - return 'type' - else: - return 'other' - - @cached_property - def priority(self) -> Optional[Priority]: - """Extract priority if this is a priority label.""" - return Priority.from_label(self.name) - - @cached_property - def issue_type(self) -> Optional[IssueType]: - """Extract issue type if this is a type label.""" - return IssueType.from_label(self.name) - - -@dataclass(frozen=True) -class LabelCategories: - """Categorized labels for efficient access.""" - priority_labels: List[Label] - type_labels: List[Label] - status_labels: List[Label] - other_labels: List[Label] - - @cached_property - def priority(self) -> Optional[Priority]: - """Get the issue priority.""" - for label in self.priority_labels: - if label.priority: - return label.priority - return None - - @cached_property - def issue_type(self) -> Optional[IssueType]: - """Get the issue type.""" - for label in self.type_labels: - if label.issue_type: - return label.issue_type - return None - - -@dataclass -class User: - """Universal user model.""" - id: str # String ID to handle different backend ID types - username: str - display_name: Optional[str] = None - email: Optional[str] = None - avatar_url: Optional[str] = None - backend_id: Optional[str] = None # Backend-specific ID for sync - - -@dataclass -class Milestone: - """Universal milestone/project model.""" - id: str - title: str - description: Optional[str] = None - state: str = "open" # open, closed - due_date: Optional[datetime] = None - created_at: Optional[datetime] = None - updated_at: Optional[datetime] = None - backend_id: Optional[str] = None - - -@dataclass -class Comment: - """Universal comment model.""" - id: str - body: str - author: User - created_at: datetime - updated_at: Optional[datetime] = None - backend_id: Optional[str] = None - - -@dataclass -class Issue: - """ - Universal Issue model - single source of truth. - - Combines the best features from domain and API models while maintaining - clean separation between core data and backend-specific details. - """ - # Core Issue Data - id: str # Universal ID (UUID for local, external ID for remotes) - number: int # Human-readable number - title: str - description: str - state: IssueState - - # Metadata - created_at: datetime - updated_at: datetime - closed_at: Optional[datetime] = None - - # Relationships - labels: List[Label] = field(default_factory=list) - assignees: List[User] = field(default_factory=list) - milestone: Optional[Milestone] = None - comments: List[Comment] = field(default_factory=list) - - # Backend Integration - backend_id: Optional[str] = None # Backend-specific ID - backend_type: Optional[str] = None # e.g., 'local', 'gitea', 'github' - sync_metadata: Dict[str, Any] = field(default_factory=dict) - - # Performance Optimization - _label_categories: Optional[LabelCategories] = field(default=None, init=False) - - @cached_property - def label_categories(self) -> LabelCategories: - """Efficiently categorize labels with caching.""" - if self._label_categories is None: - # Single-pass categorization for performance - priority_labels = [] - type_labels = [] - status_labels = [] - other_labels = [] - - for label in self.labels: - if label.category == 'priority': - priority_labels.append(label) - elif label.category == 'type': - type_labels.append(label) - elif label.category == 'status': - status_labels.append(label) - else: - other_labels.append(label) - - self._label_categories = LabelCategories( - priority_labels=priority_labels, - type_labels=type_labels, - status_labels=status_labels, - other_labels=other_labels - ) - return self._label_categories - - @property - def priority(self) -> Optional[Priority]: - """Get issue priority from labels.""" - return self.label_categories.priority - - @property - def issue_type(self) -> Optional[IssueType]: - """Get issue type from labels.""" - return self.label_categories.issue_type - - @property - def primary_assignee(self) -> Optional[User]: - """Get primary assignee (first one).""" - return self.assignees[0] if self.assignees else None - - def invalidate_cache(self) -> None: - """Invalidate cached properties when labels change.""" - if hasattr(self, '_label_categories'): - object.__setattr__(self, '_label_categories', None) - - # Domain Logic Methods - def close(self, closed_at: Optional[datetime] = None) -> None: - """Close the issue with business rule validation.""" - if self.state == IssueState.CLOSED: - raise ValueError(f"Issue #{self.number} is already closed") - - self.state = IssueState.CLOSED - self.closed_at = closed_at or datetime.now(timezone.utc) - self.updated_at = datetime.now(timezone.utc) - - def reopen(self) -> None: - """Reopen the issue with business rule validation.""" - if self.state != IssueState.CLOSED: - raise ValueError(f"Issue #{self.number} is not closed (current state: {self.state.value})") - - self.state = IssueState.OPEN - self.closed_at = None - self.updated_at = datetime.now(timezone.utc) - - def add_label(self, label: Label) -> None: - """Add a label to the issue.""" - if label not in self.labels: - self.labels.append(label) - self.invalidate_cache() - self.updated_at = datetime.now(timezone.utc) - - def remove_label(self, label_name: str) -> bool: - """Remove a label by name. Returns True if removed.""" - original_count = len(self.labels) - self.labels = [label for label in self.labels if label.name != label_name] - if len(self.labels) < original_count: - self.invalidate_cache() - self.updated_at = datetime.now(timezone.utc) - return True - return False - - def has_label(self, label_name: str) -> bool: - """Check if issue has a specific label.""" - return any(label.name == label_name for label in self.labels) - - def add_assignee(self, user: User) -> None: - """Add an assignee to the issue.""" - if user not in self.assignees: - self.assignees.append(user) - self.updated_at = datetime.now(timezone.utc) - - def remove_assignee(self, user_id: str) -> bool: - """Remove an assignee by ID. Returns True if removed.""" - original_count = len(self.assignees) - self.assignees = [user for user in self.assignees if user.id != user_id] - if len(self.assignees) < original_count: - self.updated_at = datetime.now(timezone.utc) - return True - return False - - def add_comment(self, comment: Comment) -> None: - """Add a comment to the issue.""" - self.comments.append(comment) - self.updated_at = datetime.now(timezone.utc) - - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary for serialization.""" - return { - 'id': self.id, - 'number': self.number, - 'title': self.title, - 'description': self.description, - 'state': self.state.value, - 'created_at': self.created_at.isoformat(), - 'updated_at': self.updated_at.isoformat(), - 'closed_at': self.closed_at.isoformat() if self.closed_at else None, - 'labels': [{'name': l.name, 'color': l.color, 'description': l.description} for l in self.labels], - 'assignees': [{'id': u.id, 'username': u.username, 'display_name': u.display_name} for u in self.assignees], - 'milestone': {'id': self.milestone.id, 'title': self.milestone.title} if self.milestone else None, - 'backend_id': self.backend_id, - 'backend_type': self.backend_type, - } - - @classmethod - def from_dict(cls, data: Dict[str, Any]) -> 'Issue': - """Create Issue from dictionary.""" - # This would be implemented with proper parsing - # Simplified version for now - raise NotImplementedError("from_dict implementation needed") \ No newline at end of file diff --git a/issue-facade/core/repository.py b/issue-facade/core/repository.py deleted file mode 100644 index 447a60e4..00000000 --- a/issue-facade/core/repository.py +++ /dev/null @@ -1,454 +0,0 @@ -""" -Repository Pattern Implementation - -Provides a high-level repository interface that abstracts backend operations -and adds features like caching, transaction support, and business logic. -""" - -from abc import ABC, abstractmethod -from typing import List, Optional, Dict, Any, Union -from datetime import datetime, timezone -import logging - -from .interfaces import IssueBackend, IssueFilter -from .models import Issue, Label, User, Milestone, Comment, IssueState - - -logger = logging.getLogger(__name__) - - -class IssueRepository: - """ - High-level repository for issue operations. - - Provides a clean interface for issue management with additional features - like caching, validation, and business rule enforcement. - """ - - def __init__(self, backend: IssueBackend, enable_caching: bool = True): - self.backend = backend - self.enable_caching = enable_caching - self._cache: Dict[str, Any] = {} - self._cache_timeout = 300 # 5 minutes - - def _cache_key(self, operation: str, *args) -> str: - """Generate cache key for operation.""" - return f"{operation}:{':'.join(str(arg) for arg in args)}" - - def _get_from_cache(self, key: str) -> Optional[Any]: - """Get value from cache if enabled and not expired.""" - if not self.enable_caching or key not in self._cache: - return None - - cached_item = self._cache[key] - if datetime.now(timezone.utc).timestamp() - cached_item['timestamp'] > self._cache_timeout: - del self._cache[key] - return None - - return cached_item['value'] - - def _set_cache(self, key: str, value: Any) -> None: - """Set value in cache if enabled.""" - if self.enable_caching: - self._cache[key] = { - 'value': value, - 'timestamp': datetime.now(timezone.utc).timestamp() - } - - def _invalidate_cache_pattern(self, pattern: str) -> None: - """Invalidate cache entries matching pattern.""" - if not self.enable_caching: - return - - keys_to_remove = [key for key in self._cache.keys() if pattern in key] - for key in keys_to_remove: - del self._cache[key] - - # Issue Operations - def create_issue( - self, - title: str, - description: str = "", - labels: Optional[List[str]] = None, - assignees: Optional[List[str]] = None, - milestone: Optional[str] = None, - issue_type: Optional[str] = None, - priority: Optional[str] = None - ) -> Issue: - """ - Create a new issue with business rule validation. - - Args: - title: Issue title (required) - description: Issue description - labels: List of label names - assignees: List of usernames to assign - milestone: Milestone title or ID - issue_type: Issue type (bug, feature, etc.) - priority: Priority level (low, medium, high, critical) - - Returns: - Created issue - """ - # Validation - if not title.strip(): - raise ValueError("Issue title cannot be empty") - - # Build labels - issue_labels = [] - if labels: - for label_name in labels: - issue_labels.append(Label(name=label_name.strip())) - - # Add type and priority as labels - if issue_type: - issue_labels.append(Label(name=issue_type)) - - if priority: - issue_labels.append(Label(name=f'priority:{priority}')) - - # Resolve assignees - issue_assignees = [] - if assignees: - for username in assignees: - users = self.backend.search_users(username) - if users: - issue_assignees.append(users[0]) - else: - # Create basic user if not found - issue_assignees.append(User(id=username, username=username)) - - # Resolve milestone - issue_milestone = None - if milestone: - milestones = self.backend.get_milestones() - for m in milestones: - if m.title == milestone or m.id == milestone: - issue_milestone = m - break - - # Create issue - now = datetime.now(timezone.utc) - issue = Issue( - id="", # Will be set by backend - number=0, # Will be set by backend - title=title.strip(), - description=description.strip(), - state=IssueState.OPEN, - created_at=now, - updated_at=now, - labels=issue_labels, - assignees=issue_assignees, - milestone=issue_milestone - ) - - created_issue = self.backend.create_issue(issue) - - # Invalidate relevant caches - self._invalidate_cache_pattern("list_issues") - self._invalidate_cache_pattern("search_issues") - - logger.info(f"Created issue #{created_issue.number}: {created_issue.title}") - return created_issue - - def get_issue(self, issue_id: Union[str, int]) -> Optional[Issue]: - """Get issue by ID or number.""" - if isinstance(issue_id, int): - return self.get_issue_by_number(issue_id) - - cache_key = self._cache_key("get_issue", issue_id) - cached_issue = self._get_from_cache(cache_key) - if cached_issue: - return cached_issue - - issue = self.backend.get_issue(str(issue_id)) - if issue: - self._set_cache(cache_key, issue) - - return issue - - def get_issue_by_number(self, number: int) -> Optional[Issue]: - """Get issue by number.""" - cache_key = self._cache_key("get_issue_by_number", number) - cached_issue = self._get_from_cache(cache_key) - if cached_issue: - return cached_issue - - issue = self.backend.get_issue_by_number(number) - if issue: - self._set_cache(cache_key, issue) - - return issue - - def update_issue(self, issue: Issue) -> Issue: - """Update issue with validation.""" - if not issue.title.strip(): - raise ValueError("Issue title cannot be empty") - - issue.updated_at = datetime.now(timezone.utc) - updated_issue = self.backend.update_issue(issue) - - # Invalidate caches - self._invalidate_cache_pattern("get_issue") - self._invalidate_cache_pattern("list_issues") - self._invalidate_cache_pattern("search_issues") - - logger.info(f"Updated issue #{updated_issue.number}: {updated_issue.title}") - return updated_issue - - def close_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: - """Close issue with optional comment.""" - issue = self.get_issue(issue_id) - if not issue: - raise ValueError(f"Issue {issue_id} not found") - - if issue.state == IssueState.CLOSED: - raise ValueError(f"Issue #{issue.number} is already closed") - - issue.close() - - # Add closing comment if provided - if comment: - self.add_comment(issue.id, comment) - - return self.update_issue(issue) - - def reopen_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: - """Reopen issue with optional comment.""" - issue = self.get_issue(issue_id) - if not issue: - raise ValueError(f"Issue {issue_id} not found") - - if issue.state != IssueState.CLOSED: - raise ValueError(f"Issue #{issue.number} is not closed") - - issue.reopen() - - # Add reopening comment if provided - if comment: - self.add_comment(issue.id, comment) - - return self.update_issue(issue) - - def list_issues( - self, - state: Optional[str] = None, - assignee: Optional[str] = None, - labels: Optional[List[str]] = None, - milestone: Optional[str] = None, - search: Optional[str] = None, - limit: Optional[int] = None, - offset: Optional[int] = None - ) -> List[Issue]: - """List issues with filtering and caching.""" - filter_criteria = IssueFilter( - state=state, - assignee=assignee, - labels=labels, - milestone=milestone, - search=search, - limit=limit, - offset=offset - ) - - # Create cache key from filter criteria - cache_key = self._cache_key( - "list_issues", - state or "all", - assignee or "any", - ",".join(labels) if labels else "any", - milestone or "any", - search or "any", - limit or 0, - offset or 0 - ) - - cached_issues = self._get_from_cache(cache_key) - if cached_issues: - return cached_issues - - issues = self.backend.list_issues(filter_criteria) - self._set_cache(cache_key, issues) - - return issues - - def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: - """Search issues with caching.""" - cache_key = self._cache_key("search_issues", query, limit or 0) - cached_issues = self._get_from_cache(cache_key) - if cached_issues: - return cached_issues - - issues = self.backend.search_issues(query, limit) - self._set_cache(cache_key, issues) - - return issues - - # Comment Operations - def add_comment(self, issue_id: str, comment_text: str, author_username: str = "cli-user") -> Comment: - """Add comment to issue.""" - if not comment_text.strip(): - raise ValueError("Comment text cannot be empty") - - # Try to find user - users = self.backend.search_users(author_username) - if users: - author = users[0] - else: - author = User(id=author_username, username=author_username) - - comment = Comment( - id="", - body=comment_text.strip(), - author=author, - created_at=datetime.now(timezone.utc) - ) - - added_comment = self.backend.add_comment(issue_id, comment) - - # Invalidate issue cache - self._invalidate_cache_pattern("get_issue") - - logger.info(f"Added comment to issue {issue_id}") - return added_comment - - def get_comments(self, issue_id: str) -> List[Comment]: - """Get comments for issue.""" - cache_key = self._cache_key("get_comments", issue_id) - cached_comments = self._get_from_cache(cache_key) - if cached_comments: - return cached_comments - - comments = self.backend.get_comments(issue_id) - self._set_cache(cache_key, comments) - - return comments - - # Label Operations - def get_or_create_label(self, name: str, color: Optional[str] = None, description: Optional[str] = None) -> Label: - """Get existing label or create new one.""" - existing_labels = self.backend.get_labels() - for label in existing_labels: - if label.name == name: - return label - - # Create new label - new_label = Label(name=name, color=color, description=description) - return self.backend.create_label(new_label) - - # Statistics and Analytics - def get_issue_stats(self) -> Dict[str, Any]: - """Get issue statistics.""" - cache_key = self._cache_key("get_issue_stats") - cached_stats = self._get_from_cache(cache_key) - if cached_stats: - return cached_stats - - all_issues = self.backend.list_issues() - - stats = { - 'total': len(all_issues), - 'open': len([i for i in all_issues if i.state != IssueState.CLOSED]), - 'closed': len([i for i in all_issues if i.state == IssueState.CLOSED]), - 'by_state': {}, - 'by_priority': {}, - 'by_type': {}, - 'by_assignee': {}, - 'recent_activity': 0 - } - - # Count by state - for issue in all_issues: - state = issue.state.value - stats['by_state'][state] = stats['by_state'].get(state, 0) + 1 - - # Count by priority and type - one_week_ago = datetime.now(timezone.utc).timestamp() - 604800 # 7 days - - for issue in all_issues: - # Priority - priority = issue.priority - if priority: - priority_name = priority.value - stats['by_priority'][priority_name] = stats['by_priority'].get(priority_name, 0) + 1 - - # Type - issue_type = issue.issue_type - if issue_type: - type_name = issue_type.value - stats['by_type'][type_name] = stats['by_type'].get(type_name, 0) + 1 - - # Assignee - if issue.assignees: - for assignee in issue.assignees: - username = assignee.username - stats['by_assignee'][username] = stats['by_assignee'].get(username, 0) + 1 - - # Recent activity - if issue.updated_at.timestamp() > one_week_ago: - stats['recent_activity'] += 1 - - self._set_cache(cache_key, stats) - return stats - - # Bulk Operations - def bulk_close_issues(self, issue_numbers: List[int], comment: Optional[str] = None) -> List[Issue]: - """Close multiple issues.""" - results = [] - for number in issue_numbers: - try: - closed_issue = self.close_issue(number, comment) - results.append(closed_issue) - except Exception as e: - logger.error(f"Failed to close issue #{number}: {e}") - - return results - - def bulk_add_label(self, issue_numbers: List[int], label_name: str) -> List[Issue]: - """Add label to multiple issues.""" - label = self.get_or_create_label(label_name) - results = [] - - for number in issue_numbers: - try: - issue = self.get_issue_by_number(number) - if issue: - issue.add_label(label) - updated_issue = self.update_issue(issue) - results.append(updated_issue) - except Exception as e: - logger.error(f"Failed to add label to issue #{number}: {e}") - - return results - - # Cache Management - def clear_cache(self) -> None: - """Clear all cached data.""" - self._cache.clear() - logger.info("Repository cache cleared") - - def get_cache_stats(self) -> Dict[str, Any]: - """Get cache statistics.""" - if not self.enable_caching: - return {'enabled': False} - - now = datetime.now(timezone.utc).timestamp() - expired_count = 0 - for cached_item in self._cache.values(): - if now - cached_item['timestamp'] > self._cache_timeout: - expired_count += 1 - - return { - 'enabled': True, - 'total_entries': len(self._cache), - 'expired_entries': expired_count, - 'cache_timeout': self._cache_timeout - } - - # Context Manager Support - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if hasattr(self.backend, 'disconnect'): - self.backend.disconnect() \ No newline at end of file