init: first extract of implementation

This commit is contained in:
2025-10-25 00:54:20 +02:00
parent dffe374460
commit 51aea5effb
18 changed files with 4441 additions and 23 deletions

363
README.md
View File

@@ -1,23 +1,340 @@
# Issue Facade - Universal CLI for Issue Tracking
A convenient command-line facade that provides a unified interface to the repository's main issue tracker, regardless of which backend system is actually being used.
## Purpose
The **Issue Facade** acts as a convenient CLI wrapper that automatically detects and interfaces with whatever issue tracking system is configured for the current repository. This means you get a consistent, intuitive command-line experience whether your project uses:
- GitHub Issues
- GitLab Issues
- Gitea Issues
- JIRA
- Local SQLite storage
- Any other supported backend
## Philosophy
Rather than learning different commands and workflows for each issue tracking system, the Issue Facade provides:
- **One CLI to rule them all**: Same commands work across all backends
- **Repository-aware**: Automatically detects the relevant issue tracker for your repo
- **Offline capability**: Local SQLite fallback when remote systems are unavailable
- **Seamless sync**: Keep local and remote issue trackers synchronized
# Issue Facade - Universal CLI for Issue Tracking
A convenient command-line facade that provides a unified interface to the repository's main issue tracker, regardless of which backend system is actually being used.
## Purpose
The **Issue Facade** acts as a convenient CLI wrapper that automatically detects and interfaces with whatever issue tracking system is configured for the current repository. This means you get a consistent, intuitive command-line experience whether your project uses:
- GitHub Issues
- GitLab Issues
- Gitea Issues
- JIRA
- Local SQLite storage
- Any other supported backend
## Philosophy
Rather than learning different commands and workflows for each issue tracking system, the Issue Facade provides:
- **One CLI to rule them all**: Same commands work across all backends
- **Repository-aware**: Automatically detects the relevant issue tracker for your repo
- **Offline capability**: Local SQLite fallback when remote systems are unavailable
- **Seamless sync**: Keep local and remote issue trackers synchronized
## How It Works
```bash
# The facade automatically detects your repository's issue tracker
cd /path/to/my-project
issue list # Lists issues from the repo's configured tracker
cd /path/to/another-project
issue list # Lists issues from THIS repo's tracker
# Same commands, different backends - transparent to the user
```
## Key Features
### 🎯 **Repository Context Awareness**
The facade automatically detects:
- Git remotes (GitHub, GitLab, Gitea URLs)
- Configuration files (`.issue-config`, `pyproject.toml`, etc.)
- Environment variables
- Default fallbacks
### 🖥️ **Unified CLI Experience**
```bash
# Core issue operations (same across all backends)
issue list # List issues
issue show 42 # Show issue details
issue create "Bug in parser" # Create new issue
issue edit 42 --add-label bug # Edit existing issue
issue close 42 --comment "Fixed" # Close with comment
issue comment 42 "Still broken" # Add comment
# Advanced operations
issue list --assignee=me --state=open
issue search "memory leak"
issue stats # Show issue statistics
```
### 🔄 **Automatic Backend Detection**
```bash
# GitHub repository
cd my-github-project
issue list # → Automatically uses GitHub Issues API
# Gitea repository
cd my-gitea-project
issue list # → Automatically uses Gitea Issues API
# Offline/local work
cd any-project
issue backend set local
issue list # → Uses local SQLite storage
```
### 🌐 **Seamless Synchronization**
```bash
# Work offline, sync later
issue create "Bug found offline" --local
issue sync # Pushes to remote when online
# Keep backups
issue sync pull # Download all remote issues locally
issue export backup.json # Export for archival
```
## Installation & Setup
### 1. Install the Facade
```bash
pip install issue-facade
```
### 2. Automatic Configuration
The facade auto-detects your repository's issue tracker:
```bash
cd your-repository
issue config detect # Auto-configure based on git remotes
issue list # Ready to use!
```
### 3. Manual Configuration (if needed)
```bash
# Configure specific backends
issue backend add github my-repo
issue backend add gitea company-repo
issue backend add local offline
# Set repository-specific backend
issue config set-backend github # For current repository
```
## Repository Integration Examples
### GitHub Repository
```bash
cd my-github-project
issue config detect
# → Automatically configures GitHub Issues via API
# → Uses .github/ISSUE_TEMPLATE/ for templates
# → Respects repository labels and milestones
issue create "Security vulnerability" --template security
```
### Corporate Gitea
```bash
cd company-project
issue config detect
# → Detects Gitea instance from git remote
# → Prompts for access token (one-time setup)
# → Uses corporate labels and workflows
issue list --milestone "Q4 Release"
```
### Offline Development
```bash
cd any-project
issue backend set local
# → Creates local SQLite database in .issue-facade/
# → Full offline functionality
# → Sync to remote when connection available
issue create "Performance issue" --offline
issue sync when-online
```
## Configuration
### Per-Repository Configuration
Each repository can have its own configuration in `.issue-facade/config.json`:
```json
{
"backend": "github",
"github": {
"owner": "myorg",
"repo": "myproject",
"token_env": "GITHUB_TOKEN"
},
"local": {
"db_path": ".issue-facade/issues.db",
"sync_enabled": true
},
"templates": {
"bug": ".github/ISSUE_TEMPLATE/bug_report.md",
"feature": ".github/ISSUE_TEMPLATE/feature_request.md"
}
}
```
### Global Configuration
User-wide settings in `~/.config/issue-facade/config.json`:
```json
{
"default_backend": "local",
"github_token": "env:GITHUB_TOKEN",
"gitea_instances": {
"company": {
"url": "https://git.company.com",
"token": "env:GITEA_TOKEN"
}
},
"offline_mode": false,
"sync_interval": "1h"
}
```
## Architecture
### Facade Pattern Implementation
```
Repository Working Directory
├── .git/ # Git repository
├── .issue-facade/ # Facade configuration
│ ├── config.json # Repository-specific config
│ ├── issues.db # Local SQLite cache/backup
│ └── templates/ # Issue templates
├── your-project-files...
└── issue # CLI command (context-aware)
```
### Backend Detection Logic
1. **Check repository configuration**: `.issue-facade/config.json`
2. **Analyze git remotes**: Detect GitHub/GitLab/Gitea URLs
3. **Look for platform files**: `.github/`, `.gitlab/`, etc.
4. **Check environment**: `GITHUB_TOKEN`, `GITLAB_TOKEN`, etc.
5. **Fall back to local**: SQLite storage if no remote detected
### Multi-Repository Support
```bash
# Each repository maintains its own context
/projects/web-app/ → GitHub Issues
/projects/api-server/ → GitLab Issues
/projects/cli-tool/ → Gitea Issues
/projects/experiment/ → Local SQLite
# Same commands work in all contexts
cd web-app && issue list # GitHub
cd api-server && issue list # GitLab
cd cli-tool && issue list # Gitea
cd experiment && issue list # Local
```
## Use Cases
### 1. **Multi-Platform Developer**
You work with repositories across GitHub, GitLab, and company Gitea:
```bash
# Learn one CLI, use everywhere
issue list --assignee=me # Works on all platforms
issue create "Cross-platform bug" --label bug
```
### 2. **Offline Developer**
You need to track issues without constant internet:
```bash
issue create "Found while flying" --offline
issue list --local # View offline issues
issue sync # Upload when back online
```
### 3. **Repository Migration**
Moving from GitHub to GitLab:
```bash
issue export github-backup.json # Backup from GitHub
issue backend set gitlab # Switch to GitLab
issue import github-backup.json # Import to GitLab
```
### 4. **Cross-Repository Analytics**
Track issues across multiple repositories:
```bash
issue stats --all-repos # Statistics across all configured repos
issue search "security" --global # Search across all issue trackers
```
## Integration with Development Workflow
### Git Hooks Integration
```bash
# .git/hooks/pre-commit
issue list --assignee=me --state=open > ISSUES.md
git add ISSUES.md
```
### CI/CD Integration
```bash
# In your CI pipeline
issue create "Build failed on commit $SHA" --label ci-failure
issue close-if-fixed $ISSUE_NUMBER
```
### IDE Integration
```bash
# VS Code, Vim, Emacs plugins can use the CLI
:IssueList # List issues in editor
:IssueCreate "Typo in function" # Create issue from editor
```
## Comparison with Native Tools
| Feature | issue-facade | gh (GitHub CLI) | glab (GitLab CLI) | Platform Web UI |
|---------|--------------|-----------------|-------------------|-----------------|
| **Multi-platform** | ✅ All backends | ❌ GitHub only | ❌ GitLab only | ❌ Single platform |
| **Offline support** | ✅ Local SQLite | ❌ Online only | ❌ Online only | ❌ Online only |
| **Consistent CLI** | ✅ Same commands | ❌ GitHub-specific | ❌ GitLab-specific | ❌ Web interface |
| **Repository context** | ✅ Auto-detect | ✅ Git-aware | ✅ Git-aware | ❌ Manual navigation |
| **Cross-repo search** | ✅ Global search | ❌ Single repo | ❌ Single repo | ❌ Single repo |
| **Data portability** | ✅ Export/import | ❌ Platform-locked | ❌ Platform-locked | ❌ Platform-locked |
## Future Roadmap
### Version 1.0 (Current)
- [x] Core facade architecture
- [x] GitHub/GitLab/Gitea backend support
- [x] Local SQLite backend
- [x] Automatic repository detection
- [x] Basic synchronization
### Version 1.1
- [ ] Advanced sync (conflict resolution)
- [ ] Issue templates support
- [ ] Workflow automation hooks
- [ ] Plugin system for custom backends
### Version 2.0
- [ ] Web dashboard for multi-repo overview
- [ ] Advanced analytics and reporting
- [ ] Team collaboration features
- [ ] Integration with project management tools
## Contributing
The Issue Facade is designed to be:
- **Backend-agnostic**: Easy to add new issue tracking systems
- **Repository-aware**: Respects the conventions of each platform
- **Developer-friendly**: Consistent CLI across all environments
To add a new backend:
1. Implement the `IssueBackend` interface
2. Add detection logic for the platform
3. Register the backend in the factory
4. Add CLI configuration support
## Why "Facade"?
The **Facade Pattern** perfectly describes this tool's purpose:
> *"Provide a unified interface to a set of interfaces in a subsystem. Facade defines a higher-level interface that makes the subsystem easier to use."*
Instead of learning different CLIs for GitHub (`gh`), GitLab (`glab`), JIRA, etc., the Issue Facade provides one consistent interface that works with all of them. It's the universal remote control for issue tracking systems.
The facade doesn't replace the underlying issue trackers - it makes them easier to use consistently across different platforms and repositories.

24
__init__.py Normal file
View File

@@ -0,0 +1,24 @@
"""
Universal Issue Tracking System
A backend-agnostic issue tracking system that supports multiple backends
through a plugin architecture. Designed to be extracted into a standalone
repository for use across multiple projects.
Features:
- Unified issue model across all backends
- Plugin-based backend architecture
- Local SQLite backend for offline work
- Bidirectional synchronization
- CLI-first interface
- Support for GitHub-style and other issue tracking systems
Supported Backends:
- Local SQLite (for offline/standalone use)
- Gitea (GitHub-compatible API)
- Future: GitHub, GitLab, JIRA, Redmine, etc.
"""
__version__ = "0.1.0"
__author__ = "MarkiTect Project"
__description__ = "Universal Issue Tracking System with Plugin Architecture"

