""" Repository Pattern Implementation Provides a high-level repository interface that abstracts backend operations and adds features like caching, transaction support, and business logic. """ from abc import ABC, abstractmethod from typing import List, Optional, Dict, Any, Union from datetime import datetime, timezone import logging from .interfaces import IssueBackend, IssueFilter from .models import Issue, Label, User, Milestone, Comment, IssueState logger = logging.getLogger(__name__) class IssueRepository: """ High-level repository for issue operations. Provides a clean interface for issue management with additional features like caching, validation, and business rule enforcement. """ def __init__(self, backend: IssueBackend, enable_caching: bool = True): self.backend = backend self.enable_caching = enable_caching self._cache: Dict[str, Any] = {} self._cache_timeout = 300 # 5 minutes def _cache_key(self, operation: str, *args) -> str: """Generate cache key for operation.""" return f"{operation}:{':'.join(str(arg) for arg in args)}" def _get_from_cache(self, key: str) -> Optional[Any]: """Get value from cache if enabled and not expired.""" if not self.enable_caching or key not in self._cache: return None cached_item = self._cache[key] if datetime.now(timezone.utc).timestamp() - cached_item['timestamp'] > self._cache_timeout: del self._cache[key] return None return cached_item['value'] def _set_cache(self, key: str, value: Any) -> None: """Set value in cache if enabled.""" if self.enable_caching: self._cache[key] = { 'value': value, 'timestamp': datetime.now(timezone.utc).timestamp() } def _invalidate_cache_pattern(self, pattern: str) -> None: """Invalidate cache entries matching pattern.""" if not self.enable_caching: return keys_to_remove = [key for key in self._cache.keys() if pattern in key] for key in keys_to_remove: del self._cache[key] # Issue Operations def create_issue( self, title: str, description: str = "", labels: Optional[List[str]] = None, assignees: Optional[List[str]] = None, milestone: Optional[str] = None, issue_type: Optional[str] = None, priority: Optional[str] = None ) -> Issue: """ Create a new issue with business rule validation. Args: title: Issue title (required) description: Issue description labels: List of label names assignees: List of usernames to assign milestone: Milestone title or ID issue_type: Issue type (bug, feature, etc.) priority: Priority level (low, medium, high, critical) Returns: Created issue """ # Validation if not title.strip(): raise ValueError("Issue title cannot be empty") # Build labels issue_labels = [] if labels: for label_name in labels: issue_labels.append(Label(name=label_name.strip())) # Add type and priority as labels if issue_type: issue_labels.append(Label(name=issue_type)) if priority: issue_labels.append(Label(name=f'priority:{priority}')) # Resolve assignees issue_assignees = [] if assignees: for username in assignees: users = self.backend.search_users(username) if users: issue_assignees.append(users[0]) else: # Create basic user if not found issue_assignees.append(User(id=username, username=username)) # Resolve milestone issue_milestone = None if milestone: milestones = self.backend.get_milestones() for m in milestones: if m.title == milestone or m.id == milestone: issue_milestone = m break # Create issue now = datetime.now(timezone.utc) issue = Issue( id="", # Will be set by backend number=0, # Will be set by backend title=title.strip(), description=description.strip(), state=IssueState.OPEN, created_at=now, updated_at=now, labels=issue_labels, assignees=issue_assignees, milestone=issue_milestone ) created_issue = self.backend.create_issue(issue) # Invalidate relevant caches self._invalidate_cache_pattern("list_issues") self._invalidate_cache_pattern("search_issues") logger.info(f"Created issue #{created_issue.number}: {created_issue.title}") return created_issue def get_issue(self, issue_id: Union[str, int]) -> Optional[Issue]: """Get issue by ID or number.""" if isinstance(issue_id, int): return self.get_issue_by_number(issue_id) cache_key = self._cache_key("get_issue", issue_id) cached_issue = self._get_from_cache(cache_key) if cached_issue: return cached_issue issue = self.backend.get_issue(str(issue_id)) if issue: self._set_cache(cache_key, issue) return issue def get_issue_by_number(self, number: int) -> Optional[Issue]: """Get issue by number.""" cache_key = self._cache_key("get_issue_by_number", number) cached_issue = self._get_from_cache(cache_key) if cached_issue: return cached_issue issue = self.backend.get_issue_by_number(number) if issue: self._set_cache(cache_key, issue) return issue def update_issue(self, issue: Issue) -> Issue: """Update issue with validation.""" if not issue.title.strip(): raise ValueError("Issue title cannot be empty") issue.updated_at = datetime.now(timezone.utc) updated_issue = self.backend.update_issue(issue) # Invalidate caches self._invalidate_cache_pattern("get_issue") self._invalidate_cache_pattern("list_issues") self._invalidate_cache_pattern("search_issues") logger.info(f"Updated issue #{updated_issue.number}: {updated_issue.title}") return updated_issue def close_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: """Close issue with optional comment.""" issue = self.get_issue(issue_id) if not issue: raise ValueError(f"Issue {issue_id} not found") if issue.state == IssueState.CLOSED: raise ValueError(f"Issue #{issue.number} is already closed") issue.close() # Add closing comment if provided if comment: self.add_comment(issue.id, comment) return self.update_issue(issue) def reopen_issue(self, issue_id: Union[str, int], comment: Optional[str] = None) -> Issue: """Reopen issue with optional comment.""" issue = self.get_issue(issue_id) if not issue: raise ValueError(f"Issue {issue_id} not found") if issue.state != IssueState.CLOSED: raise ValueError(f"Issue #{issue.number} is not closed") issue.reopen() # Add reopening comment if provided if comment: self.add_comment(issue.id, comment) return self.update_issue(issue) def list_issues( self, state: Optional[str] = None, assignee: Optional[str] = None, labels: Optional[List[str]] = None, milestone: Optional[str] = None, search: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None ) -> List[Issue]: """List issues with filtering and caching.""" filter_criteria = IssueFilter( state=state, assignee=assignee, labels=labels, milestone=milestone, search=search, limit=limit, offset=offset ) # Create cache key from filter criteria cache_key = self._cache_key( "list_issues", state or "all", assignee or "any", ",".join(labels) if labels else "any", milestone or "any", search or "any", limit or 0, offset or 0 ) cached_issues = self._get_from_cache(cache_key) if cached_issues: return cached_issues issues = self.backend.list_issues(filter_criteria) self._set_cache(cache_key, issues) return issues def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]: """Search issues with caching.""" cache_key = self._cache_key("search_issues", query, limit or 0) cached_issues = self._get_from_cache(cache_key) if cached_issues: return cached_issues issues = self.backend.search_issues(query, limit) self._set_cache(cache_key, issues) return issues # Comment Operations def add_comment(self, issue_id: str, comment_text: str, author_username: str = "cli-user") -> Comment: """Add comment to issue.""" if not comment_text.strip(): raise ValueError("Comment text cannot be empty") # Try to find user users = self.backend.search_users(author_username) if users: author = users[0] else: author = User(id=author_username, username=author_username) comment = Comment( id="", body=comment_text.strip(), author=author, created_at=datetime.now(timezone.utc) ) added_comment = self.backend.add_comment(issue_id, comment) # Invalidate issue cache self._invalidate_cache_pattern("get_issue") logger.info(f"Added comment to issue {issue_id}") return added_comment def get_comments(self, issue_id: str) -> List[Comment]: """Get comments for issue.""" cache_key = self._cache_key("get_comments", issue_id) cached_comments = self._get_from_cache(cache_key) if cached_comments: return cached_comments comments = self.backend.get_comments(issue_id) self._set_cache(cache_key, comments) return comments # Label Operations def get_or_create_label(self, name: str, color: Optional[str] = None, description: Optional[str] = None) -> Label: """Get existing label or create new one.""" existing_labels = self.backend.get_labels() for label in existing_labels: if label.name == name: return label # Create new label new_label = Label(name=name, color=color, description=description) return self.backend.create_label(new_label) # Statistics and Analytics def get_issue_stats(self) -> Dict[str, Any]: """Get issue statistics.""" cache_key = self._cache_key("get_issue_stats") cached_stats = self._get_from_cache(cache_key) if cached_stats: return cached_stats all_issues = self.backend.list_issues() stats = { 'total': len(all_issues), 'open': len([i for i in all_issues if i.state != IssueState.CLOSED]), 'closed': len([i for i in all_issues if i.state == IssueState.CLOSED]), 'by_state': {}, 'by_priority': {}, 'by_type': {}, 'by_assignee': {}, 'recent_activity': 0 } # Count by state for issue in all_issues: state = issue.state.value stats['by_state'][state] = stats['by_state'].get(state, 0) + 1 # Count by priority and type one_week_ago = datetime.now(timezone.utc).timestamp() - 604800 # 7 days for issue in all_issues: # Priority priority = issue.priority if priority: priority_name = priority.value stats['by_priority'][priority_name] = stats['by_priority'].get(priority_name, 0) + 1 # Type issue_type = issue.issue_type if issue_type: type_name = issue_type.value stats['by_type'][type_name] = stats['by_type'].get(type_name, 0) + 1 # Assignee if issue.assignees: for assignee in issue.assignees: username = assignee.username stats['by_assignee'][username] = stats['by_assignee'].get(username, 0) + 1 # Recent activity if issue.updated_at.timestamp() > one_week_ago: stats['recent_activity'] += 1 self._set_cache(cache_key, stats) return stats # Bulk Operations def bulk_close_issues(self, issue_numbers: List[int], comment: Optional[str] = None) -> List[Issue]: """Close multiple issues.""" results = [] for number in issue_numbers: try: closed_issue = self.close_issue(number, comment) results.append(closed_issue) except Exception as e: logger.error(f"Failed to close issue #{number}: {e}") return results def bulk_add_label(self, issue_numbers: List[int], label_name: str) -> List[Issue]: """Add label to multiple issues.""" label = self.get_or_create_label(label_name) results = [] for number in issue_numbers: try: issue = self.get_issue_by_number(number) if issue: issue.add_label(label) updated_issue = self.update_issue(issue) results.append(updated_issue) except Exception as e: logger.error(f"Failed to add label to issue #{number}: {e}") return results # Cache Management def clear_cache(self) -> None: """Clear all cached data.""" self._cache.clear() logger.info("Repository cache cleared") def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics.""" if not self.enable_caching: return {'enabled': False} now = datetime.now(timezone.utc).timestamp() expired_count = 0 for cached_item in self._cache.values(): if now - cached_item['timestamp'] > self._cache_timeout: expired_count += 1 return { 'enabled': True, 'total_entries': len(self._cache), 'expired_entries': expired_count, 'cache_timeout': self._cache_timeout } # Context Manager Support def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if hasattr(self.backend, 'disconnect'): self.backend.disconnect()