Add comprehensive issue tracking facade system that provides a unified CLI interface to any issue tracking backend. The facade automatically detects the repository's issue tracker and provides consistent commands across all platforms. Key features: - Repository-aware automatic backend detection (GitHub, GitLab, Gitea, local SQLite) - Unified CLI interface with same commands across all backends - Plugin architecture for extensible backend support - Local SQLite backend for offline development - Gitea backend with full API integration - Bidirectional synchronization between backends - Performance-optimized domain models with caching - Clean architecture with separation of concerns The facade acts as a "universal remote control" for issue tracking systems, eliminating the need to learn different CLIs for each platform while providing seamless offline capability and cross-platform consistency. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
336 lines
10 KiB
Python
336 lines
10 KiB
Python
"""
|
|
CLI Utility Functions
|
|
|
|
Helper functions for the CLI commands including formatting, configuration,
|
|
backend management, and user interaction.
|
|
"""
|
|
|
|
import click
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
import tempfile
|
|
|
|
from ..core.interfaces import IssueBackend, BackendFactory
|
|
from ..core.models import Issue, Comment
|
|
from ..backends.local import LocalSQLiteBackend
|
|
from ..backends.gitea import GiteaBackend
|
|
|
|
|
|
# Register available backends
|
|
BackendFactory.register_backend('local', LocalSQLiteBackend)
|
|
BackendFactory.register_backend('gitea', GiteaBackend)
|
|
|
|
|
|
def get_config_dir() -> Path:
|
|
"""Get configuration directory."""
|
|
config_dir = Path.home() / '.config' / 'issue-tracker'
|
|
config_dir.mkdir(parents=True, exist_ok=True)
|
|
return config_dir
|
|
|
|
|
|
def get_backend_config_path() -> Path:
|
|
"""Get backend configuration file path."""
|
|
return get_config_dir() / 'backends.json'
|
|
|
|
|
|
def load_backend_configs() -> dict:
|
|
"""Load backend configurations."""
|
|
config_path = get_backend_config_path()
|
|
if not config_path.exists():
|
|
return {}
|
|
|
|
import json
|
|
try:
|
|
with open(config_path, 'r') as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, IOError):
|
|
return {}
|
|
|
|
|
|
def save_backend_configs(configs: dict) -> None:
|
|
"""Save backend configurations."""
|
|
config_path = get_backend_config_path()
|
|
import json
|
|
with open(config_path, 'w') as f:
|
|
json.dump(configs, f, indent=2)
|
|
|
|
|
|
def get_default_backend() -> str:
|
|
"""Get the default backend name."""
|
|
configs = load_backend_configs()
|
|
return configs.get('default', 'local')
|
|
|
|
|
|
def get_backend(ctx) -> IssueBackend:
|
|
"""Get backend instance from context."""
|
|
backend_name = ctx.obj.get('backend') or get_default_backend()
|
|
configs = load_backend_configs()
|
|
|
|
if backend_name not in configs:
|
|
if backend_name == 'local':
|
|
# Auto-configure local backend
|
|
local_config = {
|
|
'type': 'local',
|
|
'db_path': str(get_config_dir() / 'issues.db')
|
|
}
|
|
configs['local'] = local_config
|
|
save_backend_configs(configs)
|
|
else:
|
|
raise click.ClickException(f"Backend '{backend_name}' not configured. Use 'backend add' to configure it.")
|
|
|
|
backend_config = configs[backend_name]
|
|
backend_type = backend_config['type']
|
|
|
|
try:
|
|
backend = BackendFactory.create_backend(backend_type)
|
|
backend.connect(backend_config)
|
|
return backend
|
|
except Exception as e:
|
|
raise click.ClickException(f"Failed to connect to backend '{backend_name}': {e}")
|
|
|
|
|
|
def format_issue_list(issues: List[Issue]) -> str:
|
|
"""Format list of issues as a table."""
|
|
if not issues:
|
|
return "No issues found."
|
|
|
|
# Calculate column widths
|
|
max_title_width = min(50, max(len(issue.title) for issue in issues))
|
|
max_assignee_width = 15
|
|
|
|
# Header
|
|
lines = []
|
|
header = f"{'#':<6} {'State':<12} {'Title':<{max_title_width}} {'Assignee':<{max_assignee_width}} Labels"
|
|
lines.append(header)
|
|
lines.append("-" * len(header))
|
|
|
|
# Issues
|
|
for issue in issues:
|
|
title = issue.title[:max_title_width]
|
|
if len(issue.title) > max_title_width:
|
|
title = title[:-3] + "..."
|
|
|
|
assignee = issue.primary_assignee.username if issue.primary_assignee else "unassigned"
|
|
assignee = assignee[:max_assignee_width]
|
|
|
|
labels = ", ".join(label.name for label in issue.labels[:3])
|
|
if len(issue.labels) > 3:
|
|
labels += f" (+{len(issue.labels) - 3})"
|
|
|
|
line = f"{issue.number:<6} {issue.state.value:<12} {title:<{max_title_width}} {assignee:<{max_assignee_width}} {labels}"
|
|
lines.append(line)
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_issue(issue: Issue, show_comments: bool = False, backend: Optional[IssueBackend] = None) -> str:
|
|
"""Format a single issue with details."""
|
|
lines = []
|
|
|
|
# Header
|
|
lines.append(f"#{issue.number}: {issue.title}")
|
|
lines.append("=" * (len(f"#{issue.number}: {issue.title}")))
|
|
lines.append("")
|
|
|
|
# Basic info
|
|
lines.append(f"State: {issue.state.value}")
|
|
lines.append(f"Created: {format_datetime(issue.created_at)}")
|
|
lines.append(f"Updated: {format_datetime(issue.updated_at)}")
|
|
|
|
if issue.closed_at:
|
|
lines.append(f"Closed: {format_datetime(issue.closed_at)}")
|
|
|
|
# Assignees
|
|
if issue.assignees:
|
|
assignees_str = ", ".join(assignee.username for assignee in issue.assignees)
|
|
lines.append(f"Assignees: {assignees_str}")
|
|
else:
|
|
lines.append("Assignees: none")
|
|
|
|
# Milestone
|
|
if issue.milestone:
|
|
lines.append(f"Milestone: {issue.milestone.title}")
|
|
|
|
# Labels
|
|
if issue.labels:
|
|
labels_by_category = {}
|
|
for label in issue.labels:
|
|
category = label.category
|
|
if category not in labels_by_category:
|
|
labels_by_category[category] = []
|
|
labels_by_category[category].append(label.name)
|
|
|
|
for category, label_names in labels_by_category.items():
|
|
lines.append(f"{category.title()} labels: {', '.join(label_names)}")
|
|
else:
|
|
lines.append("Labels: none")
|
|
|
|
# Description
|
|
lines.append("")
|
|
lines.append("Description:")
|
|
lines.append("-" * 12)
|
|
if issue.description:
|
|
lines.append(issue.description)
|
|
else:
|
|
lines.append("(no description)")
|
|
|
|
# Comments
|
|
if show_comments and backend:
|
|
comments = backend.get_comments(issue.id)
|
|
if comments:
|
|
lines.append("")
|
|
lines.append(f"Comments ({len(comments)}):")
|
|
lines.append("-" * 20)
|
|
|
|
for comment in comments:
|
|
lines.append("")
|
|
lines.append(f"Comment by {comment.author.username} at {format_datetime(comment.created_at)}:")
|
|
lines.append(comment.body)
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_datetime(dt: datetime) -> str:
|
|
"""Format datetime for display."""
|
|
if dt.tzinfo:
|
|
dt = dt.astimezone()
|
|
return dt.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def get_user_input(prompt: str, default: str = "", multiline: bool = False) -> str:
|
|
"""Get user input with optional default and multiline support."""
|
|
if multiline:
|
|
click.echo(f"{prompt} (Press Ctrl+D when done, Ctrl+C to cancel):")
|
|
if default:
|
|
click.echo(f"Current value:\n{default}\n")
|
|
|
|
lines = []
|
|
try:
|
|
while True:
|
|
try:
|
|
line = input()
|
|
lines.append(line)
|
|
except EOFError:
|
|
break
|
|
except KeyboardInterrupt:
|
|
raise click.Abort()
|
|
|
|
return "\n".join(lines) if lines else default
|
|
else:
|
|
return click.prompt(prompt, default=default)
|
|
|
|
|
|
def validate_backend_type(backend_type: str) -> bool:
|
|
"""Validate that a backend type is supported."""
|
|
return backend_type in BackendFactory.get_available_backends()
|
|
|
|
|
|
def test_backend_connection(backend_config: dict) -> bool:
|
|
"""Test if a backend configuration works."""
|
|
try:
|
|
backend_type = backend_config['type']
|
|
backend = BackendFactory.create_backend(backend_type)
|
|
backend.connect(backend_config)
|
|
result = backend.test_connection()
|
|
backend.disconnect()
|
|
return result
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def format_backend_list(configs: dict) -> str:
|
|
"""Format backend configurations for display."""
|
|
if not configs:
|
|
return "No backends configured."
|
|
|
|
lines = []
|
|
default_backend = configs.get('default', 'local')
|
|
|
|
lines.append(f"{'Name':<15} {'Type':<10} {'Status':<10} Description")
|
|
lines.append("-" * 60)
|
|
|
|
for name, config in configs.items():
|
|
if name == 'default':
|
|
continue
|
|
|
|
backend_type = config.get('type', 'unknown')
|
|
is_default = name == default_backend
|
|
|
|
# Test connection
|
|
try:
|
|
status = "connected" if test_backend_connection(config) else "error"
|
|
except Exception:
|
|
status = "error"
|
|
|
|
# Description
|
|
if backend_type == 'local':
|
|
desc = f"Local SQLite: {config.get('db_path', 'unknown')}"
|
|
elif backend_type == 'gitea':
|
|
desc = f"Gitea: {config.get('base_url', 'unknown')}/{config.get('owner', 'unknown')}/{config.get('repo', 'unknown')}"
|
|
else:
|
|
desc = f"{backend_type} backend"
|
|
|
|
if is_default:
|
|
desc += " (default)"
|
|
|
|
line = f"{name:<15} {backend_type:<10} {status:<10} {desc}"
|
|
lines.append(line)
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def get_editor() -> str:
|
|
"""Get the user's preferred editor."""
|
|
return os.environ.get('EDITOR', 'nano')
|
|
|
|
|
|
def edit_text(initial_text: str = "") -> Optional[str]:
|
|
"""Open text in editor and return edited content."""
|
|
with tempfile.NamedTemporaryFile(mode='w+', suffix='.md', delete=False) as f:
|
|
f.write(initial_text)
|
|
temp_path = f.name
|
|
|
|
try:
|
|
editor = get_editor()
|
|
os.system(f'{editor} {temp_path}')
|
|
|
|
with open(temp_path, 'r') as f:
|
|
edited_text = f.read()
|
|
|
|
return edited_text if edited_text != initial_text else None
|
|
|
|
finally:
|
|
os.unlink(temp_path)
|
|
|
|
|
|
def confirm_action(message: str, default: bool = False) -> bool:
|
|
"""Ask user for confirmation."""
|
|
return click.confirm(message, default=default)
|
|
|
|
|
|
def progress_bar(items, label: str = "Processing"):
|
|
"""Create a progress bar for iterating over items."""
|
|
return click.progressbar(items, label=label)
|
|
|
|
|
|
def echo_success(message: str) -> None:
|
|
"""Echo success message in green."""
|
|
click.echo(click.style(message, fg='green'))
|
|
|
|
|
|
def echo_warning(message: str) -> None:
|
|
"""Echo warning message in yellow."""
|
|
click.echo(click.style(message, fg='yellow'))
|
|
|
|
|
|
def echo_error(message: str) -> None:
|
|
"""Echo error message in red."""
|
|
click.echo(click.style(message, fg='red'))
|
|
|
|
|
|
def echo_info(message: str) -> None:
|
|
"""Echo info message in blue."""
|
|
click.echo(click.style(message, fg='blue')) |