11
backends/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""
Issue Tracking Backend Plugins
This package contains implementations for various issue tracking backends.
Each backend implements the IssueBackend interface to provide a consistent
API regardless of the underlying issue tracking system.
Available Backends:
- local: SQLite-based local backend for offline use
- gitea: Gitea API backend for GitHub-compatible systems
"""

View File

@@ -0,0 +1,17 @@
"""
Gitea Backend
A backend implementation for Gitea issue tracking systems.
This backend provides integration with Gitea API for remote issue management.
Features:
- Full Gitea API integration
- GitHub-compatible operations
- Remote synchronization
- Authentication support
- Rate limiting compliance
"""
from .backend import GiteaBackend
__all__ = ['GiteaBackend']

591
backends/gitea/backend.py Normal file
View File

@@ -0,0 +1,591 @@
"""
Gitea Backend Implementation
Provides integration with Gitea API for remote issue tracking.
This backend adapts the Gitea API to our unified issue model.
"""
import requests
import time
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin
from ...core.interfaces import RemoteBackend, BackendCapabilities, IssueFilter, SyncableBackend
from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType
class GiteaAPIError(Exception):
"""Gitea API specific errors."""
pass
class GiteaRateLimitError(GiteaAPIError):
"""Rate limit exceeded."""
pass
class GiteaBackend(RemoteBackend, SyncableBackend):
"""Gitea API backend for remote issue tracking."""
def __init__(self):
self.base_url: Optional[str] = None
self.token: Optional[str] = None
self.owner: Optional[str] = None
self.repo: Optional[str] = None
self.session = requests.Session()
self._capabilities = BackendCapabilities(
supports_milestones=True,
supports_assignees=True,
supports_comments=True,
supports_labels=True,
supports_search=True,
supports_bulk_operations=False,
supports_webhooks=True,
supports_real_time=False,
max_labels_per_issue=None,
max_assignees_per_issue=10 # Gitea typical limit
)
@property
def backend_type(self) -> str:
return "gitea"
@property
def capabilities(self) -> BackendCapabilities:
return self._capabilities
def connect(self, config: Dict[str, Any]) -> None:
"""Connect to Gitea API."""
self.base_url = config['base_url'].rstrip('/')
self.token = config['token']
self.owner = config['owner']
self.repo = config['repo']
# Setup session with authentication
self.session.headers.update({
'Authorization': f'token {self.token}',
'Content-Type': 'application/json',
'Accept': 'application/json'
})
# Test connection
if not self.test_connection():
raise GiteaAPIError("Failed to connect to Gitea API")
def disconnect(self) -> None:
"""Disconnect from Gitea API."""
self.session.close()
self.base_url = None
self.token = None
self.owner = None
self.repo = None
def test_connection(self) -> bool:
"""Test Gitea API connection."""
try:
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}')
return response.status_code == 200
except Exception:
return False
def _api_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response:
"""Make API request with error handling and rate limiting."""
url = urljoin(f"{self.base_url}/api/v1", endpoint)
try:
response = self.session.request(method, url, json=data, params=params)
# Handle rate limiting
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 60))
raise GiteaRateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds")
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
raise GiteaAPIError(f"API request failed: {e}")
def _gitea_issue_to_unified(self, gitea_issue: Dict[str, Any]) -> Issue:
"""Convert Gitea issue JSON to unified Issue model."""
# Convert labels
labels = []
for label_data in gitea_issue.get('labels', []):
labels.append(Label(
name=label_data['name'],
color=label_data['color'],
description=label_data.get('description', ''),
backend_id=str(label_data['id'])
))
# Convert assignees
assignees = []
if gitea_issue.get('assignees'):
for assignee_data in gitea_issue['assignees']:
assignees.append(User(
id=str(assignee_data['id']),
username=assignee_data['login'],
display_name=assignee_data.get('full_name', ''),
email=assignee_data.get('email', ''),
avatar_url=assignee_data.get('avatar_url', ''),
backend_id=str(assignee_data['id'])
))
# Convert milestone
milestone = None
if gitea_issue.get('milestone'):
milestone_data = gitea_issue['milestone']
milestone = Milestone(
id=str(milestone_data['id']),
title=milestone_data['title'],
description=milestone_data.get('description', ''),
state=milestone_data['state'],
due_date=datetime.fromisoformat(milestone_data['due_on'].replace('Z', '+00:00')) if milestone_data.get('due_on') else None,
created_at=datetime.fromisoformat(milestone_data['created_at'].replace('Z', '+00:00')) if milestone_data.get('created_at') else None,
updated_at=datetime.fromisoformat(milestone_data['updated_at'].replace('Z', '+00:00')) if milestone_data.get('updated_at') else None,
backend_id=str(milestone_data['id'])
)
# Determine state
if gitea_issue['state'] == 'closed':
state = IssueState.CLOSED
else:
# Check for status labels to determine more specific state
for label in labels:
if label.name == 'status:in_progress' or label.name == 'status:in-progress':
state = IssueState.IN_PROGRESS
break
elif label.name == 'status:blocked':
state = IssueState.BLOCKED
break
else:
state = IssueState.OPEN
return Issue(
id=str(gitea_issue['id']),
number=gitea_issue['number'],
title=gitea_issue['title'],
description=gitea_issue.get('body', ''),
state=state,
created_at=datetime.fromisoformat(gitea_issue['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(gitea_issue['updated_at'].replace('Z', '+00:00')),
closed_at=datetime.fromisoformat(gitea_issue['closed_at'].replace('Z', '+00:00')) if gitea_issue.get('closed_at') else None,
labels=labels,
assignees=assignees,
milestone=milestone,
backend_id=str(gitea_issue['id']),
backend_type='gitea'
)
def _unified_issue_to_gitea(self, issue: Issue) -> Dict[str, Any]:
"""Convert unified Issue to Gitea API format."""
data = {
'title': issue.title,
'body': issue.description,
'state': 'closed' if issue.state == IssueState.CLOSED else 'open'
}
if issue.assignees:
data['assignees'] = [assignee.username for assignee in issue.assignees]
if issue.milestone:
data['milestone'] = int(issue.milestone.backend_id) if issue.milestone.backend_id else None
# Convert labels
if issue.labels:
data['labels'] = [label.name for label in issue.labels]
return data
# Issue CRUD Operations
def create_issue(self, issue: Issue) -> Issue:
"""Create issue in Gitea."""
data = self._unified_issue_to_gitea(issue)
response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues', data=data)
gitea_issue = response.json()
return self._gitea_issue_to_unified(gitea_issue)
def get_issue(self, issue_id: str) -> Optional[Issue]:
"""Get issue from Gitea by ID."""
try:
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}')
gitea_issue = response.json()
return self._gitea_issue_to_unified(gitea_issue)
except GiteaAPIError:
return None
def get_issue_by_number(self, number: int) -> Optional[Issue]:
"""Get issue by number."""
return self.get_issue(str(number))
def update_issue(self, issue: Issue) -> Issue:
"""Update issue in Gitea."""
data = self._unified_issue_to_gitea(issue)
response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/{issue.backend_id}', data=data)
gitea_issue = response.json()
return self._gitea_issue_to_unified(gitea_issue)
def delete_issue(self, issue_id: str) -> bool:
"""Delete issue - not supported by Gitea API."""
# Gitea doesn't support deleting issues via API
# We could close it instead
try:
issue = self.get_issue(issue_id)
if issue:
issue.close()
self.update_issue(issue)
return True
except Exception:
pass
return False
def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]:
"""List issues from Gitea."""
params = {
'state': 'all', # Get both open and closed
'sort': 'updated',
'order': 'desc'
}
if filter_criteria:
if filter_criteria.state:
if filter_criteria.state == 'open':
params['state'] = 'open'
elif filter_criteria.state == 'closed':
params['state'] = 'closed'
if filter_criteria.assignee:
params['assignee'] = filter_criteria.assignee
if filter_criteria.milestone:
params['milestone'] = filter_criteria.milestone
if filter_criteria.labels:
params['labels'] = ','.join(filter_criteria.labels)
if filter_criteria.created_after:
params['since'] = filter_criteria.created_after.isoformat()
if filter_criteria.limit:
params['limit'] = filter_criteria.limit
if filter_criteria.offset:
params['page'] = (filter_criteria.offset // (filter_criteria.limit or 30)) + 1
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues', params=params)
gitea_issues = response.json()
issues = [self._gitea_issue_to_unified(gitea_issue) for gitea_issue in gitea_issues]
# Apply additional filtering that Gitea API doesn't support
if filter_criteria:
if filter_criteria.search:
search_term = filter_criteria.search.lower()
issues = [
issue for issue in issues
if search_term in issue.title.lower() or search_term in issue.description.lower()
]
return issues
def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]:
"""Search issues - limited Gitea API support."""
# Gitea has limited search API, fallback to list with search filter
filter_criteria = IssueFilter(search=query, limit=limit)
return self.list_issues(filter_criteria)
# Label Operations
def create_label(self, label: Label) -> Label:
"""Create label in Gitea."""
data = {
'name': label.name,
'color': label.color or '#000000',
'description': label.description or ''
}
response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/labels', data=data)
gitea_label = response.json()
return Label(
name=gitea_label['name'],
color=gitea_label['color'],
description=gitea_label.get('description', ''),
backend_id=str(gitea_label['id'])
)
def get_labels(self) -> List[Label]:
"""Get all labels from Gitea."""
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/labels')
gitea_labels = response.json()
return [Label(
name=label['name'],
color=label['color'],
description=label.get('description', ''),
backend_id=str(label['id'])
) for label in gitea_labels]
def update_label(self, label: Label) -> Label:
"""Update label in Gitea."""
data = {
'name': label.name,
'color': label.color or '#000000',
'description': label.description or ''
}
response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/labels/{label.name}', data=data)
gitea_label = response.json()
return Label(
name=gitea_label['name'],
color=gitea_label['color'],
description=gitea_label.get('description', ''),
backend_id=str(gitea_label['id'])
)
def delete_label(self, label_name: str) -> bool:
"""Delete label from Gitea."""
try:
self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/labels/{label_name}')
return True
except GiteaAPIError:
return False
# User Operations
def get_users(self) -> List[User]:
"""Get repository collaborators."""
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/collaborators')
gitea_users = response.json()
return [User(
id=str(user['id']),
username=user['login'],
display_name=user.get('full_name', ''),
email=user.get('email', ''),
avatar_url=user.get('avatar_url', ''),
backend_id=str(user['id'])
) for user in gitea_users]
def get_user(self, user_id: str) -> Optional[User]:
"""Get specific user."""
try:
response = self._api_request('GET', f'/users/{user_id}')
user = response.json()
return User(
id=str(user['id']),
username=user['login'],
display_name=user.get('full_name', ''),
email=user.get('email', ''),
avatar_url=user.get('avatar_url', ''),
backend_id=str(user['id'])
)
except GiteaAPIError:
return None
def search_users(self, query: str) -> List[User]:
"""Search users in Gitea."""
params = {'q': query, 'limit': 50}
response = self._api_request('GET', '/users/search', params=params)
search_result = response.json()
return [User(
id=str(user['id']),
username=user['login'],
display_name=user.get('full_name', ''),
email=user.get('email', ''),
avatar_url=user.get('avatar_url', ''),
backend_id=str(user['id'])
) for user in search_result.get('data', [])]
# Milestone Operations
def create_milestone(self, milestone: Milestone) -> Milestone:
"""Create milestone in Gitea."""
data = {
'title': milestone.title,
'description': milestone.description or '',
'due_on': milestone.due_date.isoformat() if milestone.due_date else None
}
response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/milestones', data=data)
gitea_milestone = response.json()
return Milestone(
id=str(gitea_milestone['id']),
title=gitea_milestone['title'],
description=gitea_milestone.get('description', ''),
state=gitea_milestone['state'],
due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None,
created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')),
backend_id=str(gitea_milestone['id'])
)
def get_milestones(self) -> List[Milestone]:
"""Get all milestones."""
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/milestones')
gitea_milestones = response.json()
return [Milestone(
id=str(m['id']),
title=m['title'],
description=m.get('description', ''),
state=m['state'],
due_date=datetime.fromisoformat(m['due_on'].replace('Z', '+00:00')) if m.get('due_on') else None,
created_at=datetime.fromisoformat(m['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(m['updated_at'].replace('Z', '+00:00')),
backend_id=str(m['id'])
) for m in gitea_milestones]
def update_milestone(self, milestone: Milestone) -> Milestone:
"""Update milestone."""
data = {
'title': milestone.title,
'description': milestone.description or '',
'state': milestone.state,
'due_on': milestone.due_date.isoformat() if milestone.due_date else None
}
response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/milestones/{milestone.backend_id}', data=data)
gitea_milestone = response.json()
return Milestone(
id=str(gitea_milestone['id']),
title=gitea_milestone['title'],
description=gitea_milestone.get('description', ''),
state=gitea_milestone['state'],
due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None,
created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')),
backend_id=str(gitea_milestone['id'])
)
def delete_milestone(self, milestone_id: str) -> bool:
"""Delete milestone."""
try:
self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/milestones/{milestone_id}')
return True
except GiteaAPIError:
return False
# Comment Operations
def add_comment(self, issue_id: str, comment: Comment) -> Comment:
"""Add comment to issue."""
data = {'body': comment.body}
response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments', data=data)
gitea_comment = response.json()
# Convert author
author_data = gitea_comment['user']
author = User(
id=str(author_data['id']),
username=author_data['login'],
display_name=author_data.get('full_name', ''),
email=author_data.get('email', ''),
avatar_url=author_data.get('avatar_url', ''),
backend_id=str(author_data['id'])
)
return Comment(
id=str(gitea_comment['id']),
body=gitea_comment['body'],
author=author,
created_at=datetime.fromisoformat(gitea_comment['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')),
backend_id=str(gitea_comment['id'])
)
def get_comments(self, issue_id: str) -> List[Comment]:
"""Get comments for issue."""
response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments')
gitea_comments = response.json()
comments = []
for gc in gitea_comments:
author_data = gc['user']
author = User(
id=str(author_data['id']),
username=author_data['login'],
display_name=author_data.get('full_name', ''),
email=author_data.get('email', ''),
avatar_url=author_data.get('avatar_url', ''),
backend_id=str(author_data['id'])
)
comment = Comment(
id=str(gc['id']),
body=gc['body'],
author=author,
created_at=datetime.fromisoformat(gc['created_at'].replace('Z', '+00:00')),
updated_at=datetime.fromisoformat(gc['updated_at'].replace('Z', '+00:00')),
backend_id=str(gc['id'])
)
comments.append(comment)
return comments
def update_comment(self, comment: Comment) -> Comment:
"""Update comment."""
data = {'body': comment.body}
response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment.backend_id}', data=data)
gitea_comment = response.json()
# Update comment object
comment.updated_at = datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00'))
return comment
def delete_comment(self, comment_id: str) -> bool:
"""Delete comment."""
try:
self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment_id}')
return True
except GiteaAPIError:
return False
# Sync Support
def get_last_sync_timestamp(self) -> Optional[datetime]:
"""Get last sync timestamp - stored in metadata."""
# Could be stored in repository description or other metadata
return None
def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]:
"""Get issues modified since timestamp."""
filter_criteria = IssueFilter(updated_after=timestamp)
return self.list_issues(filter_criteria)
# SyncableBackend Implementation
def prepare_for_sync(self) -> None:
"""Prepare for sync operation."""
# Could implement rate limiting preparation
pass
def finalize_sync(self, success: bool) -> None:
"""Finalize sync operation."""
# Could log sync status
pass
def get_sync_conflicts(self) -> List[Dict[str, Any]]:
"""Get sync conflicts."""
# Would compare timestamps and detect conflicts
return []
def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue:
"""Resolve sync conflict."""
issue = self.get_issue(issue_id)
if not issue:
raise GiteaAPIError(f"Issue {issue_id} not found")
if resolution == 'remote':
# Keep remote version (current issue)
return issue
elif resolution == 'local':
# This would require the local version to be provided
raise NotImplementedError("Local resolution requires local issue data")
else:
raise ValueError(f"Unknown resolution: {resolution}")

View File

@@ -0,0 +1,19 @@
"""
Local SQLite Backend
A local, file-based issue tracking backend using SQLite for storage.
This backend provides complete offline functionality and serves as the
reference implementation for the backend interface.
Features:
- Full CRUD operations
- SQLite database storage
- No external dependencies
- Offline operation
- Fast local search
- Backup and export capabilities
"""
from .backend import LocalSQLiteBackend
__all__ = ['LocalSQLiteBackend']

618
backends/local/backend.py Normal file
View File

@@ -0,0 +1,618 @@
"""
Local SQLite Backend Implementation
Provides a complete local issue tracking backend using SQLite for storage.
This implementation serves as the reference for the backend interface and
provides full offline functionality.
"""
import sqlite3
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Dict, Any
from ...core.interfaces import LocalBackend, BackendCapabilities, IssueFilter, SyncableBackend
from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType
class LocalSQLiteBackend(LocalBackend, SyncableBackend):
"""SQLite-based local backend for issue tracking."""
def __init__(self, db_path: Optional[str] = None):
self.db_path = db_path or "issues.db"
self.connection: Optional[sqlite3.Connection] = None
self._capabilities = BackendCapabilities(
supports_milestones=True,
supports_assignees=True,
supports_comments=True,
supports_labels=True,
supports_search=True,
supports_bulk_operations=True,
supports_webhooks=False,
supports_real_time=False,
max_labels_per_issue=None,
max_assignees_per_issue=None
)
@property
def backend_type(self) -> str:
return "local"
@property
def capabilities(self) -> BackendCapabilities:
return self._capabilities
def connect(self, config: Dict[str, Any]) -> None:
"""Connect to SQLite database."""
db_path = config.get('db_path', self.db_path)
self.db_path = db_path
# Ensure directory exists
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.connection = sqlite3.connect(db_path)
self.connection.row_factory = sqlite3.Row # Enable dict-like access
self.connection.execute("PRAGMA foreign_keys = ON")
# Initialize schema
self._initialize_schema()
def disconnect(self) -> None:
"""Disconnect from database."""
if self.connection:
self.connection.close()
self.connection = None
def test_connection(self) -> bool:
"""Test database connection."""
if not self.connection:
return False
try:
self.connection.execute("SELECT 1")
return True
except sqlite3.Error:
return False
def _initialize_schema(self) -> None:
"""Initialize database schema."""
schema_path = Path(__file__).parent / "schema.sql"
with open(schema_path, 'r') as f:
schema_sql = f.read()
# Execute schema in parts (SQLite doesn't like multiple statements)
for statement in schema_sql.split(';'):
statement = statement.strip()
if statement:
self.connection.execute(statement)
self.connection.commit()
def _get_next_issue_number(self) -> int:
"""Get the next available issue number."""
cursor = self.connection.execute("SELECT MAX(number) FROM issues")
result = cursor.fetchone()
return (result[0] or 0) + 1
def _issue_from_row(self, row: sqlite3.Row) -> Issue:
"""Convert database row to Issue object."""
# Get labels
cursor = self.connection.execute("""
SELECT l.id, l.name, l.color, l.description, l.backend_id
FROM labels l
JOIN issue_labels il ON l.id = il.label_id
WHERE il.issue_id = ?
""", (row['id'],))
label_rows = cursor.fetchall()
labels = [Label(
name=lr['name'],
color=lr['color'],
description=lr['description'],
backend_id=lr['backend_id']
) for lr in label_rows]
# Get assignees
cursor = self.connection.execute("""
SELECT u.id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id
FROM users u
JOIN issue_assignees ia ON u.id = ia.user_id
WHERE ia.issue_id = ?
""", (row['id'],))
user_rows = cursor.fetchall()
assignees = [User(
id=ur['id'],
username=ur['username'],
display_name=ur['display_name'],
email=ur['email'],
avatar_url=ur['avatar_url'],
backend_id=ur['backend_id']
) for ur in user_rows]
# Get milestone
milestone = None
if row['milestone_id']:
cursor = self.connection.execute("""
SELECT id, title, description, state, due_date, created_at, updated_at, backend_id
FROM milestones WHERE id = ?
""", (row['milestone_id'],))
m_row = cursor.fetchone()
if m_row:
milestone = Milestone(
id=m_row['id'],
title=m_row['title'],
description=m_row['description'],
state=m_row['state'],
due_date=datetime.fromisoformat(m_row['due_date']) if m_row['due_date'] else None,
created_at=datetime.fromisoformat(m_row['created_at']) if m_row['created_at'] else None,
updated_at=datetime.fromisoformat(m_row['updated_at']) if m_row['updated_at'] else None,
backend_id=m_row['backend_id']
)
# Parse sync metadata
sync_metadata = {}
if row['sync_metadata']:
try:
sync_metadata = json.loads(row['sync_metadata'])
except json.JSONDecodeError:
pass
return Issue(
id=row['id'],
number=row['number'],
title=row['title'],
description=row['description'],
state=IssueState.from_string(row['state']),
created_at=datetime.fromisoformat(row['created_at']),
updated_at=datetime.fromisoformat(row['updated_at']),
closed_at=datetime.fromisoformat(row['closed_at']) if row['closed_at'] else None,
labels=labels,
assignees=assignees,
milestone=milestone,
backend_id=row['backend_id'],
backend_type=row['backend_type'],
sync_metadata=sync_metadata
)
# Issue CRUD Operations
def create_issue(self, issue: Issue) -> Issue:
"""Create a new issue."""
if not issue.id:
issue.id = str(uuid.uuid4())
if not issue.number:
issue.number = self._get_next_issue_number()
# Insert issue
self.connection.execute("""
INSERT INTO issues (id, number, title, description, state, created_at, updated_at,
closed_at, milestone_id, backend_id, backend_type, sync_metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
issue.id,
issue.number,
issue.title,
issue.description,
issue.state.value,
issue.created_at.isoformat(),
issue.updated_at.isoformat(),
issue.closed_at.isoformat() if issue.closed_at else None,
issue.milestone.id if issue.milestone else None,
issue.backend_id,
issue.backend_type or 'local',
json.dumps(issue.sync_metadata) if issue.sync_metadata else None
))
# Add labels
for label in issue.labels:
self._ensure_label_exists(label)
self.connection.execute("""
INSERT OR IGNORE INTO issue_labels (issue_id, label_id)
VALUES (?, ?)
""", (issue.id, label.name)) # Using name as ID for simplicity
# Add assignees
for user in issue.assignees:
self._ensure_user_exists(user)
self.connection.execute("""
INSERT OR IGNORE INTO issue_assignees (issue_id, user_id)
VALUES (?, ?)
""", (issue.id, user.id))
self.connection.commit()
return issue
def get_issue(self, issue_id: str) -> Optional[Issue]:
"""Get issue by ID."""
cursor = self.connection.execute("""
SELECT * FROM issues WHERE id = ? OR backend_id = ?
""", (issue_id, issue_id))
row = cursor.fetchone()
return self._issue_from_row(row) if row else None
def get_issue_by_number(self, number: int) -> Optional[Issue]:
"""Get issue by number."""
cursor = self.connection.execute("""
SELECT * FROM issues WHERE number = ?
""", (number,))
row = cursor.fetchone()
return self._issue_from_row(row) if row else None
def update_issue(self, issue: Issue) -> Issue:
"""Update existing issue."""
# Update main issue record
self.connection.execute("""
UPDATE issues SET
title = ?, description = ?, state = ?, updated_at = ?,
closed_at = ?, milestone_id = ?, sync_metadata = ?
WHERE id = ?
""", (
issue.title,
issue.description,
issue.state.value,
issue.updated_at.isoformat(),
issue.closed_at.isoformat() if issue.closed_at else None,
issue.milestone.id if issue.milestone else None,
json.dumps(issue.sync_metadata) if issue.sync_metadata else None,
issue.id
))
# Update labels (remove all and re-add)
self.connection.execute("DELETE FROM issue_labels WHERE issue_id = ?", (issue.id,))
for label in issue.labels:
self._ensure_label_exists(label)
self.connection.execute("""
INSERT INTO issue_labels (issue_id, label_id) VALUES (?, ?)
""", (issue.id, label.name))
# Update assignees (remove all and re-add)
self.connection.execute("DELETE FROM issue_assignees WHERE issue_id = ?", (issue.id,))
for user in issue.assignees:
self._ensure_user_exists(user)
self.connection.execute("""
INSERT INTO issue_assignees (issue_id, user_id) VALUES (?, ?)
""", (issue.id, user.id))
self.connection.commit()
return issue
def delete_issue(self, issue_id: str) -> bool:
"""Delete issue."""
cursor = self.connection.execute("DELETE FROM issues WHERE id = ?", (issue_id,))
self.connection.commit()
return cursor.rowcount > 0
def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]:
"""List issues with optional filtering."""
query = "SELECT * FROM issues WHERE 1=1"
params = []
if filter_criteria:
if filter_criteria.state:
query += " AND state = ?"
params.append(filter_criteria.state)
if filter_criteria.search:
query += " AND (title LIKE ? OR description LIKE ?)"
search_term = f"%{filter_criteria.search}%"
params.extend([search_term, search_term])
if filter_criteria.created_after:
query += " AND created_at >= ?"
params.append(filter_criteria.created_after.isoformat())
if filter_criteria.created_before:
query += " AND created_at <= ?"
params.append(filter_criteria.created_before.isoformat())
if filter_criteria.updated_after:
query += " AND updated_at >= ?"
params.append(filter_criteria.updated_after.isoformat())
if filter_criteria.updated_before:
query += " AND updated_at <= ?"
params.append(filter_criteria.updated_before.isoformat())
query += " ORDER BY updated_at DESC"
if filter_criteria and filter_criteria.limit:
query += " LIMIT ?"
params.append(filter_criteria.limit)
if filter_criteria.offset:
query += " OFFSET ?"
params.append(filter_criteria.offset)
cursor = self.connection.execute(query, params)
rows = cursor.fetchall()
return [self._issue_from_row(row) for row in rows]
def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]:
"""Search issues using FTS if available, otherwise fallback to LIKE."""
try:
# Try FTS search first
fts_query = """
SELECT i.* FROM issues i
JOIN issue_search s ON i.id = s.issue_id
WHERE issue_search MATCH ?
ORDER BY rank
"""
params = [query]
if limit:
fts_query += " LIMIT ?"
params.append(limit)
cursor = self.connection.execute(fts_query, params)
rows = cursor.fetchall()
return [self._issue_from_row(row) for row in rows]
except sqlite3.OperationalError:
# Fallback to LIKE search
filter_criteria = IssueFilter(search=query, limit=limit)
return self.list_issues(filter_criteria)
# Helper methods
def _ensure_label_exists(self, label: Label) -> None:
"""Ensure label exists in database."""
self.connection.execute("""
INSERT OR IGNORE INTO labels (id, name, color, description, backend_id)
VALUES (?, ?, ?, ?, ?)
""", (label.name, label.name, label.color, label.description, label.backend_id))
def _ensure_user_exists(self, user: User) -> None:
"""Ensure user exists in database."""
self.connection.execute("""
INSERT OR IGNORE INTO users (id, username, display_name, email, avatar_url, backend_id)
VALUES (?, ?, ?, ?, ?, ?)
""", (user.id, user.username, user.display_name, user.email, user.avatar_url, user.backend_id))
# Label Operations
def create_label(self, label: Label) -> Label:
"""Create a new label."""
label_id = label.name # Use name as ID
self.connection.execute("""
INSERT INTO labels (id, name, color, description, backend_id)
VALUES (?, ?, ?, ?, ?)
""", (label_id, label.name, label.color, label.description, label.backend_id))
self.connection.commit()
return label
def get_labels(self) -> List[Label]:
"""Get all labels."""
cursor = self.connection.execute("SELECT * FROM labels ORDER BY name")
rows = cursor.fetchall()
return [Label(
name=row['name'],
color=row['color'],
description=row['description'],
backend_id=row['backend_id']
) for row in rows]
def update_label(self, label: Label) -> Label:
"""Update label."""
self.connection.execute("""
UPDATE labels SET color = ?, description = ? WHERE name = ?
""", (label.color, label.description, label.name))
self.connection.commit()
return label
def delete_label(self, label_name: str) -> bool:
"""Delete label."""
cursor = self.connection.execute("DELETE FROM labels WHERE name = ?", (label_name,))
self.connection.commit()
return cursor.rowcount > 0
# User Operations
def get_users(self) -> List[User]:
"""Get all users."""
cursor = self.connection.execute("SELECT * FROM users ORDER BY username")
rows = cursor.fetchall()
return [User(
id=row['id'],
username=row['username'],
display_name=row['display_name'],
email=row['email'],
avatar_url=row['avatar_url'],
backend_id=row['backend_id']
) for row in rows]
def get_user(self, user_id: str) -> Optional[User]:
"""Get user by ID."""
cursor = self.connection.execute("SELECT * FROM users WHERE id = ?", (user_id,))
row = cursor.fetchone()
if row:
return User(
id=row['id'],
username=row['username'],
display_name=row['display_name'],
email=row['email'],
avatar_url=row['avatar_url'],
backend_id=row['backend_id']
)
return None
def search_users(self, query: str) -> List[User]:
"""Search users."""
cursor = self.connection.execute("""
SELECT * FROM users
WHERE username LIKE ? OR display_name LIKE ? OR email LIKE ?
ORDER BY username
""", (f"%{query}%", f"%{query}%", f"%{query}%"))
rows = cursor.fetchall()
return [User(
id=row['id'],
username=row['username'],
display_name=row['display_name'],
email=row['email'],
avatar_url=row['avatar_url'],
backend_id=row['backend_id']
) for row in rows]
# Milestone Operations
def create_milestone(self, milestone: Milestone) -> Milestone:
"""Create milestone."""
if not milestone.id:
milestone.id = str(uuid.uuid4())
self.connection.execute("""
INSERT INTO milestones (id, title, description, state, due_date, created_at, updated_at, backend_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
milestone.id,
milestone.title,
milestone.description,
milestone.state,
milestone.due_date.isoformat() if milestone.due_date else None,
milestone.created_at.isoformat() if milestone.created_at else datetime.now(timezone.utc).isoformat(),
milestone.updated_at.isoformat() if milestone.updated_at else datetime.now(timezone.utc).isoformat(),
milestone.backend_id
))
self.connection.commit()
return milestone
def get_milestones(self) -> List[Milestone]:
"""Get all milestones."""
cursor = self.connection.execute("SELECT * FROM milestones ORDER BY title")
rows = cursor.fetchall()
return [Milestone(
id=row['id'],
title=row['title'],
description=row['description'],
state=row['state'],
due_date=datetime.fromisoformat(row['due_date']) if row['due_date'] else None,
created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else None,
updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
backend_id=row['backend_id']
) for row in rows]
def update_milestone(self, milestone: Milestone) -> Milestone:
"""Update milestone."""
self.connection.execute("""
UPDATE milestones SET title = ?, description = ?, state = ?, due_date = ?, updated_at = ?
WHERE id = ?
""", (
milestone.title,
milestone.description,
milestone.state,
milestone.due_date.isoformat() if milestone.due_date else None,
datetime.now(timezone.utc).isoformat(),
milestone.id
))
self.connection.commit()
return milestone
def delete_milestone(self, milestone_id: str) -> bool:
"""Delete milestone."""
cursor = self.connection.execute("DELETE FROM milestones WHERE id = ?", (milestone_id,))
self.connection.commit()
return cursor.rowcount > 0
# Comment Operations
def add_comment(self, issue_id: str, comment: Comment) -> Comment:
"""Add comment to issue."""
if not comment.id:
comment.id = str(uuid.uuid4())
self._ensure_user_exists(comment.author)
self.connection.execute("""
INSERT INTO comments (id, issue_id, author_id, body, created_at, updated_at, backend_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
comment.id,
issue_id,
comment.author.id,
comment.body,
comment.created_at.isoformat(),
comment.updated_at.isoformat() if comment.updated_at else None,
comment.backend_id
))
self.connection.commit()
return comment
def get_comments(self, issue_id: str) -> List[Comment]:
"""Get comments for issue."""
cursor = self.connection.execute("""
SELECT c.*, u.id as user_id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id as user_backend_id
FROM comments c
JOIN users u ON c.author_id = u.id
WHERE c.issue_id = ?
ORDER BY c.created_at
""", (issue_id,))
rows = cursor.fetchall()
comments = []
for row in rows:
author = User(
id=row['user_id'],
username=row['username'],
display_name=row['display_name'],
email=row['email'],
avatar_url=row['avatar_url'],
backend_id=row['user_backend_id']
)
comment = Comment(
id=row['id'],
body=row['body'],
author=author,
created_at=datetime.fromisoformat(row['created_at']),
updated_at=datetime.fromisoformat(row['updated_at']) if row['updated_at'] else None,
backend_id=row['backend_id']
)
comments.append(comment)
return comments
def update_comment(self, comment: Comment) -> Comment:
"""Update comment."""
self.connection.execute("""
UPDATE comments SET body = ?, updated_at = ? WHERE id = ?
""", (comment.body, datetime.now(timezone.utc).isoformat(), comment.id))
self.connection.commit()
return comment
def delete_comment(self, comment_id: str) -> bool:
"""Delete comment."""
cursor = self.connection.execute("DELETE FROM comments WHERE id = ?", (comment_id,))
self.connection.commit()
return cursor.rowcount > 0
# Sync Support
def get_last_sync_timestamp(self) -> Optional[datetime]:
"""Get last sync timestamp."""
cursor = self.connection.execute("""
SELECT sync_timestamp FROM sync_history
WHERE success = 1
ORDER BY sync_timestamp DESC
LIMIT 1
""")
row = cursor.fetchone()
return datetime.fromisoformat(row[0]) if row else None
def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]:
"""Get issues modified since timestamp."""
filter_criteria = IssueFilter(updated_after=timestamp)
return self.list_issues(filter_criteria)
# SyncableBackend Implementation
def prepare_for_sync(self) -> None:
"""Prepare for sync operation."""
# Could create backup or start transaction
pass
def finalize_sync(self, success: bool) -> None:
"""Finalize sync operation."""
# Log sync operation
self.connection.execute("""
INSERT INTO sync_history (backend_type, success, sync_timestamp)
VALUES (?, ?, ?)
""", ('sync', success, datetime.now(timezone.utc).isoformat()))
self.connection.commit()
def get_sync_conflicts(self) -> List[Dict[str, Any]]:
"""Get sync conflicts."""
# For local backend, no conflicts since it's the source of truth
return []
def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue:
"""Resolve sync conflict."""
# Local backend doesn't have conflicts
return self.get_issue(issue_id)

