Files
tegwick 99ea1fbc45 chore(consistency): sync task status from DB [auto]
Updated by fix-consistency on 2026-05-17:
  - update .custodian-brief.md for issue-core
2026-05-17 05:06:10 +02:00

614 lines
24 KiB
Python

"""
Local SQLite Backend Implementation
Provides a complete local issue tracking backend using SQLite for storage.
This implementation serves as the reference for the backend interface and
provides full offline functionality.
"""
import sqlite3
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Dict, Any
from ...core.interfaces import LocalBackend, BackendCapabilities, IssueFilter, SyncableBackend
from ...core.models import Issue, Label, User, Milestone, Comment, IssueState, Priority, IssueType
class LocalSQLiteBackend(LocalBackend, SyncableBackend):
    """SQLite-based local backend for issue tracking.

    ``connect`` must be called before any other operation: until then
    ``self.connection`` is ``None``. Timestamps are stored as ISO-8601
    strings and labels use their name as primary key.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Create a backend that is not yet connected.

        Args:
            db_path: Path of the SQLite database file. Falls back to
                ``"issues.db"`` when omitted or empty.
        """
        self.db_path = db_path or "issues.db"
        self.connection: Optional[sqlite3.Connection] = None
        self._capabilities = BackendCapabilities(
            supports_milestones=True,
            supports_assignees=True,
            supports_comments=True,
            supports_labels=True,
            supports_search=True,
            supports_bulk_operations=True,
            supports_webhooks=False,
            supports_real_time=False,
            max_labels_per_issue=None,
            max_assignees_per_issue=None
        )

    @property
    def backend_type(self) -> str:
        """Identifier of this backend kind (always ``"local"``)."""
        return "local"

    @property
    def capabilities(self) -> BackendCapabilities:
        """Capability flags built in ``__init__``."""
        return self._capabilities

    # Connection management

    def connect(self, config: Dict[str, Any]) -> None:
        """Connect to SQLite database and make sure the schema exists.

        Args:
            config: Connection settings. The optional ``'db_path'`` key
                overrides the path given to the constructor and is remembered
                in ``self.db_path``.
        """
        db_path = config.get('db_path', self.db_path)
        self.db_path = db_path
        # Ensure directory exists
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.connection = sqlite3.connect(db_path)
        self.connection.row_factory = sqlite3.Row  # Enable dict-like access
        # SQLite only enforces foreign keys when asked to, per connection.
        self.connection.execute("PRAGMA foreign_keys = ON")
        # Initialize schema
        self._initialize_schema()

    def disconnect(self) -> None:
        """Disconnect from database. Safe to call when not connected."""
        if self.connection:
            self.connection.close()
            self.connection = None

    def test_connection(self) -> bool:
        """Test database connection.

        Returns:
            ``True`` if a trivial query succeeds, ``False`` when there is no
            connection or the query raises ``sqlite3.Error``.
        """
        if not self.connection:
            return False
        try:
            self.connection.execute("SELECT 1")
            return True
        except sqlite3.Error:
            return False

    def _initialize_schema(self) -> None:
        """Run ``schema.sql`` (located next to this module) on the connection."""
        schema_path = Path(__file__).parent / "schema.sql"
        # Read with an explicit encoding so the result does not depend on the
        # platform's locale default.
        schema_sql = schema_path.read_text(encoding="utf-8")
        # Use executescript to handle multi-line statements (triggers, views, etc.)
        self.connection.executescript(schema_sql)

    def _get_next_issue_number(self) -> int:
        """Get the next available issue number (1 for an empty table)."""
        cursor = self.connection.execute("SELECT MAX(number) FROM issues")
        result = cursor.fetchone()
        return (result[0] or 0) + 1

    # Row -> model conversion

    @staticmethod
    def _parse_datetime(value: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 string, mapping NULL/empty to ``None``."""
        return datetime.fromisoformat(value) if value else None

    def _label_from_row(self, row: sqlite3.Row) -> Label:
        """Build a Label from a row of the ``labels`` table."""
        return Label(
            name=row['name'],
            color=row['color'],
            description=row['description'],
            backend_id=row['backend_id']
        )

    def _user_from_row(self, row: sqlite3.Row, id_key: str = 'id',
                       backend_id_key: str = 'backend_id') -> User:
        """Build a User from a row carrying the ``users`` columns.

        Args:
            row: Row that exposes the user columns.
            id_key: Column name holding the user id (differs when the query
                aliases it, as the comments join does).
            backend_id_key: Column name holding the user's backend id.
        """
        return User(
            id=row[id_key],
            username=row['username'],
            display_name=row['display_name'],
            email=row['email'],
            avatar_url=row['avatar_url'],
            backend_id=row[backend_id_key]
        )

    def _milestone_from_row(self, row: sqlite3.Row) -> Milestone:
        """Build a Milestone from a row of the ``milestones`` table."""
        return Milestone(
            id=row['id'],
            title=row['title'],
            description=row['description'],
            state=row['state'],
            due_date=self._parse_datetime(row['due_date']),
            created_at=self._parse_datetime(row['created_at']),
            updated_at=self._parse_datetime(row['updated_at']),
            backend_id=row['backend_id']
        )

    def _issue_from_row(self, row: sqlite3.Row) -> Issue:
        """Convert database row to Issue object.

        Issues additional queries to load the issue's labels, assignees and
        (when ``milestone_id`` is set) its milestone.
        """
        # Get labels
        cursor = self.connection.execute("""
            SELECT l.id, l.name, l.color, l.description, l.backend_id
            FROM labels l
            JOIN issue_labels il ON l.id = il.label_id
            WHERE il.issue_id = ?
        """, (row['id'],))
        labels = [self._label_from_row(lr) for lr in cursor.fetchall()]

        # Get assignees
        cursor = self.connection.execute("""
            SELECT u.id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id
            FROM users u
            JOIN issue_assignees ia ON u.id = ia.user_id
            WHERE ia.issue_id = ?
        """, (row['id'],))
        assignees = [self._user_from_row(ur) for ur in cursor.fetchall()]

        # Get milestone
        milestone = None
        if row['milestone_id']:
            cursor = self.connection.execute("""
                SELECT id, title, description, state, due_date, created_at, updated_at, backend_id
                FROM milestones WHERE id = ?
            """, (row['milestone_id'],))
            m_row = cursor.fetchone()
            if m_row:
                milestone = self._milestone_from_row(m_row)

        # Parse sync metadata
        sync_metadata = {}
        if row['sync_metadata']:
            try:
                sync_metadata = json.loads(row['sync_metadata'])
            except json.JSONDecodeError:
                # Best effort: unreadable metadata must not make the issue
                # itself unloadable, so fall back to an empty dict.
                pass

        return Issue(
            id=row['id'],
            number=row['number'],
            title=row['title'],
            description=row['description'],
            state=IssueState.from_string(row['state']),
            created_at=datetime.fromisoformat(row['created_at']),
            updated_at=datetime.fromisoformat(row['updated_at']),
            closed_at=self._parse_datetime(row['closed_at']),
            labels=labels,
            assignees=assignees,
            milestone=milestone,
            backend_id=row['backend_id'],
            backend_type=row['backend_type'],
            sync_metadata=sync_metadata
        )

    # Issue CRUD Operations

    def _attach_labels(self, issue: Issue) -> None:
        """Link every label of ``issue`` to it, creating missing labels.

        Does not commit; the caller owns the transaction.
        """
        for label in issue.labels:
            self._ensure_label_exists(label)
            # OR IGNORE makes a label listed twice harmless.
            self.connection.execute("""
                INSERT OR IGNORE INTO issue_labels (issue_id, label_id)
                VALUES (?, ?)
            """, (issue.id, label.name))  # Using name as ID for simplicity

    def _attach_assignees(self, issue: Issue) -> None:
        """Link every assignee of ``issue`` to it, creating missing users.

        Does not commit; the caller owns the transaction.
        """
        for user in issue.assignees:
            self._ensure_user_exists(user)
            self.connection.execute("""
                INSERT OR IGNORE INTO issue_assignees (issue_id, user_id)
                VALUES (?, ?)
            """, (issue.id, user.id))

    def create_issue(self, issue: Issue) -> Issue:
        """Create a new issue.

        Fills in ``issue.id`` (random UUID) and ``issue.number`` (next free
        number) when they are unset, then stores the issue together with its
        labels and assignees in a single transaction.

        Returns:
            The same ``issue`` object, with id/number populated.

        Raises:
            sqlite3.Error: If any statement fails; nothing is persisted then.
        """
        if not issue.id:
            issue.id = str(uuid.uuid4())
        if not issue.number:
            issue.number = self._get_next_issue_number()

        # The connection context manager commits on success and rolls back on
        # error, so a failure while linking labels cannot leave a half-written
        # issue waiting for the next commit.
        with self.connection:
            # Insert issue
            self.connection.execute("""
                INSERT INTO issues (id, number, title, description, state, created_at, updated_at,
                                    closed_at, milestone_id, backend_id, backend_type, sync_metadata)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                issue.id,
                issue.number,
                issue.title,
                issue.description,
                issue.state.value,
                issue.created_at.isoformat(),
                issue.updated_at.isoformat(),
                issue.closed_at.isoformat() if issue.closed_at else None,
                issue.milestone.id if issue.milestone else None,
                issue.backend_id,
                issue.backend_type or 'local',
                json.dumps(issue.sync_metadata) if issue.sync_metadata else None
            ))
            self._attach_labels(issue)
            self._attach_assignees(issue)
        return issue

    def get_issue(self, issue_id: str) -> Optional[Issue]:
        """Get issue by ID.

        ``issue_id`` is matched against both the local ``id`` and the
        ``backend_id`` column. Returns ``None`` when nothing matches.
        """
        cursor = self.connection.execute("""
            SELECT * FROM issues WHERE id = ? OR backend_id = ?
        """, (issue_id, issue_id))
        row = cursor.fetchone()
        return self._issue_from_row(row) if row else None

    def get_issue_by_number(self, number: int) -> Optional[Issue]:
        """Get issue by number, or ``None`` when no issue has that number."""
        cursor = self.connection.execute("""
            SELECT * FROM issues WHERE number = ?
        """, (number,))
        row = cursor.fetchone()
        return self._issue_from_row(row) if row else None

    def update_issue(self, issue: Issue) -> Issue:
        """Update existing issue.

        Rewrites the editable columns of the row whose ``id`` equals
        ``issue.id`` and replaces its label and assignee links with the ones
        carried by ``issue``. Everything happens in one transaction.

        Returns:
            The same ``issue`` object.

        Raises:
            sqlite3.Error: If any statement fails; no change is persisted then.
        """
        with self.connection:
            # Update main issue record
            self.connection.execute("""
                UPDATE issues SET
                    title = ?, description = ?, state = ?, updated_at = ?,
                    closed_at = ?, milestone_id = ?, sync_metadata = ?
                WHERE id = ?
            """, (
                issue.title,
                issue.description,
                issue.state.value,
                issue.updated_at.isoformat(),
                issue.closed_at.isoformat() if issue.closed_at else None,
                issue.milestone.id if issue.milestone else None,
                json.dumps(issue.sync_metadata) if issue.sync_metadata else None,
                issue.id
            ))
            # Update labels (remove all and re-add)
            self.connection.execute("DELETE FROM issue_labels WHERE issue_id = ?", (issue.id,))
            self._attach_labels(issue)
            # Update assignees (remove all and re-add)
            self.connection.execute("DELETE FROM issue_assignees WHERE issue_id = ?", (issue.id,))
            self._attach_assignees(issue)
        return issue

    def delete_issue(self, issue_id: str) -> bool:
        """Delete issue. Returns ``True`` if a row was removed."""
        cursor = self.connection.execute("DELETE FROM issues WHERE id = ?", (issue_id,))
        self.connection.commit()
        return cursor.rowcount > 0

    def list_issues(self, filter_criteria: Optional[IssueFilter] = None) -> List[Issue]:
        """List issues with optional filtering.

        Supported criteria: ``state`` (exact match), ``search`` (``LIKE``
        substring match on title or description, so ``%`` and ``_`` in the
        term act as wildcards), inclusive ``created_*``/``updated_*`` bounds,
        and ``limit``/``offset``. Results are ordered by ``updated_at``,
        newest first.
        """
        query = "SELECT * FROM issues WHERE 1=1"
        params: List[Any] = []

        if filter_criteria:
            if filter_criteria.state:
                query += " AND state = ?"
                # Accept both a plain string and an enum member: the column
                # holds the enum's value (see create_issue).
                state = filter_criteria.state
                params.append(getattr(state, "value", state))

            if filter_criteria.search:
                query += " AND (title LIKE ? OR description LIKE ?)"
                search_term = f"%{filter_criteria.search}%"
                params.extend([search_term, search_term])

            date_bounds = (
                ("created_at >= ?", filter_criteria.created_after),
                ("created_at <= ?", filter_criteria.created_before),
                ("updated_at >= ?", filter_criteria.updated_after),
                ("updated_at <= ?", filter_criteria.updated_before),
            )
            for clause, bound in date_bounds:
                if bound:
                    query += f" AND {clause}"
                    params.append(bound.isoformat())

        query += " ORDER BY updated_at DESC"

        if filter_criteria:
            if filter_criteria.limit:
                query += " LIMIT ?"
                params.append(filter_criteria.limit)
                if filter_criteria.offset:
                    query += " OFFSET ?"
                    params.append(filter_criteria.offset)
            elif filter_criteria.offset:
                # SQLite requires LIMIT before OFFSET; a negative limit means
                # "no upper bound", so the offset is honoured on its own.
                query += " LIMIT -1 OFFSET ?"
                params.append(filter_criteria.offset)

        cursor = self.connection.execute(query, params)
        return [self._issue_from_row(row) for row in cursor.fetchall()]

    def search_issues(self, query: str, limit: Optional[int] = None) -> List[Issue]:
        """Search issues using FTS if available, otherwise fallback to LIKE.

        Any ``sqlite3.OperationalError`` from the full-text query triggers
        the ``LIKE`` fallback via ``list_issues``.
        """
        try:
            # Try FTS search first
            fts_query = """
                SELECT i.* FROM issues i
                JOIN issue_search s ON i.id = s.issue_id
                WHERE issue_search MATCH ?
                ORDER BY rank
            """
            params: List[Any] = [query]
            if limit:
                fts_query += " LIMIT ?"
                params.append(limit)
            cursor = self.connection.execute(fts_query, params)
            return [self._issue_from_row(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            # Fallback to LIKE search
            filter_criteria = IssueFilter(search=query, limit=limit)
            return self.list_issues(filter_criteria)

    # Helper methods

    def _ensure_label_exists(self, label: Label) -> None:
        """Ensure label exists in database (no commit, existing rows untouched)."""
        self.connection.execute("""
            INSERT OR IGNORE INTO labels (id, name, color, description, backend_id)
            VALUES (?, ?, ?, ?, ?)
        """, (label.name, label.name, label.color, label.description, label.backend_id))

    def _ensure_user_exists(self, user: User) -> None:
        """Ensure user exists in database (no commit, existing rows untouched)."""
        self.connection.execute("""
            INSERT OR IGNORE INTO users (id, username, display_name, email, avatar_url, backend_id)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (user.id, user.username, user.display_name, user.email, user.avatar_url, user.backend_id))

    # Label Operations

    def create_label(self, label: Label) -> Label:
        """Create a new label and return it.

        Uses a plain ``INSERT``, so the database rejects the statement if it
        violates a constraint of the ``labels`` table.
        """
        label_id = label.name  # Use name as ID
        self.connection.execute("""
            INSERT INTO labels (id, name, color, description, backend_id)
            VALUES (?, ?, ?, ?, ?)
        """, (label_id, label.name, label.color, label.description, label.backend_id))
        self.connection.commit()
        return label

    def get_labels(self) -> List[Label]:
        """Get all labels, ordered by name."""
        cursor = self.connection.execute("SELECT * FROM labels ORDER BY name")
        return [self._label_from_row(row) for row in cursor.fetchall()]

    def update_label(self, label: Label) -> Label:
        """Update the color and description of the label named ``label.name``."""
        self.connection.execute("""
            UPDATE labels SET color = ?, description = ? WHERE name = ?
        """, (label.color, label.description, label.name))
        self.connection.commit()
        return label

    def delete_label(self, label_name: str) -> bool:
        """Delete label by name. Returns ``True`` if a row was removed."""
        cursor = self.connection.execute("DELETE FROM labels WHERE name = ?", (label_name,))
        self.connection.commit()
        return cursor.rowcount > 0

    # User Operations

    def get_users(self) -> List[User]:
        """Get all users, ordered by username."""
        cursor = self.connection.execute("SELECT * FROM users ORDER BY username")
        return [self._user_from_row(row) for row in cursor.fetchall()]

    def get_user(self, user_id: str) -> Optional[User]:
        """Get user by ID, or ``None`` when no such user exists."""
        cursor = self.connection.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        return self._user_from_row(row) if row else None

    def search_users(self, query: str) -> List[User]:
        """Search users by substring of username, display name or email.

        Uses ``LIKE``, so ``%`` and ``_`` inside ``query`` act as wildcards.
        """
        pattern = f"%{query}%"
        cursor = self.connection.execute("""
            SELECT * FROM users
            WHERE username LIKE ? OR display_name LIKE ? OR email LIKE ?
            ORDER BY username
        """, (pattern, pattern, pattern))
        return [self._user_from_row(row) for row in cursor.fetchall()]

    # Milestone Operations

    def create_milestone(self, milestone: Milestone) -> Milestone:
        """Create milestone.

        Assigns a random UUID when ``milestone.id`` is unset. Missing
        ``created_at``/``updated_at`` values are stored as the current UTC
        time; the passed object's own timestamp fields are left unchanged.
        """
        if not milestone.id:
            milestone.id = str(uuid.uuid4())
        # One reading of the clock, so both defaults are identical.
        now_iso = datetime.now(timezone.utc).isoformat()
        self.connection.execute("""
            INSERT INTO milestones (id, title, description, state, due_date, created_at, updated_at, backend_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            milestone.id,
            milestone.title,
            milestone.description,
            milestone.state,
            milestone.due_date.isoformat() if milestone.due_date else None,
            milestone.created_at.isoformat() if milestone.created_at else now_iso,
            milestone.updated_at.isoformat() if milestone.updated_at else now_iso,
            milestone.backend_id
        ))
        self.connection.commit()
        return milestone

    def get_milestones(self) -> List[Milestone]:
        """Get all milestones, ordered by title."""
        cursor = self.connection.execute("SELECT * FROM milestones ORDER BY title")
        return [self._milestone_from_row(row) for row in cursor.fetchall()]

    def update_milestone(self, milestone: Milestone) -> Milestone:
        """Update milestone.

        The stored ``updated_at`` is set to the current UTC time; the
        returned object is the caller's instance, unmodified.
        """
        self.connection.execute("""
            UPDATE milestones SET title = ?, description = ?, state = ?, due_date = ?, updated_at = ?
            WHERE id = ?
        """, (
            milestone.title,
            milestone.description,
            milestone.state,
            milestone.due_date.isoformat() if milestone.due_date else None,
            datetime.now(timezone.utc).isoformat(),
            milestone.id
        ))
        self.connection.commit()
        return milestone

    def delete_milestone(self, milestone_id: str) -> bool:
        """Delete milestone. Returns ``True`` if a row was removed."""
        cursor = self.connection.execute("DELETE FROM milestones WHERE id = ?", (milestone_id,))
        self.connection.commit()
        return cursor.rowcount > 0

    # Comment Operations

    def add_comment(self, issue_id: str, comment: Comment) -> Comment:
        """Add comment to issue.

        Assigns a random UUID when ``comment.id`` is unset and creates the
        author's user row if it does not exist yet. Both writes share one
        transaction.
        """
        if not comment.id:
            comment.id = str(uuid.uuid4())
        # Commit on success, roll back on error: the author row is not left
        # pending if the comment insert fails.
        with self.connection:
            self._ensure_user_exists(comment.author)
            self.connection.execute("""
                INSERT INTO comments (id, issue_id, author_id, body, created_at, updated_at, backend_id)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                comment.id,
                issue_id,
                comment.author.id,
                comment.body,
                comment.created_at.isoformat(),
                comment.updated_at.isoformat() if comment.updated_at else None,
                comment.backend_id
            ))
        return comment

    def get_comments(self, issue_id: str) -> List[Comment]:
        """Get comments for issue, oldest first, each with its author."""
        # User columns that clash with comment columns are aliased so both
        # can be read from the same row.
        cursor = self.connection.execute("""
            SELECT c.*, u.id as user_id, u.username, u.display_name, u.email, u.avatar_url, u.backend_id as user_backend_id
            FROM comments c
            JOIN users u ON c.author_id = u.id
            WHERE c.issue_id = ?
            ORDER BY c.created_at
        """, (issue_id,))
        return [
            Comment(
                id=row['id'],
                body=row['body'],
                author=self._user_from_row(
                    row, id_key='user_id', backend_id_key='user_backend_id'
                ),
                created_at=datetime.fromisoformat(row['created_at']),
                updated_at=self._parse_datetime(row['updated_at']),
                backend_id=row['backend_id']
            )
            for row in cursor.fetchall()
        ]

    def update_comment(self, comment: Comment) -> Comment:
        """Update comment body.

        The stored ``updated_at`` is set to the current UTC time; the
        returned object is the caller's instance, unmodified.
        """
        self.connection.execute("""
            UPDATE comments SET body = ?, updated_at = ? WHERE id = ?
        """, (comment.body, datetime.now(timezone.utc).isoformat(), comment.id))
        self.connection.commit()
        return comment

    def delete_comment(self, comment_id: str) -> bool:
        """Delete comment. Returns ``True`` if a row was removed."""
        cursor = self.connection.execute("DELETE FROM comments WHERE id = ?", (comment_id,))
        self.connection.commit()
        return cursor.rowcount > 0

    # Sync Support

    def get_last_sync_timestamp(self) -> Optional[datetime]:
        """Get last sync timestamp.

        Returns:
            Timestamp of the most recent successful ``sync_history`` entry,
            or ``None`` when there is none (or its timestamp is NULL).
        """
        cursor = self.connection.execute("""
            SELECT sync_timestamp FROM sync_history
            WHERE success = 1
            ORDER BY sync_timestamp DESC
            LIMIT 1
        """)
        row = cursor.fetchone()
        return self._parse_datetime(row[0]) if row else None

    def get_issues_modified_since(self, timestamp: datetime) -> List[Issue]:
        """Get issues whose ``updated_at`` is at or after ``timestamp``."""
        filter_criteria = IssueFilter(updated_after=timestamp)
        return self.list_issues(filter_criteria)

    # SyncableBackend Implementation

    def prepare_for_sync(self) -> None:
        """Prepare for sync operation. Currently a no-op."""
        # Could create backup or start transaction
        pass

    def finalize_sync(self, success: bool) -> None:
        """Finalize sync operation by recording it in ``sync_history``.

        Args:
            success: Outcome of the sync; stored with the current UTC time.
        """
        # Log sync operation
        self.connection.execute("""
            INSERT INTO sync_history (backend_type, success, sync_timestamp)
            VALUES (?, ?, ?)
        """, ('sync', success, datetime.now(timezone.utc).isoformat()))
        self.connection.commit()

    def get_sync_conflicts(self) -> List[Dict[str, Any]]:
        """Get sync conflicts (always an empty list)."""
        # For local backend, no conflicts since it's the source of truth
        return []

    def resolve_sync_conflict(self, issue_id: str, resolution: str) -> Issue:
        """Resolve sync conflict.

        ``resolution`` is ignored; the stored issue is returned as-is. Note
        that the result is ``None`` when ``issue_id`` matches no issue.
        """
        # Local backend doesn't have conflicts
        return self.get_issue(issue_id)