""" Gitea Backend Implementation Provides integration with Gitea API for remote issue tracking. This backend adapts the Gitea API to our unified issue model. """ import requests import time from datetime import datetime, timezone from typing import List, Optional, Dict, Any from urllib.parse import urljoin from ...core.interfaces import RemoteBackend, BackendCapabilities, IssueFilter, SyncableBackend from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType class GiteaAPIError(Exception): """Gitea API specific errors.""" pass class GiteaRateLimitError(GiteaAPIError): """Rate limit exceeded.""" pass class GiteaBackend(RemoteBackend, SyncableBackend): """Gitea API backend for remote issue tracking.""" def __init__(self): self.base_url: Optional[str] = None self.token: Optional[str] = None self.owner: Optional[str] = None self.repo: Optional[str] = None self.session = requests.Session() self._capabilities = BackendCapabilities( supports_milestones=True, supports_assignees=True, supports_comments=True, supports_labels=True, supports_search=True, supports_bulk_operations=False, supports_webhooks=True, supports_real_time=False, max_labels_per_issue=None, max_assignees_per_issue=10 # Gitea typical limit ) @property def backend_type(self) -> str: return "gitea" @property def capabilities(self) -> BackendCapabilities: return self._capabilities def connect(self, config: Dict[str, Any]) -> None: """Connect to Gitea API.""" self.base_url = config['base_url'].rstrip('/') self.token = config['token'] self.owner = config['owner'] self.repo = config['repo'] # Setup session with authentication self.session.headers.update({ 'Authorization': f'token {self.token}', 'Content-Type': 'application/json', 'Accept': 'application/json' }) # Test connection if not self.test_connection(): raise GiteaAPIError("Failed to connect to Gitea API") def disconnect(self) -> None: """Disconnect from Gitea API.""" self.session.close() self.base_url = None self.token = None self.owner = None self.repo = None def test_connection(self) -> bool: """Test Gitea API connection.""" try: response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}') return response.status_code == 200 except Exception: return False def _api_request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None) -> requests.Response: """Make API request with error handling and rate limiting.""" # Fix urljoin issue - ensure endpoint doesn't start with / when base ends with / base = f"{self.base_url}/api/v1/" endpoint = endpoint.lstrip('/') url = urljoin(base, endpoint) try: response = self.session.request(method, url, json=data, params=params) # Handle rate limiting if response.status_code == 429: retry_after = int(response.headers.get('Retry-After', 60)) raise GiteaRateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds") response.raise_for_status() return response except requests.exceptions.RequestException as e: raise GiteaAPIError(f"API request failed: {e}") def _gitea_issue_to_unified(self, gitea_issue: Dict[str, Any]) -> Issue: """Convert Gitea issue JSON to unified Issue model.""" # Convert labels labels = [] for label_data in gitea_issue.get('labels', []): labels.append(Label( name=label_data['name'], color=label_data['color'], description=label_data.get('description', ''), backend_id=str(label_data['id']) )) # Convert assignees assignees = [] if gitea_issue.get('assignees'): for assignee_data in gitea_issue['assignees']: assignees.append(User( id=str(assignee_data['id']), username=assignee_data['login'], display_name=assignee_data.get('full_name', ''), email=assignee_data.get('email', ''), avatar_url=assignee_data.get('avatar_url', ''), backend_id=str(assignee_data['id']) )) # Convert milestone milestone = None if gitea_issue.get('milestone'): milestone_data = gitea_issue['milestone'] milestone = Milestone( id=str(milestone_data['id']), title=milestone_data['title'], description=milestone_data.get('description', ''), state=milestone_data['state'], due_date=datetime.fromisoformat(milestone_data['due_on'].replace('Z', '+00:00')) if milestone_data.get('due_on') else None, created_at=datetime.fromisoformat(milestone_data['created_at'].replace('Z', '+00:00')) if milestone_data.get('created_at') else None, updated_at=datetime.fromisoformat(milestone_data['updated_at'].replace('Z', '+00:00')) if milestone_data.get('updated_at') else None, backend_id=str(milestone_data['id']) ) # Determine state if gitea_issue['state'] == 'closed': state = IssueState.CLOSED else: # Check for status labels to determine more specific state for label in labels: if label.name == 'status:in_progress' or label.name == 'status:in-progress': state = IssueState.IN_PROGRESS break elif label.name == 'status:blocked': state = IssueState.BLOCKED break else: state = IssueState.OPEN return Issue( id=str(gitea_issue['id']), number=gitea_issue['number'], title=gitea_issue['title'], description=gitea_issue.get('body', ''), state=state, created_at=datetime.fromisoformat(gitea_issue['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(gitea_issue['updated_at'].replace('Z', '+00:00')), closed_at=datetime.fromisoformat(gitea_issue['closed_at'].replace('Z', '+00:00')) if gitea_issue.get('closed_at') else None, labels=labels, assignees=assignees, milestone=milestone, backend_id=str(gitea_issue['id']), backend_type='gitea' ) def _unified_issue_to_gitea(self, issue: Issue) -> Dict[str, Any]: """Convert unified Issue to Gitea API format.""" data = { 'title': issue.title, 'body': issue.description, 'state': 'closed' if issue.state == IssueState.CLOSED else 'open' } if issue.assignees: data['assignees'] = [assignee.username for assignee in issue.assignees] if issue.milestone: data['milestone'] = int(issue.milestone.backend_id) if issue.milestone.backend_id else None # Convert labels if issue.labels: data['labels'] = [label.name for label in issue.labels] return data # Issue CRUD Operations def create_issue(self, issue: Issue) -> Issue: """Create issue in Gitea.""" data = self._unified_issue_to_gitea(issue) response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues', data=data) gitea_issue = response.json() return self._gitea_issue_to_unified(gitea_issue) def get_issue(self, issue_id: str) -> Optional[Issue]: """Get issue from Gitea by ID.""" try: response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}') gitea_issue = response.json() return self._gitea_issue_to_unified(gitea_issue) except GiteaAPIError: return None def get_issue_by_number(self, number: int) -> Optional[Issue]: """Get issue by number.""" return self.get_issue(str(number)) def update_issue(self, issue: Issue) -> Issue: """Update issue in Gitea.""" data = self._unified_issue_to_gitea(issue) response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/{issue.number}', data=data) gitea_issue = response.json() return self._gitea_issue_to_unified(gitea_issue) def delete_issue(self, issue_id: str) -> bool: """Delete issue - not supported by Gitea API.""" # Gitea doesn't support deleting issues via API # We could close it instead try: issue = self.get_issue(issue_id) if issue: issue.close() self.update_issue(issue) return True except Exception: pass return False def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]: """List issues from Gitea.""" params = { 'state': 'all', # Get both open and closed 'sort': 'updated', 'order': 'desc' } if filter_criteria: if filter_criteria.state: if filter_criteria.state == 'open': params['state'] = 'open' elif filter_criteria.state == 'closed': params['state'] = 'closed' if filter_criteria.assignee: params['assignee'] = filter_criteria.assignee if filter_criteria.milestone: params['milestone'] = filter_criteria.milestone if filter_criteria.labels: params['labels'] = ','.join(filter_criteria.labels) if filter_criteria.created_after: params['since'] = filter_criteria.created_after.isoformat() if filter_criteria.limit: params['limit'] = filter_criteria.limit if filter_criteria.offset: params['page'] = (filter_criteria.offset // (filter_criteria.limit or 30)) + 1 response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues', params=params) gitea_issues = response.json() issues = [self._gitea_issue_to_unified(gitea_issue) for gitea_issue in gitea_issues] # Apply additional filtering that Gitea API doesn't support if filter_criteria: if filter_criteria.search: search_term = filter_criteria.search.lower() issues = [ issue for issue in issues if search_term in issue.title.lower() or search_term in issue.description.lower() ] return issues def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: """Search issues - limited Gitea API support.""" # Gitea has limited search API, fallback to list with search filter filter_criteria = IssueFilter(search=query, limit=limit) return self.list_issues(filter_criteria) # Label Operations def create_label(self, label: Label) -> Label: """Create label in Gitea.""" data = { 'name': label.name, 'color': label.color or '#000000', 'description': label.description or '' } response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/labels', data=data) gitea_label = response.json() return Label( name=gitea_label['name'], color=gitea_label['color'], description=gitea_label.get('description', ''), backend_id=str(gitea_label['id']) ) def get_labels(self) -> List[Label]: """Get all labels from Gitea.""" response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/labels') gitea_labels = response.json() return [Label( name=label['name'], color=label['color'], description=label.get('description', ''), backend_id=str(label['id']) ) for label in gitea_labels] def update_label(self, label: Label) -> Label: """Update label in Gitea.""" data = { 'name': label.name, 'color': label.color or '#000000', 'description': label.description or '' } response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/labels/{label.name}', data=data) gitea_label = response.json() return Label( name=gitea_label['name'], color=gitea_label['color'], description=gitea_label.get('description', ''), backend_id=str(gitea_label['id']) ) def delete_label(self, label_name: str) -> bool: """Delete label from Gitea.""" try: self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/labels/{label_name}') return True except GiteaAPIError: return False # User Operations def get_users(self) -> List[User]: """Get repository collaborators.""" response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/collaborators') gitea_users = response.json() return [User( id=str(user['id']), username=user['login'], display_name=user.get('full_name', ''), email=user.get('email', ''), avatar_url=user.get('avatar_url', ''), backend_id=str(user['id']) ) for user in gitea_users] def get_user(self, user_id: str) -> Optional[User]: """Get specific user.""" try: response = self._api_request('GET', f'/users/{user_id}') user = response.json() return User( id=str(user['id']), username=user['login'], display_name=user.get('full_name', ''), email=user.get('email', ''), avatar_url=user.get('avatar_url', ''), backend_id=str(user['id']) ) except GiteaAPIError: return None def search_users(self, query: str) -> List[User]: """Search users in Gitea.""" params = {'q': query, 'limit': 50} response = self._api_request('GET', '/users/search', params=params) search_result = response.json() return [User( id=str(user['id']), username=user['login'], display_name=user.get('full_name', ''), email=user.get('email', ''), avatar_url=user.get('avatar_url', ''), backend_id=str(user['id']) ) for user in search_result.get('data', [])] # Milestone Operations def create_milestone(self, milestone: Milestone) -> Milestone: """Create milestone in Gitea.""" data = { 'title': milestone.title, 'description': milestone.description or '', 'due_on': milestone.due_date.isoformat() if milestone.due_date else None } response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/milestones', data=data) gitea_milestone = response.json() return Milestone( id=str(gitea_milestone['id']), title=gitea_milestone['title'], description=gitea_milestone.get('description', ''), state=gitea_milestone['state'], due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), backend_id=str(gitea_milestone['id']) ) def get_milestones(self) -> List[Milestone]: """Get all milestones.""" response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/milestones') gitea_milestones = response.json() return [Milestone( id=str(m['id']), title=m['title'], description=m.get('description', ''), state=m['state'], due_date=datetime.fromisoformat(m['due_on'].replace('Z', '+00:00')) if m.get('due_on') else None, created_at=datetime.fromisoformat(m['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(m['updated_at'].replace('Z', '+00:00')), backend_id=str(m['id']) ) for m in gitea_milestones] def update_milestone(self, milestone: Milestone) -> Milestone: """Update milestone.""" data = { 'title': milestone.title, 'description': milestone.description or '', 'state': milestone.state, 'due_on': milestone.due_date.isoformat() if milestone.due_date else None } response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/milestones/{milestone.backend_id}', data=data) gitea_milestone = response.json() return Milestone( id=str(gitea_milestone['id']), title=gitea_milestone['title'], description=gitea_milestone.get('description', ''), state=gitea_milestone['state'], due_date=datetime.fromisoformat(gitea_milestone['due_on'].replace('Z', '+00:00')) if gitea_milestone.get('due_on') else None, created_at=datetime.fromisoformat(gitea_milestone['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(gitea_milestone['updated_at'].replace('Z', '+00:00')), backend_id=str(gitea_milestone['id']) ) def delete_milestone(self, milestone_id: str) -> bool: """Delete milestone.""" try: self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/milestones/{milestone_id}') return True except GiteaAPIError: return False # Comment Operations def add_comment(self, issue_id: str, comment: Comment) -> Comment: """Add comment to issue.""" data = {'body': comment.body} response = self._api_request('POST', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments', data=data) gitea_comment = response.json() # Convert author author_data = gitea_comment['user'] author = User( id=str(author_data['id']), username=author_data['login'], display_name=author_data.get('full_name', ''), email=author_data.get('email', ''), avatar_url=author_data.get('avatar_url', ''), backend_id=str(author_data['id']) ) return Comment( id=str(gitea_comment['id']), body=gitea_comment['body'], author=author, created_at=datetime.fromisoformat(gitea_comment['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')), backend_id=str(gitea_comment['id']) ) def get_comments(self, issue_id: str) -> List[Comment]: """Get comments for issue.""" response = self._api_request('GET', f'/repos/{self.owner}/{self.repo}/issues/{issue_id}/comments') gitea_comments = response.json() comments = [] for gc in gitea_comments: author_data = gc['user'] author = User( id=str(author_data['id']), username=author_data['login'], display_name=author_data.get('full_name', ''), email=author_data.get('email', ''), avatar_url=author_data.get('avatar_url', ''), backend_id=str(author_data['id']) ) comment = Comment( id=str(gc['id']), body=gc['body'], author=author, created_at=datetime.fromisoformat(gc['created_at'].replace('Z', '+00:00')), updated_at=datetime.fromisoformat(gc['updated_at'].replace('Z', '+00:00')), backend_id=str(gc['id']) ) comments.append(comment) return comments def update_comment(self, comment: Comment) -> Comment: """Update comment.""" data = {'body': comment.body} response = self._api_request('PATCH', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment.backend_id}', data=data) gitea_comment = response.json() # Update comment object comment.updated_at = datetime.fromisoformat(gitea_comment['updated_at'].replace('Z', '+00:00')) return comment def delete_comment(self, comment_id: str) -> bool: """Delete comment.""" try: self._api_request('DELETE', f'/repos/{self.owner}/{self.repo}/issues/comments/{comment_id}') return True except GiteaAPIError: return False # Sync Support def get_last_sync_timestamp(self) -> Optional[datetime]: """Get last sync timestamp - stored in metadata.""" # Could be stored in repository description or other metadata return None def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]: """Get issues modified since timestamp.""" filter_criteria = IssueFilter(updated_after=timestamp) return self.list_issues(filter_criteria) # SyncableBackend Implementation def prepare_for_sync(self) -> None: """Prepare for sync operation.""" # Could implement rate limiting preparation pass def finalize_sync(self, success: bool) -> None: """Finalize sync operation.""" # Could log sync status pass def get_sync_conflicts(self) -> List[Dict[str, Any]]: """Get sync conflicts.""" # Would compare timestamps and detect conflicts return [] def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue: """Resolve sync conflict.""" issue = self.get_issue(issue_id) if not issue: raise GiteaAPIError(f"Issue {issue_id} not found") if resolution == 'remote': # Keep remote version (current issue) return issue elif resolution == 'local': # This would require the local version to be provided raise NotImplementedError("Local resolution requires local issue data") else: raise ValueError(f"Unknown resolution: {resolution}")