189
backends/local/schema.sql Normal file
View File

@@ -0,0 +1,189 @@
-- Local Issue Tracking Database Schema
-- SQLite schema for local issue storage with full referential integrity
-- Enable foreign key constraints
PRAGMA foreign_keys = ON;
-- Issues table - core issue data
CREATE TABLE IF NOT EXISTS issues (
id TEXT PRIMARY KEY,
number INTEGER UNIQUE NOT NULL,
title TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
state TEXT NOT NULL CHECK (state IN ('open', 'closed', 'in_progress', 'blocked')),
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
closed_at TIMESTAMP NULL,
milestone_id TEXT,
backend_id TEXT,
backend_type TEXT DEFAULT 'local',
sync_metadata TEXT, -- JSON for sync data
FOREIGN KEY (milestone_id) REFERENCES milestones(id) ON DELETE SET NULL
);
-- Create index for issue number lookups
CREATE INDEX IF NOT EXISTS idx_issues_number ON issues(number);
CREATE INDEX IF NOT EXISTS idx_issues_state ON issues(state);
CREATE INDEX IF NOT EXISTS idx_issues_updated_at ON issues(updated_at);
CREATE INDEX IF NOT EXISTS idx_issues_backend_id ON issues(backend_id);
-- Labels table
CREATE TABLE IF NOT EXISTS labels (
id TEXT PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
color TEXT,
description TEXT,
backend_id TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Create index for label name lookups
CREATE INDEX IF NOT EXISTS idx_labels_name ON labels(name);
-- Issue-Label many-to-many relationship
CREATE TABLE IF NOT EXISTS issue_labels (
issue_id TEXT NOT NULL,
label_id TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (issue_id, label_id),
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE,
FOREIGN KEY (label_id) REFERENCES labels(id) ON DELETE CASCADE
);
-- Users table
CREATE TABLE IF NOT EXISTS users (
id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
display_name TEXT,
email TEXT,
avatar_url TEXT,
backend_id TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Create index for username lookups
CREATE INDEX IF NOT EXISTS idx_users_username ON users(username);
-- Issue-User assignment many-to-many relationship
CREATE TABLE IF NOT EXISTS issue_assignees (
issue_id TEXT NOT NULL,
user_id TEXT NOT NULL,
assigned_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (issue_id, user_id),
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);
-- Milestones table
CREATE TABLE IF NOT EXISTS milestones (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
state TEXT NOT NULL DEFAULT 'open' CHECK (state IN ('open', 'closed')),
due_date TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
backend_id TEXT
);
-- Create index for milestone title lookups
CREATE INDEX IF NOT EXISTS idx_milestones_title ON milestones(title);
-- Comments table
CREATE TABLE IF NOT EXISTS comments (
id TEXT PRIMARY KEY,
issue_id TEXT NOT NULL,
author_id TEXT NOT NULL,
body TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP,
backend_id TEXT,
FOREIGN KEY (issue_id) REFERENCES issues(id) ON DELETE CASCADE,
FOREIGN KEY (author_id) REFERENCES users(id) ON DELETE CASCADE
);
-- Create index for comment lookups
CREATE INDEX IF NOT EXISTS idx_comments_issue_id ON comments(issue_id);
CREATE INDEX IF NOT EXISTS idx_comments_created_at ON comments(created_at);
-- Sync tracking table
CREATE TABLE IF NOT EXISTS sync_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backend_type TEXT NOT NULL,
sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
success BOOLEAN NOT NULL,
issues_synced INTEGER DEFAULT 0,
errors_count INTEGER DEFAULT 0,
details TEXT -- JSON for sync details
);
-- Configuration table for backend settings
CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Triggers to automatically update updated_at timestamps
CREATE TRIGGER IF NOT EXISTS update_issues_timestamp
AFTER UPDATE ON issues
BEGIN
UPDATE issues SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END;
CREATE TRIGGER IF NOT EXISTS update_milestones_timestamp
AFTER UPDATE ON milestones
BEGIN
UPDATE milestones SET updated_at = CURRENT_TIMESTAMP WHERE id = NEW.id;
END;
-- Views for common queries
CREATE VIEW IF NOT EXISTS issue_summary AS
SELECT
i.id,
i.number,
i.title,
i.state,
i.created_at,
i.updated_at,
i.closed_at,
m.title as milestone_title,
COUNT(c.id) as comment_count,
GROUP_CONCAT(l.name) as labels,
GROUP_CONCAT(u.username) as assignees
FROM issues i
LEFT JOIN milestones m ON i.milestone_id = m.id
LEFT JOIN comments c ON i.id = c.issue_id
LEFT JOIN issue_labels il ON i.id = il.issue_id
LEFT JOIN labels l ON il.label_id = l.id
LEFT JOIN issue_assignees ia ON i.id = ia.issue_id
LEFT JOIN users u ON ia.user_id = u.id
GROUP BY i.id, i.number, i.title, i.state, i.created_at, i.updated_at, i.closed_at, m.title;
-- Full-text search setup (if SQLite supports FTS)
CREATE VIRTUAL TABLE IF NOT EXISTS issue_search USING fts5(
issue_id,
title,
description,
labels,
content='issues'
);
-- Trigger to keep FTS index updated
CREATE TRIGGER IF NOT EXISTS issue_search_insert AFTER INSERT ON issues
BEGIN
INSERT INTO issue_search(issue_id, title, description)
VALUES (NEW.id, NEW.title, NEW.description);
END;
CREATE TRIGGER IF NOT EXISTS issue_search_update AFTER UPDATE ON issues
BEGIN
UPDATE issue_search
SET title = NEW.title, description = NEW.description
WHERE issue_id = NEW.id;
END;
CREATE TRIGGER IF NOT EXISTS issue_search_delete AFTER DELETE ON issues
BEGIN
DELETE FROM issue_search WHERE issue_id = OLD.id;
END;

20
cli/__init__.py Normal file
View File

@@ -0,0 +1,20 @@
"""
Command Line Interface for Universal Issue Tracking
Provides a comprehensive CLI for managing issues across different backends.
The CLI is designed to be intuitive and follows common patterns from
tools like git, gh (GitHub CLI), and similar utilities.
Commands:
- issue list: List issues
- issue show: Show issue details
- issue create: Create new issue
- issue edit: Edit existing issue
- issue close: Close issue
- issue reopen: Reopen issue
- issue comment: Add comment
- issue label: Manage labels
- issue assign: Manage assignments
- backend: Manage backends
- sync: Synchronization operations
"""

141
cli/backend_commands.py Normal file
View File

@@ -0,0 +1,141 @@
"""
Backend Management CLI Commands
Commands for configuring and managing issue tracking backends.
"""
import click
from .utils import (
load_backend_configs, save_backend_configs, format_backend_list,
test_backend_connection, validate_backend_type, echo_success,
echo_error, echo_warning, confirm_action
)
@click.group()
def backend_group():
"""Backend configuration and management."""
pass
@backend_group.command('list')
def list_backends():
"""List configured backends."""
configs = load_backend_configs()
click.echo(format_backend_list(configs))
@backend_group.command('add')
@click.argument('name')
@click.argument('backend_type', type=click.Choice(['local', 'gitea']))
@click.pass_context
def add_backend(ctx, name, backend_type):
"""Add a new backend configuration."""
configs = load_backend_configs()
if name in configs:
if not confirm_action(f"Backend '{name}' already exists. Overwrite?"):
click.echo("Aborted")
return
if backend_type == 'local':
db_path = click.prompt('Database path', default=f'~/.config/issue-tracker/{name}.db')
config = {
'type': 'local',
'db_path': str(db_path)
}
elif backend_type == 'gitea':
base_url = click.prompt('Gitea base URL (e.g., https://git.example.com)')
owner = click.prompt('Repository owner/organization')
repo = click.prompt('Repository name')
token = click.prompt('Access token', hide_input=True)
config = {
'type': 'gitea',
'base_url': base_url.rstrip('/'),
'owner': owner,
'repo': repo,
'token': token
}
# Test connection
click.echo("Testing connection...")
if test_backend_connection(config):
echo_success("Connection successful!")
else:
echo_warning("Connection test failed, but configuration will be saved anyway.")
# Save configuration
configs[name] = config
save_backend_configs(configs)
echo_success(f"Backend '{name}' added successfully")
# Set as default if it's the first one
if 'default' not in configs:
configs['default'] = name
save_backend_configs(configs)
echo_info(f"Set '{name}' as default backend")
@backend_group.command('remove')
@click.argument('name')
def remove_backend(name):
"""Remove a backend configuration."""
configs = load_backend_configs()
if name not in configs:
echo_error(f"Backend '{name}' not found")
return
if not confirm_action(f"Remove backend '{name}'?"):
click.echo("Aborted")
return
del configs[name]
# Update default if necessary
if configs.get('default') == name:
remaining_backends = [k for k in configs.keys() if k != 'default']
if remaining_backends:
configs['default'] = remaining_backends[0]
echo_info(f"Set '{configs['default']}' as new default backend")
else:
del configs['default']
save_backend_configs(configs)
echo_success(f"Backend '{name}' removed")
@backend_group.command('test')
@click.argument('name')
def test_backend(name):
"""Test backend connection."""
configs = load_backend_configs()
if name not in configs:
echo_error(f"Backend '{name}' not found")
return
config = configs[name]
click.echo(f"Testing connection to '{name}'...")
if test_backend_connection(config):
echo_success("Connection successful!")
else:
echo_error("Connection failed!")
@backend_group.command('set-default')
@click.argument('name')
def set_default_backend(name):
"""Set default backend."""
configs = load_backend_configs()
if name not in configs:
echo_error(f"Backend '{name}' not found")
return
configs['default'] = name
save_backend_configs(configs)
echo_success(f"Set '{name}' as default backend")

417
cli/commands.py Normal file
View File

@@ -0,0 +1,417 @@
"""
Issue Management CLI Commands
Core commands for managing issues: create, list, show, edit, close, etc.
"""
import click
from datetime import datetime, timezone
from typing import Optional
from ..core.models import Issue, Label, User, IssueState, Priority, IssueType
from ..core.interfaces import IssueFilter
from .utils import get_backend, format_issue, format_issue_list, get_user_input
@click.group()
def issue_group():
"""Issue management commands."""
pass
@issue_group.command('list')
@click.option('--state', type=click.Choice(['open', 'closed', 'all']), default='open', help='Issue state filter')
@click.option('--assignee', help='Filter by assignee')
@click.option('--label', multiple=True, help='Filter by labels')
@click.option('--milestone', help='Filter by milestone')
@click.option('--search', help='Search in title and description')
@click.option('--limit', type=int, default=30, help='Maximum number of issues to show')
@click.option('--format', 'output_format', type=click.Choice(['table', 'json', 'compact']), default='table', help='Output format')
@click.pass_context
def list_issues(ctx, state, assignee, label, milestone, search, limit, output_format):
"""List issues with optional filtering."""
backend = get_backend(ctx)
# Build filter criteria
filter_criteria = IssueFilter(
state=None if state == 'all' else state,
assignee=assignee,
labels=list(label) if label else None,
milestone=milestone,
search=search,
limit=limit
)
try:
issues = backend.list_issues(filter_criteria)
if not issues:
click.echo("No issues found.")
return
if output_format == 'json':
import json
click.echo(json.dumps([issue.to_dict() for issue in issues], indent=2))
elif output_format == 'compact':
for issue in issues:
labels_str = ', '.join(label.name for label in issue.labels[:3])
if len(issue.labels) > 3:
labels_str += f' (+{len(issue.labels) - 3} more)'
assignee_str = issue.primary_assignee.username if issue.primary_assignee else 'unassigned'
click.echo(f"#{issue.number:4d} {issue.state.value:10s} {issue.title[:50]:50s} {assignee_str:15s} {labels_str}")
else: # table format
click.echo(format_issue_list(issues))
except Exception as e:
raise click.ClickException(f"Failed to list issues: {e}")
@issue_group.command('show')
@click.argument('issue_number', type=int)
@click.option('--comments', is_flag=True, help='Show comments')
@click.option('--format', 'output_format', type=click.Choice(['detailed', 'json', 'compact']), default='detailed', help='Output format')
@click.pass_context
def show_issue(ctx, issue_number, comments, output_format):
"""Show detailed information about an issue."""
backend = get_backend(ctx)
try:
issue = backend.get_issue_by_number(issue_number)
if not issue:
raise click.ClickException(f"Issue #{issue_number} not found")
if output_format == 'json':
import json
issue_dict = issue.to_dict()
if comments:
issue_dict['comments'] = [
{
'id': c.id,
'body': c.body,
'author': c.author.username,
'created_at': c.created_at.isoformat()
}
for c in backend.get_comments(issue.id)
]
click.echo(json.dumps(issue_dict, indent=2))
else:
click.echo(format_issue(issue, show_comments=comments, backend=backend if comments else None))
except Exception as e:
raise click.ClickException(f"Failed to show issue: {e}")
@issue_group.command('create')
@click.argument('title')
@click.option('--description', '-d', help='Issue description')
@click.option('--label', '-l', multiple=True, help='Labels to add')
@click.option('--assignee', '-a', help='Assign to user')
@click.option('--milestone', '-m', help='Milestone')
@click.option('--priority', type=click.Choice(['low', 'medium', 'high', 'critical']), help='Issue priority')
@click.option('--type', 'issue_type', type=click.Choice(['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']), help='Issue type')
@click.option('--interactive', '-i', is_flag=True, help='Interactive mode')
@click.pass_context
def create_issue(ctx, title, description, label, assignee, milestone, priority, issue_type, interactive):
"""Create a new issue."""
backend = get_backend(ctx)
try:
# Interactive mode
if interactive:
title = title or click.prompt('Title')
description = get_user_input('Description (optional)', multiline=True)
# Show available labels
available_labels = backend.get_labels()
if available_labels:
click.echo(f"\nAvailable labels: {', '.join(l.name for l in available_labels)}")
label = click.prompt('Labels (comma-separated, optional)', default='').split(',') if click.prompt('Add labels?', type=bool, default=False) else []
# Show available users
available_users = backend.get_users()
if available_users:
click.echo(f"\nAvailable users: {', '.join(u.username for u in available_users)}")
assignee = click.prompt('Assignee (optional)', default='') or None
# Show available milestones
available_milestones = backend.get_milestones()
if available_milestones:
click.echo(f"\nAvailable milestones: {', '.join(m.title for m in available_milestones)}")
milestone = click.prompt('Milestone (optional)', default='') or None
# Build labels list
labels = []
# Add explicit labels
for label_name in label:
label_name = label_name.strip()
if label_name:
labels.append(Label(name=label_name))
# Add priority label
if priority:
labels.append(Label(name=f'priority:{priority}'))
# Add type label
if issue_type:
labels.append(Label(name=issue_type))
# Build assignees list
assignees = []
if assignee:
# Try to find user
users = backend.search_users(assignee)
if users:
assignees.append(users[0])
else:
# Create a basic user object
assignees.append(User(id=assignee, username=assignee))
# Find milestone
milestone_obj = None
if milestone:
milestones = backend.get_milestones()
for m in milestones:
if m.title == milestone or m.id == milestone:
milestone_obj = m
break
# Create issue
now = datetime.now(timezone.utc)
issue = Issue(
id="", # Will be set by backend
number=0, # Will be set by backend
title=title,
description=description or "",
state=IssueState.OPEN,
created_at=now,
updated_at=now,
labels=labels,
assignees=assignees,
milestone=milestone_obj
)
created_issue = backend.create_issue(issue)
click.echo(f"Created issue #{created_issue.number}: {created_issue.title}")
if ctx.obj.get('verbose'):
click.echo(format_issue(created_issue))
except Exception as e:
raise click.ClickException(f"Failed to create issue: {e}")
@issue_group.command('edit')
@click.argument('issue_number', type=int)
@click.option('--title', help='New title')
@click.option('--description', help='New description')
@click.option('--add-label', multiple=True, help='Labels to add')
@click.option('--remove-label', multiple=True, help='Labels to remove')
@click.option('--assign', help='User to assign')
@click.option('--unassign', help='User to unassign')
@click.option('--milestone', help='Milestone to set')
@click.option('--interactive', '-i', is_flag=True, help='Interactive editing')
@click.pass_context
def edit_issue(ctx, issue_number, title, description, add_label, remove_label, assign, unassign, milestone, interactive):
"""Edit an existing issue."""
backend = get_backend(ctx)
try:
issue = backend.get_issue_by_number(issue_number)
if not issue:
raise click.ClickException(f"Issue #{issue_number} not found")
# Interactive mode
if interactive:
click.echo(f"Editing issue #{issue.number}: {issue.title}")
new_title = click.prompt('Title', default=issue.title)
if new_title != issue.title:
title = new_title
new_description = get_user_input('Description', default=issue.description, multiline=True)
if new_description != issue.description:
description = new_description
# Apply changes
modified = False
if title:
issue.title = title
modified = True
if description is not None:
issue.description = description
modified = True
# Add labels
for label_name in add_label:
issue.add_label(Label(name=label_name.strip()))
modified = True
# Remove labels
for label_name in remove_label:
if issue.remove_label(label_name.strip()):
modified = True
# Assign user
if assign:
users = backend.search_users(assign)
if users:
issue.add_assignee(users[0])
modified = True
else:
issue.add_assignee(User(id=assign, username=assign))
modified = True
# Unassign user
if unassign:
if issue.remove_assignee(unassign):
modified = True
# Set milestone
if milestone:
milestones = backend.get_milestones()
for m in milestones:
if m.title == milestone or m.id == milestone:
issue.milestone = m
modified = True
break
if modified:
issue.updated_at = datetime.now(timezone.utc)
updated_issue = backend.update_issue(issue)
click.echo(f"Updated issue #{updated_issue.number}")
if ctx.obj.get('verbose'):
click.echo(format_issue(updated_issue))
else:
click.echo("No changes made")
except Exception as e:
raise click.ClickException(f"Failed to edit issue: {e}")
@issue_group.command('close')
@click.argument('issue_number', type=int)
@click.option('--comment', '-c', help='Closing comment')
@click.pass_context
def close_issue(ctx, issue_number, comment):
"""Close an issue."""
backend = get_backend(ctx)
try:
issue = backend.get_issue_by_number(issue_number)
if not issue:
raise click.ClickException(f"Issue #{issue_number} not found")
if issue.state == IssueState.CLOSED:
click.echo(f"Issue #{issue_number} is already closed")
return
# Close the issue
issue.close()
# Add closing comment if provided
if comment:
from ..core.models import Comment
current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user
closing_comment = Comment(
id="",
body=comment,
author=current_user,
created_at=datetime.now(timezone.utc)
)
backend.add_comment(issue.id, closing_comment)
updated_issue = backend.update_issue(issue)
click.echo(f"Closed issue #{updated_issue.number}: {updated_issue.title}")
except Exception as e:
raise click.ClickException(f"Failed to close issue: {e}")
@issue_group.command('reopen')
@click.argument('issue_number', type=int)
@click.option('--comment', '-c', help='Reopening comment')
@click.pass_context
def reopen_issue(ctx, issue_number, comment):
"""Reopen a closed issue."""
backend = get_backend(ctx)
try:
issue = backend.get_issue_by_number(issue_number)
if not issue:
raise click.ClickException(f"Issue #{issue_number} not found")
if issue.state != IssueState.CLOSED:
click.echo(f"Issue #{issue_number} is not closed (current state: {issue.state.value})")
return
# Reopen the issue
issue.reopen()
# Add reopening comment if provided
if comment:
from ..core.models import Comment
current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user
reopening_comment = Comment(
id="",
body=comment,
author=current_user,
created_at=datetime.now(timezone.utc)
)
backend.add_comment(issue.id, reopening_comment)
updated_issue = backend.update_issue(issue)
click.echo(f"Reopened issue #{updated_issue.number}: {updated_issue.title}")
except Exception as e:
raise click.ClickException(f"Failed to reopen issue: {e}")
@issue_group.command('comment')
@click.argument('issue_number', type=int)
@click.argument('comment_text', required=False)
@click.option('--editor', is_flag=True, help='Open editor for comment')
@click.pass_context
def add_comment(ctx, issue_number, comment_text, editor):
"""Add a comment to an issue."""
backend = get_backend(ctx)
try:
issue = backend.get_issue_by_number(issue_number)
if not issue:
raise click.ClickException(f"Issue #{issue_number} not found")
# Get comment text
if editor:
comment_text = click.edit() or ""
elif not comment_text:
comment_text = get_user_input("Comment", multiline=True)
if not comment_text.strip():
click.echo("Empty comment, aborting")
return
# Create comment
from ..core.models import Comment
current_user = User(id="cli-user", username="cli-user") # TODO: Get actual user
comment = Comment(
id="",
body=comment_text.strip(),
author=current_user,
created_at=datetime.now(timezone.utc)
)
added_comment = backend.add_comment(issue.id, comment)
click.echo(f"Added comment to issue #{issue_number}")
if ctx.obj.get('verbose'):
click.echo(f"\nComment by {added_comment.author.username} at {added_comment.created_at}:")
click.echo(added_comment.body)
except Exception as e:
raise click.ClickException(f"Failed to add comment: {e}")

117
cli/main.py Normal file
View File

@@ -0,0 +1,117 @@
"""
Main CLI Entry Point
Universal Issue Tracking System CLI
"""
import click
import sys
from pathlib import Path
from .commands import issue_group
from .backend_commands import backend_group
from .sync_commands import sync_group
@click.group()
@click.version_option()
@click.option('--config', type=click.Path(), help='Configuration file path')
@click.option('--backend', help='Backend to use (local, gitea)')
@click.option('--verbose', '-v', is_flag=True, help='Verbose output')
@click.pass_context
def cli(ctx, config, backend, verbose):
"""
Universal Issue Tracking System
A backend-agnostic issue tracking tool that works with local SQLite,
Gitea, GitHub, and other issue tracking systems.
Examples:
issue list # List all issues
issue create "Bug in parser" # Create new issue
issue show 42 # Show issue #42
issue close 42 # Close issue #42
backend add local ~/.issues # Add local backend
backend add gitea myrepo # Add Gitea backend
sync pull gitea # Sync from Gitea
sync push gitea # Sync to Gitea
"""
# Ensure the object exists
ctx.ensure_object(dict)
# Store global options in context
ctx.obj['config_path'] = config
ctx.obj['backend'] = backend
ctx.obj['verbose'] = verbose
# Register command groups
cli.add_command(issue_group, name='issue')
cli.add_command(backend_group, name='backend')
cli.add_command(sync_group, name='sync')
# Convenience aliases - direct issue commands
@cli.command('list')
@click.pass_context
def list_issues(ctx):
"""List all issues (alias for 'issue list')."""
ctx.invoke(issue_group.get_command(ctx, 'list'))
@cli.command('show')
@click.argument('issue_number', type=int)
@click.pass_context
def show_issue(ctx, issue_number):
"""Show issue details (alias for 'issue show')."""
ctx.invoke(issue_group.get_command(ctx, 'show'), issue_number=issue_number)
@cli.command('create')
@click.argument('title')
@click.option('--description', '-d', help='Issue description')
@click.option('--label', '-l', multiple=True, help='Labels to add')
@click.option('--assignee', '-a', help='Assign to user')
@click.option('--milestone', '-m', help='Milestone')
@click.pass_context
def create_issue(ctx, title, description, label, assignee, milestone):
"""Create new issue (alias for 'issue create')."""
ctx.invoke(
issue_group.get_command(ctx, 'create'),
title=title,
description=description,
label=label,
assignee=assignee,
milestone=milestone
)
@cli.command('close')
@click.argument('issue_number', type=int)
@click.option('--comment', '-c', help='Closing comment')
@click.pass_context
def close_issue(ctx, issue_number, comment):
"""Close issue (alias for 'issue close')."""
ctx.invoke(
issue_group.get_command(ctx, 'close'),
issue_number=issue_number,
comment=comment
)
def main():
"""Main entry point for the CLI."""
try:
cli(obj={})
except KeyboardInterrupt:
click.echo("\nAborted by user", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
if __name__ == '__main__':
main()

235
cli/sync_commands.py Normal file
View File

@@ -0,0 +1,235 @@
"""
Synchronization CLI Commands
Commands for synchronizing issues between different backends.
"""
import click
from datetime import datetime, timezone
from .utils import (
load_backend_configs, get_backend, echo_success, echo_error,
echo_warning, echo_info, progress_bar, confirm_action
)
from ..core.interfaces import BackendFactory
@click.group()
def sync_group():
"""Issue synchronization between backends."""
pass
@sync_group.command('status')
@click.argument('backend_name', required=False)
@click.pass_context
def sync_status(ctx, backend_name):
"""Show sync status for backends."""
configs = load_backend_configs()
if backend_name:
backends_to_check = [backend_name] if backend_name in configs else []
if not backends_to_check:
echo_error(f"Backend '{backend_name}' not found")
return
else:
backends_to_check = [k for k in configs.keys() if k != 'default']
for name in backends_to_check:
config = configs[name]
try:
backend = BackendFactory.create_backend(config['type'])
backend.connect(config)
# Get basic stats
all_issues = backend.list_issues()
open_issues = [i for i in all_issues if i.state.value != 'closed']
click.echo(f"\n{name} ({config['type']}):")
click.echo(f" Total issues: {len(all_issues)}")
click.echo(f" Open issues: {len(open_issues)}")
click.echo(f" Closed issues: {len(all_issues) - len(open_issues)}")
# Check last sync
if hasattr(backend, 'get_last_sync_timestamp'):
last_sync = backend.get_last_sync_timestamp()
if last_sync:
click.echo(f" Last sync: {last_sync}")
else:
click.echo(f" Last sync: never")
backend.disconnect()
except Exception as e:
echo_error(f" Error accessing {name}: {e}")
@sync_group.command('pull')
@click.argument('source_backend')
@click.option('--target', default='local', help='Target backend (default: local)')
@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes')
@click.option('--force', is_flag=True, help='Force sync even with conflicts')
@click.pass_context
def sync_pull(ctx, source_backend, target, dry_run, force):
"""Pull issues from source backend to target backend."""
configs = load_backend_configs()
if source_backend not in configs:
echo_error(f"Source backend '{source_backend}' not found")
return
if target not in configs:
echo_error(f"Target backend '{target}' not found")
return
if source_backend == target:
echo_error("Source and target backends cannot be the same")
return
try:
# Connect to backends
source_config = configs[source_backend]
target_config = configs[target]
source = BackendFactory.create_backend(source_config['type'])
source.connect(source_config)
target = BackendFactory.create_backend(target_config['type'])
target.connect(target_config)
echo_info(f"Syncing from {source_backend} to {target}")
# Get issues from source
source_issues = source.list_issues()
echo_info(f"Found {len(source_issues)} issues in source")
# Get existing issues in target
target_issues = target.list_issues()
target_numbers = {issue.number for issue in target_issues}
# Determine what needs to be synced
new_issues = []
updated_issues = []
for issue in source_issues:
if issue.number not in target_numbers:
new_issues.append(issue)
else:
# Check if update is needed (simplified check by updated_at)
target_issue = next((i for i in target_issues if i.number == issue.number), None)
if target_issue and issue.updated_at > target_issue.updated_at:
updated_issues.append(issue)
echo_info(f"New issues to sync: {len(new_issues)}")
echo_info(f"Updated issues to sync: {len(updated_issues)}")
if dry_run:
if new_issues:
click.echo("\nNew issues:")
for issue in new_issues:
click.echo(f" #{issue.number}: {issue.title}")
if updated_issues:
click.echo("\nUpdated issues:")
for issue in updated_issues:
click.echo(f" #{issue.number}: {issue.title}")
click.echo(f"\nDry run complete. {len(new_issues + updated_issues)} issues would be synced.")
return
# Confirm sync
total_sync = len(new_issues) + len(updated_issues)
if total_sync == 0:
echo_success("No issues need syncing")
return
if not force and not confirm_action(f"Sync {total_sync} issues?"):
click.echo("Aborted")
return
# Perform sync
synced_count = 0
errors = []
all_to_sync = new_issues + updated_issues
with progress_bar(all_to_sync, label="Syncing issues") as items:
for issue in items:
try:
# Clear backend-specific IDs for new backend
issue.backend_id = None
issue.backend_type = target_config['type']
if issue in new_issues:
target.create_issue(issue)
else:
# For updates, we need to find the target issue and update it
target_issue = target.get_issue_by_number(issue.number)
if target_issue:
# Copy relevant fields
target_issue.title = issue.title
target_issue.description = issue.description
target_issue.state = issue.state
target_issue.labels = issue.labels
target_issue.assignees = issue.assignees
target_issue.milestone = issue.milestone
target_issue.updated_at = issue.updated_at
target_issue.closed_at = issue.closed_at
target.update_issue(target_issue)
synced_count += 1
except Exception as e:
errors.append(f"Issue #{issue.number}: {e}")
# Report results
echo_success(f"Synced {synced_count} issues successfully")
if errors:
echo_warning(f"{len(errors)} errors occurred:")
for error in errors[:5]: # Show first 5 errors
echo_error(f" {error}")
if len(errors) > 5:
echo_warning(f" ... and {len(errors) - 5} more errors")
# Cleanup
source.disconnect()
target.disconnect()
except Exception as e:
echo_error(f"Sync failed: {e}")
@sync_group.command('push')
@click.argument('target_backend')
@click.option('--source', default='local', help='Source backend (default: local)')
@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes')
@click.option('--force', is_flag=True, help='Force sync even with conflicts')
@click.pass_context
def sync_push(ctx, target_backend, source, dry_run, force):
"""Push issues from source backend to target backend."""
# This is essentially the same as pull but with arguments swapped
ctx.invoke(sync_pull, source_backend=source, target=target_backend, dry_run=dry_run, force=force)
@sync_group.command('bidirectional')
@click.argument('backend1')
@click.argument('backend2')
@click.option('--dry-run', is_flag=True, help='Show what would be synced without making changes')
@click.option('--force', is_flag=True, help='Force sync even with conflicts')
@click.pass_context
def sync_bidirectional(ctx, backend1, backend2, dry_run, force):
"""Bidirectional sync between two backends."""
echo_warning("Bidirectional sync is a complex operation that can cause conflicts.")
if not force and not confirm_action("Continue with bidirectional sync?"):
click.echo("Aborted")
return
# First sync backend1 -> backend2
echo_info(f"Step 1: Syncing {backend1} -> {backend2}")
ctx.invoke(sync_pull, source_backend=backend1, target=backend2, dry_run=dry_run, force=True)
# Then sync backend2 -> backend1
echo_info(f"Step 2: Syncing {backend2} -> {backend1}")
ctx.invoke(sync_pull, source_backend=backend2, target=backend1, dry_run=dry_run, force=True)
echo_success("Bidirectional sync completed")

336
cli/utils.py Normal file
View File

@@ -0,0 +1,336 @@
"""
CLI Utility Functions
Helper functions for the CLI commands including formatting, configuration,
backend management, and user interaction.
"""
import click
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, List
import tempfile
from ..core.interfaces import IssueBackend, BackendFactory
from ..core.models import Issue, Comment
from ..backends.local import LocalSQLiteBackend
from ..backends.gitea import GiteaBackend
# Register available backends
BackendFactory.register_backend('local', LocalSQLiteBackend)
BackendFactory.register_backend('gitea', GiteaBackend)
def get_config_dir() -> Path:
"""Get configuration directory."""
config_dir = Path.home() / '.config' / 'issue-tracker'
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir
def get_backend_config_path() -> Path:
"""Get backend configuration file path."""
return get_config_dir() / 'backends.json'
def load_backend_configs() -> dict:
"""Load backend configurations."""
config_path = get_backend_config_path()
if not config_path.exists():
return {}
import json
try:
with open(config_path, 'r') as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
def save_backend_configs(configs: dict) -> None:
"""Save backend configurations."""
config_path = get_backend_config_path()
import json
with open(config_path, 'w') as f:
json.dump(configs, f, indent=2)
def get_default_backend() -> str:
"""Get the default backend name."""
configs = load_backend_configs()
return configs.get('default', 'local')
def get_backend(ctx) -> IssueBackend:
"""Get backend instance from context."""
backend_name = ctx.obj.get('backend') or get_default_backend()
configs = load_backend_configs()
if backend_name not in configs:
if backend_name == 'local':
# Auto-configure local backend
local_config = {
'type': 'local',
'db_path': str(get_config_dir() / 'issues.db')
}
configs['local'] = local_config
save_backend_configs(configs)
else:
raise click.ClickException(f"Backend '{backend_name}' not configured. Use 'backend add' to configure it.")
backend_config = configs[backend_name]
backend_type = backend_config['type']
try:
backend = BackendFactory.create_backend(backend_type)
backend.connect(backend_config)
return backend
except Exception as e:
raise click.ClickException(f"Failed to connect to backend '{backend_name}': {e}")
def format_issue_list(issues: List[Issue]) -> str:
"""Format list of issues as a table."""
if not issues:
return "No issues found."
# Calculate column widths
max_title_width = min(50, max(len(issue.title) for issue in issues))
max_assignee_width = 15
# Header
lines = []
header = f"{'#':<6} {'State':<12} {'Title':<{max_title_width}} {'Assignee':<{max_assignee_width}} Labels"
lines.append(header)
lines.append("-" * len(header))
# Issues
for issue in issues:
title = issue.title[:max_title_width]
if len(issue.title) > max_title_width:
title = title[:-3] + "..."
assignee = issue.primary_assignee.username if issue.primary_assignee else "unassigned"
assignee = assignee[:max_assignee_width]
labels = ", ".join(label.name for label in issue.labels[:3])
if len(issue.labels) > 3:
labels += f" (+{len(issue.labels) - 3})"
line = f"{issue.number:<6} {issue.state.value:<12} {title:<{max_title_width}} {assignee:<{max_assignee_width}} {labels}"
lines.append(line)
return "\n".join(lines)
def format_issue(issue: Issue, show_comments: bool = False, backend: Optional[IssueBackend] = None) -> str:
"""Format a single issue with details."""
lines = []
# Header
lines.append(f"#{issue.number}: {issue.title}")
lines.append("=" * (len(f"#{issue.number}: {issue.title}")))
lines.append("")
# Basic info
lines.append(f"State: {issue.state.value}")
lines.append(f"Created: {format_datetime(issue.created_at)}")
lines.append(f"Updated: {format_datetime(issue.updated_at)}")
if issue.closed_at:
lines.append(f"Closed: {format_datetime(issue.closed_at)}")
# Assignees
if issue.assignees:
assignees_str = ", ".join(assignee.username for assignee in issue.assignees)
lines.append(f"Assignees: {assignees_str}")
else:
lines.append("Assignees: none")
# Milestone
if issue.milestone:
lines.append(f"Milestone: {issue.milestone.title}")
# Labels
if issue.labels:
labels_by_category = {}
for label in issue.labels:
category = label.category
if category not in labels_by_category:
labels_by_category[category] = []
labels_by_category[category].append(label.name)
for category, label_names in labels_by_category.items():
lines.append(f"{category.title()} labels: {', '.join(label_names)}")
else:
lines.append("Labels: none")
# Description
lines.append("")
lines.append("Description:")
lines.append("-" * 12)
if issue.description:
lines.append(issue.description)
else:
lines.append("(no description)")
# Comments
if show_comments and backend:
comments = backend.get_comments(issue.id)
if comments:
lines.append("")
lines.append(f"Comments ({len(comments)}):")
lines.append("-" * 20)
for comment in comments:
lines.append("")
lines.append(f"Comment by {comment.author.username} at {format_datetime(comment.created_at)}:")
lines.append(comment.body)
return "\n".join(lines)
def format_datetime(dt: datetime) -> str:
"""Format datetime for display."""
if dt.tzinfo:
dt = dt.astimezone()
return dt.strftime("%Y-%m-%d %H:%M:%S")
def get_user_input(prompt: str, default: str = "", multiline: bool = False) -> str:
"""Get user input with optional default and multiline support."""
if multiline:
click.echo(f"{prompt} (Press Ctrl+D when done, Ctrl+C to cancel):")
if default:
click.echo(f"Current value:\n{default}\n")
lines = []
try:
while True:
try:
line = input()
lines.append(line)
except EOFError:
break
except KeyboardInterrupt:
raise click.Abort()
return "\n".join(lines) if lines else default
else:
return click.prompt(prompt, default=default)
def validate_backend_type(backend_type: str) -> bool:
"""Validate that a backend type is supported."""
return backend_type in BackendFactory.get_available_backends()
def test_backend_connection(backend_config: dict) -> bool:
"""Test if a backend configuration works."""
try:
backend_type = backend_config['type']
backend = BackendFactory.create_backend(backend_type)
backend.connect(backend_config)
result = backend.test_connection()
backend.disconnect()
return result
except Exception:
return False
def format_backend_list(configs: dict) -> str:
"""Format backend configurations for display."""
if not configs:
return "No backends configured."
lines = []
default_backend = configs.get('default', 'local')
lines.append(f"{'Name':<15} {'Type':<10} {'Status':<10} Description")
lines.append("-" * 60)
for name, config in configs.items():
if name == 'default':
continue
backend_type = config.get('type', 'unknown')
is_default = name == default_backend
# Test connection
try:
status = "connected" if test_backend_connection(config) else "error"
except Exception:
status = "error"
# Description
if backend_type == 'local':
desc = f"Local SQLite: {config.get('db_path', 'unknown')}"
elif backend_type == 'gitea':
desc = f"Gitea: {config.get('base_url', 'unknown')}/{config.get('owner', 'unknown')}/{config.get('repo', 'unknown')}"
else:
desc = f"{backend_type} backend"
if is_default:
desc += " (default)"
line = f"{name:<15} {backend_type:<10} {status:<10} {desc}"
lines.append(line)
return "\n".join(lines)
def get_editor() -> str:
"""Get the user's preferred editor."""
return os.environ.get('EDITOR', 'nano')
def edit_text(initial_text: str = "") -> Optional[str]:
"""Open text in editor and return edited content."""
with tempfile.NamedTemporaryFile(mode='w+', suffix='.md', delete=False) as f:
f.write(initial_text)
temp_path = f.name
try:
editor = get_editor()
os.system(f'{editor} {temp_path}')
with open(temp_path, 'r') as f:
edited_text = f.read()
return edited_text if edited_text != initial_text else None
finally:
os.unlink(temp_path)
def confirm_action(message: str, default: bool = False) -> bool:
"""Ask user for confirmation."""
return click.confirm(message, default=default)
def progress_bar(items, label: str = "Processing"):
"""Create a progress bar for iterating over items."""
return click.progressbar(items, label=label)
def echo_success(message: str) -> None:
"""Echo success message in green."""
click.echo(click.style(message, fg='green'))
def echo_warning(message: str) -> None:
"""Echo warning message in yellow."""
click.echo(click.style(message, fg='yellow'))
def echo_error(message: str) -> None:
"""Echo error message in red."""
click.echo(click.style(message, fg='red'))
def echo_info(message: str) -> None:
"""Echo info message in blue."""
click.echo(click.style(message, fg='blue'))

407
core/interfaces.py Normal file
View File

@@ -0,0 +1,407 @@
"""
Backend Plugin Interfaces
Defines the contracts that all issue tracking backend plugins must implement.
This enables a clean plugin architecture where new backends can be added
without modifying core code.
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Iterator
from datetime import datetime
from .models import Issue, Label, User, Milestone, Comment
class IssueFilter:
"""Filter criteria for issue queries."""
def __init__(
self,
state: Optional[str] = None,
assignee: Optional[str] = None,
labels: Optional[List[str]] = None,
milestone: Optional[str] = None,
created_after: Optional[datetime] = None,
created_before: Optional[datetime] = None,
updated_after: Optional[datetime] = None,
updated_before: Optional[datetime] = None,
search: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = 0
):
self.state = state
self.assignee = assignee
self.labels = labels or []
self.milestone = milestone
self.created_after = created_after
self.created_before = created_before
self.updated_after = updated_after
self.updated_before = updated_before
self.search = search
self.limit = limit
self.offset = offset
class BackendCapabilities:
"""Describes what features a backend supports."""
def __init__(
self,
supports_milestones: bool = True,
supports_assignees: bool = True,
supports_comments: bool = True,
supports_labels: bool = True,
supports_search: bool = True,
supports_bulk_operations: bool = False,
supports_webhooks: bool = False,
supports_real_time: bool = False,
max_labels_per_issue: Optional[int] = None,
max_assignees_per_issue: Optional[int] = None
):
self.supports_milestones = supports_milestones
self.supports_assignees = supports_assignees
self.supports_comments = supports_comments
self.supports_labels = supports_labels
self.supports_search = supports_search
self.supports_bulk_operations = supports_bulk_operations
self.supports_webhooks = supports_webhooks
self.supports_real_time = supports_real_time
self.max_labels_per_issue = max_labels_per_issue
self.max_assignees_per_issue = max_assignees_per_issue
class IssueBackend(ABC):
"""
Abstract base class for all issue tracking backends.
Each backend plugin must implement this interface to provide
issue tracking functionality for a specific system.
"""
@property
@abstractmethod
def backend_type(self) -> str:
"""Return the backend type identifier (e.g., 'local', 'gitea', 'github')."""
pass
@property
@abstractmethod
def capabilities(self) -> BackendCapabilities:
"""Return the capabilities supported by this backend."""
pass
@abstractmethod
def connect(self, config: Dict[str, Any]) -> None:
"""
Connect to the backend using provided configuration.
Args:
config: Backend-specific configuration (URLs, tokens, etc.)
"""
pass
@abstractmethod
def disconnect(self) -> None:
"""Disconnect from the backend and clean up resources."""
pass
@abstractmethod
def test_connection(self) -> bool:
"""Test if the backend connection is working."""
pass
# Issue CRUD Operations
@abstractmethod
def create_issue(self, issue: Issue) -> Issue:
"""
Create a new issue in the backend.
Args:
issue: Issue to create (id may be None for new issues)
Returns:
Created issue with backend_id populated
"""
pass
@abstractmethod
def get_issue(self, issue_id: str) -> Optional[Issue]:
"""
Retrieve an issue by its backend ID.
Args:
issue_id: Backend-specific issue ID
Returns:
Issue if found, None otherwise
"""
pass
@abstractmethod
def get_issue_by_number(self, number: int) -> Optional[Issue]:
"""
Retrieve an issue by its human-readable number.
Args:
number: Issue number
Returns:
Issue if found, None otherwise
"""
pass
@abstractmethod
def update_issue(self, issue: Issue) -> Issue:
"""
Update an existing issue in the backend.
Args:
issue: Issue with modifications
Returns:
Updated issue
"""
pass
@abstractmethod
def delete_issue(self, issue_id: str) -> bool:
"""
Delete an issue from the backend.
Args:
issue_id: Backend-specific issue ID
Returns:
True if deleted successfully
"""
pass
@abstractmethod
def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]:
"""
List issues matching filter criteria.
Args:
filter_criteria: Optional filter to apply
Returns:
List of matching issues
"""
pass
@abstractmethod
def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]:
"""
Search issues using backend-specific query syntax.
Args:
query: Search query
limit: Maximum number of results
Returns:
List of matching issues
"""
pass
# Label Operations
@abstractmethod
def create_label(self, label: Label) -> Label:
"""Create a new label."""
pass
@abstractmethod
def get_labels(self) -> List[Label]:
"""Get all available labels."""
pass
@abstractmethod
def update_label(self, label: Label) -> Label:
"""Update an existing label."""
pass
@abstractmethod
def delete_label(self, label_name: str) -> bool:
"""Delete a label."""
pass
# User Operations
@abstractmethod
def get_users(self) -> List[User]:
"""Get all available users."""
pass
@abstractmethod
def get_user(self, user_id: str) -> Optional[User]:
"""Get a specific user by ID."""
pass
@abstractmethod
def search_users(self, query: str) -> List[User]:
"""Search for users."""
pass
# Milestone Operations
@abstractmethod
def create_milestone(self, milestone: Milestone) -> Milestone:
"""Create a new milestone."""
pass
@abstractmethod
def get_milestones(self) -> List[Milestone]:
"""Get all milestones."""
pass
@abstractmethod
def update_milestone(self, milestone: Milestone) -> Milestone:
"""Update a milestone."""
pass
@abstractmethod
def delete_milestone(self, milestone_id: str) -> bool:
"""Delete a milestone."""
pass
# Comment Operations
@abstractmethod
def add_comment(self, issue_id: str, comment: Comment) -> Comment:
"""Add a comment to an issue."""
pass
@abstractmethod
def get_comments(self, issue_id: str) -> List[Comment]:
"""Get all comments for an issue."""
pass
@abstractmethod
def update_comment(self, comment: Comment) -> Comment:
"""Update a comment."""
pass
@abstractmethod
def delete_comment(self, comment_id: str) -> bool:
"""Delete a comment."""
pass
# Bulk Operations (optional, depends on capabilities)
def bulk_update_issues(self, updates: List[Dict[str, Any]]) -> List[Issue]:
"""
Bulk update multiple issues.
Args:
updates: List of update operations
Returns:
List of updated issues
"""
if not self.capabilities.supports_bulk_operations:
raise NotImplementedError(f"{self.backend_type} backend does not support bulk operations")
# Default implementation: update one by one
results = []
for update in updates:
issue_id = update['id']
issue = self.get_issue(issue_id)
if issue:
# Apply updates to issue
for key, value in update.items():
if key != 'id' and hasattr(issue, key):
setattr(issue, key, value)
updated_issue = self.update_issue(issue)
results.append(updated_issue)
return results
# Sync Support
def get_last_sync_timestamp(self) -> Optional[datetime]:
"""
Get the timestamp of the last successful sync.
Used for incremental synchronization.
"""
return None
def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]:
"""
Get issues modified since a specific timestamp.
Used for incremental synchronization.
"""
filter_criteria = IssueFilter(updated_after=timestamp)
return self.list_issues(filter_criteria)
class LocalBackend(IssueBackend):
"""
Marker interface for local backends.
Local backends store data locally and can work offline.
They serve as the source of truth for synchronization.
"""
pass
class RemoteBackend(IssueBackend):
"""
Marker interface for remote backends.
Remote backends connect to external issue tracking systems.
They participate in bidirectional synchronization.
"""
pass
class SyncableBackend(ABC):
"""
Interface for backends that support synchronization.
Backends implementing this interface can participate in
bidirectional sync operations.
"""
@abstractmethod
def prepare_for_sync(self) -> None:
"""Prepare backend for sync operation (e.g., create backup)."""
pass
@abstractmethod
def finalize_sync(self, success: bool) -> None:
"""Finalize sync operation (e.g., commit or rollback)."""
pass
@abstractmethod
def get_sync_conflicts(self) -> List[Dict[str, Any]]:
"""Get issues that have sync conflicts."""
pass
@abstractmethod
def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue:
"""
Resolve a sync conflict.
Args:
issue_id: Issue with conflict
resolution: 'local' or 'remote' or 'merge'
"""
pass
class BackendFactory:
"""Factory for creating backend instances."""
_backends: Dict[str, type] = {}
@classmethod
def register_backend(cls, backend_type: str, backend_class: type) -> None:
"""Register a backend implementation."""
cls._backends[backend_type] = backend_class
@classmethod
def create_backend(cls, backend_type: str) -> IssueBackend:
"""Create a backend instance."""
if backend_type not in cls._backends:
raise ValueError(f"Unknown backend type: {backend_type}")
return cls._backends[backend_type]()
@classmethod
def get_available_backends(cls) -> List[str]:
"""Get list of available backend types."""
return list(cls._backends.keys())

341
core/models.py Normal file
View File

@@ -0,0 +1,341 @@
"""
Core Issue Domain Models
Unified, backend-agnostic issue models that serve as the single source of truth
for all issue tracking operations. These models combine the best features from
various issue tracking systems while maintaining clean domain logic.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional, Dict, Any
from functools import cached_property
class IssueState(Enum):
"""Universal issue state enumeration with backend mapping support."""
OPEN = "open"
CLOSED = "closed"
IN_PROGRESS = "in_progress"
BLOCKED = "blocked"
@classmethod
def from_string(cls, state_str: str) -> 'IssueState':
"""Convert string to IssueState, with fallback handling."""
state_map = {
'open': cls.OPEN,
'closed': cls.CLOSED,
'in_progress': cls.IN_PROGRESS,
'in-progress': cls.IN_PROGRESS,
'progress': cls.IN_PROGRESS,
'blocked': cls.BLOCKED,
}
return state_map.get(state_str.lower(), cls.OPEN)
def to_backend_string(self, backend_type: str) -> str:
"""Convert to backend-specific string representation."""
if backend_type == 'gitea':
return 'open' if self in [self.OPEN, self.IN_PROGRESS, self.BLOCKED] else 'closed'
elif backend_type == 'github':
return self.value
else:
return self.value
class Priority(Enum):
"""Universal priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@classmethod
def from_label(cls, label_name: str) -> Optional['Priority']:
"""Extract priority from label name."""
if label_name.startswith('priority:'):
priority_str = label_name.replace('priority:', '')
try:
return cls(priority_str)
except ValueError:
return None
return None
class IssueType(Enum):
"""Universal issue types."""
BUG = "bug"
FEATURE = "feature"
ENHANCEMENT = "enhancement"
TASK = "task"
DOCUMENTATION = "documentation"
QUESTION = "question"
@classmethod
def from_label(cls, label_name: str) -> Optional['IssueType']:
"""Extract type from label name."""
try:
return cls(label_name.lower())
except ValueError:
return None
@dataclass(frozen=True)
class Label:
"""Universal label model with backend mapping support."""
name: str
color: Optional[str] = None
description: Optional[str] = None
backend_id: Optional[str] = None # Backend-specific ID for sync
@cached_property
def category(self) -> str:
"""Categorize label for efficient filtering."""
if self.name.startswith('priority:'):
return 'priority'
elif self.name.startswith('status:'):
return 'status'
elif self.name.startswith('type:'):
return 'type'
elif self.name in ['bug', 'feature', 'enhancement', 'task', 'documentation', 'question']:
return 'type'
else:
return 'other'
@cached_property
def priority(self) -> Optional[Priority]:
"""Extract priority if this is a priority label."""
return Priority.from_label(self.name)
@cached_property
def issue_type(self) -> Optional[IssueType]:
"""Extract issue type if this is a type label."""
return IssueType.from_label(self.name)
@dataclass(frozen=True)
class LabelCategories:
"""Categorized labels for efficient access."""
priority_labels: List[Label]
type_labels: List[Label]
status_labels: List[Label]
other_labels: List[Label]
@cached_property
def priority(self) -> Optional[Priority]:
"""Get the issue priority."""
for label in self.priority_labels:
if label.priority:
return label.priority
return None
@cached_property
def issue_type(self) -> Optional[IssueType]:
"""Get the issue type."""
for label in self.type_labels:
if label.issue_type:
return label.issue_type
return None
@dataclass
class User:
"""Universal user model."""
id: str # String ID to handle different backend ID types
username: str
display_name: Optional[str] = None
email: Optional[str] = None
avatar_url: Optional[str] = None
backend_id: Optional[str] = None # Backend-specific ID for sync
@dataclass
class Milestone:
"""Universal milestone/project model."""
id: str
title: str
description: Optional[str] = None
state: str = "open" # open, closed
due_date: Optional[datetime] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
backend_id: Optional[str] = None
@dataclass
class Comment:
"""Universal comment model."""
id: str
body: str
author: User
created_at: datetime
updated_at: Optional[datetime] = None
backend_id: Optional[str] = None
@dataclass
class Issue:
"""
Universal Issue model - single source of truth.
Combines the best features from domain and API models while maintaining
clean separation between core data and backend-specific details.
"""
# Core Issue Data
id: str # Universal ID (UUID for local, external ID for remotes)
number: int # Human-readable number
title: str
description: str
state: IssueState
# Metadata
created_at: datetime
updated_at: datetime
closed_at: Optional[datetime] = None
# Relationships
labels: List[Label] = field(default_factory=list)
assignees: List[User] = field(default_factory=list)
milestone: Optional[Milestone] = None
comments: List[Comment] = field(default_factory=list)
# Backend Integration
backend_id: Optional[str] = None # Backend-specific ID
backend_type: Optional[str] = None # e.g., 'local', 'gitea', 'github'
sync_metadata: Dict[str, Any] = field(default_factory=dict)
# Performance Optimization
_label_categories: Optional[LabelCategories] = field(default=None, init=False)
@cached_property
def label_categories(self) -> LabelCategories:
"""Efficiently categorize labels with caching."""
if self._label_categories is None:
# Single-pass categorization for performance
priority_labels = []
type_labels = []
status_labels = []
other_labels = []
for label in self.labels:
if label.category == 'priority':
priority_labels.append(label)
elif label.category == 'type':
type_labels.append(label)
elif label.category == 'status':
status_labels.append(label)
else:
other_labels.append(label)
self._label_categories = LabelCategories(
priority_labels=priority_labels,
type_labels=type_labels,
status_labels=status_labels,
other_labels=other_labels
)
return self._label_categories
@property
def priority(self) -> Optional[Priority]:
"""Get issue priority from labels."""
return self.label_categories.priority
@property
def issue_type(self) -> Optional[IssueType]:
"""Get issue type from labels."""
return self.label_categories.issue_type
@property
def primary_assignee(self) -> Optional[User]:
"""Get primary assignee (first one)."""
return self.assignees[0] if self.assignees else None
def invalidate_cache(self) -> None:
"""Invalidate cached properties when labels change."""
if hasattr(self, '_label_categories'):
object.__setattr__(self, '_label_categories', None)
# Domain Logic Methods
def close(self, closed_at: Optional[datetime] = None) -> None:
"""Close the issue with business rule validation."""
if self.state == IssueState.CLOSED:
raise ValueError(f"Issue #{self.number} is already closed")
self.state = IssueState.CLOSED
self.closed_at = closed_at or datetime.now(timezone.utc)
self.updated_at = datetime.now(timezone.utc)
def reopen(self) -> None:
"""Reopen the issue with business rule validation."""
if self.state != IssueState.CLOSED:
raise ValueError(f"Issue #{self.number} is not closed (current state: {self.state.value})")
self.state = IssueState.OPEN
self.closed_at = None
self.updated_at = datetime.now(timezone.utc)
def add_label(self, label: Label) -> None:
"""Add a label to the issue."""
if label not in self.labels:
self.labels.append(label)
self.invalidate_cache()
self.updated_at = datetime.now(timezone.utc)
def remove_label(self, label_name: str) -> bool:
"""Remove a label by name. Returns True if removed."""
original_count = len(self.labels)
self.labels = [label for label in self.labels if label.name != label_name]
if len(self.labels) < original_count:
self.invalidate_cache()
self.updated_at = datetime.now(timezone.utc)
return True
return False
def has_label(self, label_name: str) -> bool:
"""Check if issue has a specific label."""
return any(label.name == label_name for label in self.labels)
def add_assignee(self, user: User) -> None:
"""Add an assignee to the issue."""
if user not in self.assignees:
self.assignees.append(user)
self.updated_at = datetime.now(timezone.utc)
def remove_assignee(self, user_id: str) -> bool:
"""Remove an assignee by ID. Returns True if removed."""
original_count = len(self.assignees)
self.assignees = [user for user in self.assignees if user.id != user_id]
if len(self.assignees) < original_count:
self.updated_at = datetime.now(timezone.utc)
return True
return False
def add_comment(self, comment: Comment) -> None:
"""Add a comment to the issue."""
self.comments.append(comment)
self.updated_at = datetime.now(timezone.utc)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
'id': self.id,
'number': self.number,
'title': self.title,
'description': self.description,
'state': self.state.value,
'created_at': self.created_at.isoformat(),
'updated_at': self.updated_at.isoformat(),
'closed_at': self.closed_at.isoformat() if self.closed_at else None,
'labels': [{'name': l.name, 'color': l.color, 'description': l.description} for l in self.labels],
'assignees': [{'id': u.id, 'username': u.username, 'display_name': u.display_name} for u in self.assignees],
'milestone': {'id': self.milestone.id, 'title': self.milestone.title} if self.milestone else None,
'backend_id': self.backend_id,
'backend_type': self.backend_type,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Issue':
"""Create Issue from dictionary."""
# This would be implemented with proper parsing
# Simplified version for now
raise NotImplementedError("from_dict implementation needed")

454
core/repository.py Normal file
View File

@@ -0,0 +1,454 @@
"""
Repository Pattern Implementation
Provides a high-level repository interface that abstracts backend operations
and adds features like caching, transaction support, and business logic.
"""
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Union
from datetime import datetime, timezone
import logging
from .interfaces import IssueBackend, IssueFilter
from .models import Issue, Label, User, Milestone, Comment, IssueState
logger = logging.getLogger(__name__)
class IssueRepository:
"""
High-level repository for issue operations.
Provides a clean interface for issue management with additional features
like caching, validation, and business rule enforcement.
"""
def __init__(self, backend: IssueBackend, enable_caching: bool = True):
self.backend = backend
self.enable_caching = enable_caching
self._cache: Dict[str, Any] = {}
self._cache_timeout = 300 # 5 minutes
def _cache_key(self, operation: str, *args) -> str:
"""Generate cache key for operation."""
return f"{operation}:{':'.join(str(arg) for arg in args)}"
def _get_from_cache(self, key: str) -> Optional[Any]:
"""Get value from cache if enabled and not expired."""
if not self.enable_caching or key not in self._cache:
return None
cached_item = self._cache[key]
if datetime.now(timezone.utc).timestamp() - cached_item['timestamp'] > self._cache_timeout:
del self._cache[key]
return None
return cached_item['value']
def _set_cache(self, key: str, value: Any) -> None:
"""Set value in cache if enabled."""
if self.enable_caching:
self._cache[key] = {
'value': value,
'timestamp': datetime.now(timezone.utc).timestamp()
}
def _invalidate_cache_pattern(self, pattern: str) -> None:
"""Invalidate cache entries matching pattern."""
if not self.enable_caching:
return
keys_to_remove = [key for key in self._cache.keys() if pattern in key]
for key in keys_to_remove:
del self._cache[key]
# Issue Operations
def create_issue(
self,
title: str,
description: str = "",
labels: Optional[List[str]] = None,
assignees: Optional[List[str]] = None,
milestone: Optional[str] = None,
issue_type: Optional[str] = None,
priority: Optional[str] = None
) -> Issue:
"""
Create a new issue with business rule validation.
Args:
title: Issue title (required)
description: Issue description
labels: List of label names
assignees: List of usernames to assign
milestone: Milestone title or ID
issue_type: Issue type (bug, feature, etc.)
priority: Priority level (low, medium, high, critical)
Returns:
Created issue
"""
# Validation
if not title.strip():
raise ValueError("Issue title cannot be empty")
# Build labels
issue_labels = []
if labels:
for label_name in labels:
issue_labels.append(Label(name=label_name.strip()))
# Add type and priority as labels
if issue_type:
issue_labels.append(Label(name=issue_type))
if priority:
issue_labels.append(Label(name=f'priority:{priority}'))
# Resolve assignees
issue_assignees = []
if assignees:
for username in assignees:
users = self.backend.search_users(username)
if users:
issue_assignees.append(users[0])
else:
# Create basic user if not found
issue_assignees.append(User(id=username, username=username))
# Resolve milestone
issue_milestone = None
if milestone:
milestones = self.backend.get_milestones()
for m in milestones:
if m.title == milestone or m.id == milestone:
issue_milestone = m
break
# Create issue
now = datetime.now(timezone.utc)
issue = Issue(
id="", # Will be set by backend
number=0, # Will be set by backend
title=title.strip(),
description=description.strip(),
state=IssueState.OPEN,
created_at=now,
updated_at=now,
labels=issue_labels,
assignees=issue_assignees,
milestone=issue_milestone
)
created_issue = self.backend.create_issue(issue)
# Invalidate relevant caches
self._invalidate_cache_pattern("list_issues")
self._invalidate_cache_pattern("search_issues")
logger.info(f"Created issue #{created_issue.number}: {created_issue.title}")
return created_issue
def get_issue(self, issue_id: Union[str, int]) -> Optional[Issue]:
"""Get issue by ID or number."""
if isinstance(issue_id, int):
return self.get_issue_by_number(issue_id)
cache_key = self._cache_key("get_issue", issue_id)
cached_issue = self._get_from_cache(cache_key)
if cached_issue:
return cached_issue
issue = self.backend.get_issue(str(issue_id))
if issue:
self._set_cache(cache_key, issue)
return issue
def get_issue_by_number(self, number: int) -> Optional[Issue]:
"""Get issue by number."""
cache_key = self._cache_key("get_issue_by_number", number)
cached_issue = self._get_from_cache(cache_key)
if cached_issue:
return cached_issue
issue = self.backend.get_issue_by_number(number)
if issue:
self._set_cache(cache_key, issue)
return issue
def update_issue(self, issue: Issue) -> Issue:
"""Update issue with validation."""
if not issue.title.strip():
raise ValueError("Issue title cannot be empty")
issue.updated_at = datetime.now(timezone.utc)
updated_issue = self.backend.update_issue(issue)
# Invalidate caches
self._invalidate_cache_pattern("get_issue")
self._invalidate_cache_pattern("list_issues")
self._invalidate_cache_pattern("search_issues")
logger.info(f"Updated issue #{updated_issue.number}: {updated_issue.title}")
return updated_issue
def close_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue:
"""Close issue with optional comment."""
issue = self.get_issue(issue_id)
if not issue:
raise ValueError(f"Issue {issue_id} not found")
if issue.state == IssueState.CLOSED:
raise ValueError(f"Issue #{issue.number} is already closed")
issue.close()
# Add closing comment if provided
if comment:
self.add_comment(issue.id, comment)
return self.update_issue(issue)
def reopen_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue:
"""Reopen issue with optional comment."""
issue = self.get_issue(issue_id)
if not issue:
raise ValueError(f"Issue {issue_id} not found")
if issue.state != IssueState.CLOSED:
raise ValueError(f"Issue #{issue.number} is not closed")
issue.reopen()
# Add reopening comment if provided
if comment:
self.add_comment(issue.id, comment)
return self.update_issue(issue)
def list_issues(
self,
state: Optional[str] = None,
assignee: Optional[str] = None,
labels: Optional[List[str]] = None,
milestone: Optional[str] = None,
search: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None
) -> List[Issue]:
"""List issues with filtering and caching."""
filter_criteria = IssueFilter(
state=state,
assignee=assignee,
labels=labels,
milestone=milestone,
search=search,
limit=limit,
offset=offset
)
# Create cache key from filter criteria
cache_key = self._cache_key(
"list_issues",
state or "all",
assignee or "any",
",".join(labels) if labels else "any",
milestone or "any",
search or "any",
limit or 0,
offset or 0
)
cached_issues = self._get_from_cache(cache_key)
if cached_issues:
return cached_issues
issues = self.backend.list_issues(filter_criteria)
self._set_cache(cache_key, issues)
return issues
def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]:
"""Search issues with caching."""
cache_key = self._cache_key("search_issues", query, limit or 0)
cached_issues = self._get_from_cache(cache_key)
if cached_issues:
return cached_issues
issues = self.backend.search_issues(query, limit)
self._set_cache(cache_key, issues)
return issues
# Comment Operations
def add_comment(self, issue_id: str, comment_text: str, author_username: str = "cli-user") -> Comment:
"""Add comment to issue."""
if not comment_text.strip():
raise ValueError("Comment text cannot be empty")
# Try to find user
users = self.backend.search_users(author_username)
if users:
author = users[0]
else:
author = User(id=author_username, username=author_username)
comment = Comment(
id="",
body=comment_text.strip(),
author=author,
created_at=datetime.now(timezone.utc)
)
added_comment = self.backend.add_comment(issue_id, comment)
# Invalidate issue cache
self._invalidate_cache_pattern("get_issue")
logger.info(f"Added comment to issue {issue_id}")
return added_comment
def get_comments(self, issue_id: str) -> List[Comment]:
"""Get comments for issue."""
cache_key = self._cache_key("get_comments", issue_id)
cached_comments = self._get_from_cache(cache_key)
if cached_comments:
return cached_comments
comments = self.backend.get_comments(issue_id)
self._set_cache(cache_key, comments)
return comments
# Label Operations
def get_or_create_label(self, name: str, color: Optional[str] = None, description: Optional[str] = None) -> Label:
"""Get existing label or create new one."""
existing_labels = self.backend.get_labels()
for label in existing_labels:
if label.name == name:
return label
# Create new label
new_label = Label(name=name, color=color, description=description)
return self.backend.create_label(new_label)
# Statistics and Analytics
def get_issue_stats(self) -> Dict[str, Any]:
"""Get issue statistics."""
cache_key = self._cache_key("get_issue_stats")
cached_stats = self._get_from_cache(cache_key)
if cached_stats:
return cached_stats
all_issues = self.backend.list_issues()
stats = {
'total': len(all_issues),
'open': len([i for i in all_issues if i.state != IssueState.CLOSED]),
'closed': len([i for i in all_issues if i.state == IssueState.CLOSED]),
'by_state': {},
'by_priority': {},
'by_type': {},
'by_assignee': {},
'recent_activity': 0
}
# Count by state
for issue in all_issues:
state = issue.state.value
stats['by_state'][state] = stats['by_state'].get(state, 0) + 1
# Count by priority and type
one_week_ago = datetime.now(timezone.utc).timestamp() - 604800 # 7 days
for issue in all_issues:
# Priority
priority = issue.priority
if priority:
priority_name = priority.value
stats['by_priority'][priority_name] = stats['by_priority'].get(priority_name, 0) + 1
# Type
issue_type = issue.issue_type
if issue_type:
type_name = issue_type.value
stats['by_type'][type_name] = stats['by_type'].get(type_name, 0) + 1
# Assignee
if issue.assignees:
for assignee in issue.assignees:
username = assignee.username
stats['by_assignee'][username] = stats['by_assignee'].get(username, 0) + 1
# Recent activity
if issue.updated_at.timestamp() > one_week_ago:
stats['recent_activity'] += 1
self._set_cache(cache_key, stats)
return stats
# Bulk Operations
def bulk_close_issues(self, issue_numbers: List[int], comment: Optional[str] = None) -> List[Issue]:
"""Close multiple issues."""
results = []
for number in issue_numbers:
try:
closed_issue = self.close_issue(number, comment)
results.append(closed_issue)
except Exception as e:
logger.error(f"Failed to close issue #{number}: {e}")
return results
def bulk_add_label(self, issue_numbers: List[int], label_name: str) -> List[Issue]:
"""Add label to multiple issues."""
label = self.get_or_create_label(label_name)
results = []
for number in issue_numbers:
try:
issue = self.get_issue_by_number(number)
if issue:
issue.add_label(label)
updated_issue = self.update_issue(issue)
results.append(updated_issue)
except Exception as e:
logger.error(f"Failed to add label to issue #{number}: {e}")
return results
# Cache Management
def clear_cache(self) -> None:
"""Clear all cached data."""
self._cache.clear()
logger.info("Repository cache cleared")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
if not self.enable_caching:
return {'enabled': False}
now = datetime.now(timezone.utc).timestamp()
expired_count = 0
for cached_item in self._cache.values():
if now - cached_item['timestamp'] > self._cache_timeout:
expired_count += 1
return {
'enabled': True,
'total_entries': len(self._cache),
'expired_entries': expired_count,
'cache_timeout': self._cache_timeout
}
# Context Manager Support
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if hasattr(self.backend, 'disconnect'):
self.backend.disconnect()

164
pyproject.toml Normal file
View File

@@ -0,0 +1,164 @@
[build-system]
requires = ["setuptools>=45", "wheel", "setuptools-scm[toml]>=6.2"]
build-backend = "setuptools.build_meta"
[project]
name = "universal-issue-tracker"
description = "Backend-agnostic issue tracking system with plugin architecture"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
authors = [
{name = "MarkiTect Project", email = "noreply@example.com"},
]
keywords = ["issue-tracking", "project-management", "cli", "gitea", "github", "jira"]
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Bug Tracking",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Utilities",
]
dependencies = [
"click>=8.0.0",
"requests>=2.25.0",
"python-dateutil>=2.8.0",
]
dynamic = ["version"]
[project.optional-dependencies]
dev = [
"pytest>=6.0",
"pytest-cov>=2.0",
"pytest-mock>=3.0",
"black>=22.0",
"isort>=5.0",
"flake8>=4.0",
"mypy>=0.900",
"pre-commit>=2.0",
]
docs = [
"sphinx>=4.0",
"sphinx-rtd-theme>=1.0",
"sphinx-click>=3.0",
]
gitea = [
"requests>=2.25.0",
]
github = [
"pygithub>=1.55",
]
jira = [
"jira>=3.0",
]
[project.urls]
Homepage = "https://github.com/markitect/universal-issue-tracker"
Documentation = "https://universal-issue-tracker.readthedocs.io/"
Repository = "https://github.com/markitect/universal-issue-tracker.git"
"Bug Tracker" = "https://github.com/markitect/universal-issue-tracker/issues"
[project.scripts]
issue = "issue_tracker.cli.main:main"
issue-tracker = "issue_tracker.cli.main:main"
[tool.setuptools]
packages = ["issue_tracker"]
[tool.setuptools.dynamic]
version = {attr = "issue_tracker.__version__"}
[tool.setuptools.package-data]
issue_tracker = ["backends/local/schema.sql"]
[tool.black]
line-length = 100
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
# directories
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| build
| dist
)/
'''
[tool.isort]
profile = "black"
line_length = 100
known_first_party = ["issue_tracker"]
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true
[[tool.mypy.overrides]]
module = [
"requests.*",
"click.*",
]
ignore_missing_imports = true
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--disable-warnings",
"--tb=short",
]
markers = [
"unit: Unit tests",
"integration: Integration tests",
"slow: Slow tests",
]
[tool.coverage.run]
source = ["issue_tracker"]
omit = [
"*/tests/*",
"*/test_*",
"setup.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]