generated from coulomb/repo-seed
Renames the package, distribution, CLI alias, Makefile targets, and working directory from issue-facade to issue-core, signalling its role as the authoritative task lifecycle manager for the Coulomb org (peer to activity-core, rules-core, project-core). Adds POST /issues/ ingestion endpoint for activity-core's IssueSink, under a new optional [api] extra. The endpoint is served by `issue serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or X-API-Key header), and routes the TaskSpec payload to the configured default backend with full traceability metadata embedded in sync_metadata. - T01: Python package issue_tracker -> issue_core, dir rename - T02: registered in state hub under custodian domain - T03: INTENT.md (what it is, what it isn't, how it fits) - T04: SCOPE.md (in/out-of-scope, integration boundaries) - T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests - T06: docs/nats-task-ingestion.md design stub Closes ISSC-WP-0001. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
752 lines
26 KiB
Python
752 lines
26 KiB
Python
"""
|
|
Test suite for Local SQLite Backend functionality.

These tests ensure the local backend correctly implements the IssueBackend interface
and provides reliable offline issue tracking.
|
|
"""
|
|
|
|
import pytest
|
|
import tempfile
|
|
from datetime import datetime, timezone, timedelta
|
|
from pathlib import Path
|
|
|
|
from issue_core.backends.local.backend import LocalSQLiteBackend
|
|
from issue_core.core.models import Issue, Label, User, Milestone, Comment, IssueState
|
|
from issue_core.core.interfaces import IssueFilter
|
|
|
|
|
|
@pytest.mark.unit
class TestLocalBackendInitialization:
    """Test local backend initialization and connection."""

    def test_backend_initialization(self):
        """Test backend initializes with correct type and capabilities."""
        backend = LocalSQLiteBackend()

        assert backend.backend_type == "local"
        assert backend.capabilities.supports_milestones
        assert backend.capabilities.supports_assignees
        assert backend.capabilities.supports_comments
        assert backend.capabilities.supports_labels
        assert backend.capabilities.supports_search
        assert backend.capabilities.supports_bulk_operations
        assert not backend.capabilities.supports_webhooks
        assert not backend.capabilities.supports_real_time

    def test_connect_creates_database(self):
        """Test connect creates database file and schema."""
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            backend = LocalSQLiteBackend()

            backend.connect({'db_path': str(db_path)})
            try:
                assert db_path.exists()
                assert backend.connection is not None
                assert backend.test_connection()
            finally:
                # Disconnect even when an assertion fails; otherwise the
                # connection is still open while the temporary directory is
                # being removed, which can break cleanup on platforms that
                # lock open files (e.g. Windows) and mask the real failure.
                backend.disconnect()

    def test_disconnect_closes_connection(self):
        """Test disconnect properly closes database connection."""
        with tempfile.TemporaryDirectory() as tmpdir:
            backend = LocalSQLiteBackend()
            backend.connect({'db_path': str(Path(tmpdir) / "test.db")})

            backend.disconnect()

            assert backend.connection is None
            assert not backend.test_connection()

    def test_schema_initialization(self):
        """Test database schema is properly initialized."""
        with tempfile.TemporaryDirectory() as tmpdir:
            backend = LocalSQLiteBackend()
            backend.connect({'db_path': str(Path(tmpdir) / "test.db")})

            try:
                # Verify tables exist
                cursor = backend.connection.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
                tables = {row[0] for row in cursor.fetchall()}

                expected_tables = {
                    'issues', 'labels', 'users', 'milestones', 'comments',
                    'issue_labels', 'issue_assignees', 'sync_history',
                }

                # Subset check: the schema may contain additional tables.
                assert expected_tables.issubset(tables)
            finally:
                # Release the connection regardless of the assertion outcome.
                backend.disconnect()
|
|
|
|
|
|
@pytest.mark.unit
class TestIssueCRUD:
    """Exercise create / read / update / delete for issues."""

    @staticmethod
    def _open_issue(title, description):
        """Build an unsaved OPEN issue whose timestamps are the current UTC time."""
        return Issue(
            id=None,
            number=0,
            title=title,
            description=description,
            state=IssueState.OPEN,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_create_issue(self, backend):
        """A created issue gets an id, number 1, and keeps its fields."""
        draft = self._open_issue("Test Issue", "Test description")

        saved = backend.create_issue(draft)

        assert saved.id is not None
        assert saved.number == 1
        assert saved.title == "Test Issue"
        assert saved.description == "Test description"
        assert saved.state == IssueState.OPEN

    def test_create_multiple_issues_increments_numbers(self, backend):
        """Consecutively created issues are numbered 1 and 2."""
        drafts = [
            self._open_issue("Issue 1", "Desc 1"),
            self._open_issue("Issue 2", "Desc 2"),
        ]

        first, second = [backend.create_issue(draft) for draft in drafts]

        assert first.number == 1
        assert second.number == 2

    def test_get_issue_by_id(self, backend):
        """An issue can be fetched back through its id."""
        saved = backend.create_issue(self._open_issue("Test", "Desc"))

        fetched = backend.get_issue(saved.id)

        assert fetched is not None
        assert fetched.id == saved.id
        assert fetched.title == "Test"

    def test_get_issue_by_number(self, backend):
        """An issue can be fetched back through its number."""
        saved = backend.create_issue(self._open_issue("Test", "Desc"))

        fetched = backend.get_issue_by_number(saved.number)

        assert fetched is not None
        assert fetched.number == saved.number
        assert fetched.title == "Test"

    def test_get_nonexistent_issue_returns_none(self, backend):
        """Lookups for unknown ids or numbers return None."""
        missing_by_id = backend.get_issue("nonexistent-id")
        assert missing_by_id is None

        missing_by_number = backend.get_issue_by_number(999)
        assert missing_by_number is None

    def test_update_issue(self, backend):
        """Updated fields are returned by update_issue and the title is persisted."""
        target = backend.create_issue(self._open_issue("Original", "Original desc"))

        target.title = "Updated"
        target.description = "Updated desc"
        target.state = IssueState.CLOSED
        target.closed_at = datetime.now(timezone.utc)

        result = backend.update_issue(target)

        assert result.title == "Updated"
        assert result.description == "Updated desc"
        assert result.state == IssueState.CLOSED

        # Re-read from the backend to confirm the change was stored.
        reloaded = backend.get_issue(target.id)
        assert reloaded.title == "Updated"

    def test_delete_issue(self, backend):
        """Deleting an existing issue returns True and removes it."""
        doomed = backend.create_issue(self._open_issue("To Delete", "Desc"))

        outcome = backend.delete_issue(doomed.id)

        assert outcome is True
        assert backend.get_issue(doomed.id) is None

    def test_delete_nonexistent_issue_returns_false(self, backend):
        """Deleting an unknown id returns False."""
        outcome = backend.delete_issue("nonexistent-id")
        assert outcome is False
|
|
|
|
|
|
@pytest.mark.unit
class TestIssueWithLabels:
    """Test issue operations with labels."""

    @pytest.fixture
    def backend(self):
        """Create a temporary backend for each test and disconnect it afterwards."""
        with tempfile.TemporaryDirectory() as tmpdir:
            backend = LocalSQLiteBackend()
            backend.connect({'db_path': str(Path(tmpdir) / "test.db")})
            yield backend
            backend.disconnect()

    def test_create_issue_with_labels(self, backend):
        """Test creating issue with labels."""
        labels = [
            Label(name="bug", color="red", description="Bug reports"),
            Label(name="priority:high", color="orange"),
        ]

        issue = Issue(
            id=None, number=0, title="Bug Report", description="Desc",
            state=IssueState.OPEN,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            labels=labels,
        )

        created = backend.create_issue(issue)
        retrieved = backend.get_issue(created.id)

        assert len(retrieved.labels) == 2
        assert any(label.name == "bug" for label in retrieved.labels)
        assert any(label.name == "priority:high" for label in retrieved.labels)

    def test_update_issue_labels(self, backend):
        """Test updating issue labels."""
        issue = Issue(
            id=None, number=0, title="Test", description="Desc",
            state=IssueState.OPEN,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            labels=[Label(name="bug")],
        )
        created = backend.create_issue(issue)

        # Update labels
        created.labels = [
            Label(name="bug"),
            Label(name="enhancement"),
            Label(name="priority:low"),
        ]
        backend.update_issue(created)

        retrieved = backend.get_issue(created.id)
        assert len(retrieved.labels) == 3
        # A count alone would also pass if the wrong labels were stored,
        # so verify the identity of the persisted labels as well.
        assert {label.name for label in retrieved.labels} == {
            "bug", "enhancement", "priority:low",
        }
|
|
|
|
|
|
@pytest.mark.unit
class TestIssueWithAssignees:
    """Exercise issues that carry assignees."""

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_create_issue_with_assignees(self, backend):
        """Both assignees given at creation are present when the issue is read back."""
        alice = User(id="user1", username="alice", display_name="Alice")
        bob = User(id="user2", username="bob", display_name="Bob")

        draft = Issue(
            id=None,
            number=0,
            title="Assigned Issue",
            description="Desc",
            state=IssueState.OPEN,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            assignees=[alice, bob],
        )

        saved = backend.create_issue(draft)
        fetched = backend.get_issue(saved.id)

        assert len(fetched.assignees) == 2
        assert any(person.username == "alice" for person in fetched.assignees)
        assert any(person.username == "bob" for person in fetched.assignees)
|
|
|
|
|
|
@pytest.mark.unit
class TestIssueWithMilestone:
    """Exercise issues that are attached to a milestone."""

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_create_issue_with_milestone(self, backend):
        """An issue created with a stored milestone returns that milestone on read."""
        release_draft = Milestone(
            id=None,
            title="v1.0",
            description="First release",
            state="open",
            due_date=datetime.now(timezone.utc) + timedelta(days=30),
        )
        # The milestone is stored first; the issue then references the stored object.
        release = backend.create_milestone(release_draft)

        feature_draft = Issue(
            id=None,
            number=0,
            title="Feature",
            description="Desc",
            state=IssueState.OPEN,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            milestone=release,
        )

        saved = backend.create_issue(feature_draft)
        fetched = backend.get_issue(saved.id)

        assert fetched.milestone is not None
        assert fetched.milestone.title == "v1.0"
|
|
|
|
|
|
@pytest.mark.unit
class TestListAndFilter:
    """Test listing and filtering issues."""

    @pytest.fixture
    def backend(self):
        """Create backend with sample data.

        Three issues are stored: one OPEN ("Open Bug", label "bug"), one
        CLOSED ("Closed Feature", label "enhancement") and one IN_PROGRESS
        ("In Progress Task", no labels). The backend is disconnected after
        the test.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            backend = LocalSQLiteBackend()
            backend.connect({'db_path': str(Path(tmpdir) / "test.db")})

            # Create sample issues
            backend.create_issue(Issue(
                id=None, number=0, title="Open Bug", description="Bug desc",
                state=IssueState.OPEN,
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
                labels=[Label(name="bug")],
            ))

            backend.create_issue(Issue(
                id=None, number=0, title="Closed Feature", description="Feature desc",
                state=IssueState.CLOSED,
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
                closed_at=datetime.now(timezone.utc),
                labels=[Label(name="enhancement")],
            ))

            backend.create_issue(Issue(
                id=None, number=0, title="In Progress Task", description="Task desc",
                state=IssueState.IN_PROGRESS,
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ))

            yield backend
            backend.disconnect()

    def test_list_all_issues(self, backend):
        """Test listing all issues."""
        issues = backend.list_issues()
        assert len(issues) == 3

    def test_filter_by_state_open(self, backend):
        """Test filtering by open state."""
        filter_criteria = IssueFilter(state="open")
        issues = backend.list_issues(filter_criteria)
        assert len(issues) == 1
        assert issues[0].state == IssueState.OPEN

    def test_filter_by_state_closed(self, backend):
        """Test filtering by closed state."""
        filter_criteria = IssueFilter(state="closed")
        issues = backend.list_issues(filter_criteria)
        assert len(issues) == 1
        assert issues[0].state == IssueState.CLOSED

    def test_search_issues(self, backend):
        """Test searching issues by text."""
        filter_criteria = IssueFilter(search="Bug")
        issues = backend.list_issues(filter_criteria)
        assert len(issues) == 1
        assert "Bug" in issues[0].title

    def test_search_issues_method(self, backend):
        """Test search_issues method."""
        issues = backend.search_issues("Feature")
        assert len(issues) == 1
        assert "Feature" in issues[0].title

    def test_filter_with_limit(self, backend):
        """Test filtering with limit."""
        filter_criteria = IssueFilter(limit=2)
        issues = backend.list_issues(filter_criteria)
        assert len(issues) == 2

    def test_filter_with_offset(self, backend):
        """Test filtering with offset and limit."""
        filter_criteria = IssueFilter(limit=2, offset=1)
        issues = backend.list_issues(filter_criteria)
        assert len(issues) == 2

        # limit=2/offset=1 over three issues yields two rows even when the
        # offset is ignored, so also skip the first two rows: exactly one
        # issue can remain if the offset is honoured.
        last_page = backend.list_issues(IssueFilter(limit=2, offset=2))
        assert len(last_page) == 1
|
|
|
|
|
|
@pytest.mark.unit
class TestLabelOperations:
    """Exercise create / list / update / delete for labels."""

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_create_label(self, backend):
        """create_label returns a label with the given name and color."""
        saved = backend.create_label(
            Label(name="bug", color="red", description="Bug reports")
        )

        assert saved.name == "bug"
        assert saved.color == "red"

    def test_get_labels(self, backend):
        """get_labels returns every label that was created."""
        for draft in (
            Label(name="bug", color="red"),
            Label(name="enhancement", color="blue"),
        ):
            backend.create_label(draft)

        stored = backend.get_labels()
        assert len(stored) == 2

    def test_update_label(self, backend):
        """update_label overwrites color and description of the label with the same name."""
        backend.create_label(Label(name="bug", color="red", description="Old desc"))

        # A replacement instance is built rather than mutating the first one
        # (the original test notes that Label is frozen/immutable).
        replacement = Label(name="bug", color="orange", description="New desc")
        backend.update_label(replacement)

        stored = backend.get_labels()
        bug = next(candidate for candidate in stored if candidate.name == "bug")
        assert bug.color == "orange"
        assert bug.description == "New desc"

    def test_delete_label(self, backend):
        """delete_label returns True and leaves no labels behind."""
        backend.create_label(Label(name="bug", color="red"))

        outcome = backend.delete_label("bug")
        assert outcome is True

        remaining = backend.get_labels()
        assert len(remaining) == 0
|
|
|
|
|
|
@pytest.mark.unit
class TestUserOperations:
    """Exercise user lookup and search."""

    @pytest.fixture
    def backend(self):
        """Yield a backend pre-populated with users alice and bob; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})

            # Rows are written directly through the connection, then committed.
            seed_rows = (
                ("user1", "alice", "Alice Smith", "alice@example.com"),
                ("user2", "bob", "Bob Jones", "bob@example.com"),
            )
            for row in seed_rows:
                store.connection.execute("""
                    INSERT INTO users (id, username, display_name, email)
                    VALUES (?, ?, ?, ?)
                """, row)
            store.connection.commit()

            yield store
            store.disconnect()

    def test_get_users(self, backend):
        """get_users returns both seeded users."""
        everyone = backend.get_users()
        assert len(everyone) == 2

    def test_get_user_by_id(self, backend):
        """get_user resolves an id to the matching user."""
        found = backend.get_user("user1")
        assert found is not None
        assert found.username == "alice"

    def test_search_users(self, backend):
        """search_users('alice') returns only alice."""
        matches = backend.search_users("alice")
        assert len(matches) == 1
        assert matches[0].username == "alice"
|
|
|
|
|
|
@pytest.mark.unit
class TestMilestoneOperations:
    """Exercise create / list / update / delete for milestones."""

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_create_milestone(self, backend):
        """create_milestone assigns an id and keeps the title."""
        draft = Milestone(
            id=None,
            title="v1.0",
            description="First release",
            state="open",
            due_date=datetime.now(timezone.utc) + timedelta(days=30),
        )

        saved = backend.create_milestone(draft)

        assert saved.id is not None
        assert saved.title == "v1.0"

    def test_get_milestones(self, backend):
        """get_milestones returns every milestone that was created."""
        for version in ("v1.0", "v2.0"):
            backend.create_milestone(Milestone(id=None, title=version, state="open"))

        stored = backend.get_milestones()
        assert len(stored) == 2

    def test_update_milestone(self, backend):
        """update_milestone persists a changed description and state."""
        saved = backend.create_milestone(
            Milestone(id=None, title="v1.0", description="Old", state="open")
        )

        saved.description = "Updated"
        saved.state = "closed"
        backend.update_milestone(saved)

        stored = backend.get_milestones()
        reloaded = next(entry for entry in stored if entry.id == saved.id)
        assert reloaded.description == "Updated"
        assert reloaded.state == "closed"

    def test_delete_milestone(self, backend):
        """delete_milestone returns True and leaves no milestones behind."""
        saved = backend.create_milestone(Milestone(id=None, title="v1.0", state="open"))

        outcome = backend.delete_milestone(saved.id)
        assert outcome is True

        remaining = backend.get_milestones()
        assert len(remaining) == 0
|
|
|
|
|
|
@pytest.mark.unit
class TestCommentOperations:
    """Exercise add / list / update / delete for issue comments."""

    @staticmethod
    def _comment_by(author, body):
        """Build an unsaved comment by *author* stamped with the current UTC time."""
        return Comment(
            id=None,
            body=body,
            author=author,
            created_at=datetime.now(timezone.utc),
        )

    @pytest.fixture
    def backend(self):
        """Yield a backend that already holds one OPEN issue; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})

            # The tests look this issue up via get_issue_by_number(1).
            store.create_issue(Issue(
                id=None,
                number=0,
                title="Test",
                description="Desc",
                state=IssueState.OPEN,
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ))

            yield store
            store.disconnect()

    def test_add_comment(self, backend):
        """add_comment assigns an id and keeps the body."""
        parent = backend.get_issue_by_number(1)
        alice = User(id="user1", username="alice")

        draft = self._comment_by(alice, "Great issue!")

        saved = backend.add_comment(parent.id, draft)
        assert saved.id is not None
        assert saved.body == "Great issue!"

    def test_get_comments(self, backend):
        """get_comments returns both comments, the first added one at index 0."""
        parent = backend.get_issue_by_number(1)
        alice = User(id="user1", username="alice")

        drafts = [
            self._comment_by(alice, "First comment"),
            self._comment_by(alice, "Second comment"),
        ]
        for draft in drafts:
            backend.add_comment(parent.id, draft)

        thread = backend.get_comments(parent.id)
        assert len(thread) == 2
        assert thread[0].body == "First comment"

    def test_update_comment(self, backend):
        """update_comment persists a changed body."""
        parent = backend.get_issue_by_number(1)
        alice = User(id="user1", username="alice")

        saved = backend.add_comment(parent.id, self._comment_by(alice, "Original"))

        saved.body = "Updated"
        backend.update_comment(saved)

        thread = backend.get_comments(parent.id)
        assert thread[0].body == "Updated"

    def test_delete_comment(self, backend):
        """delete_comment returns True and leaves the issue without comments."""
        parent = backend.get_issue_by_number(1)
        alice = User(id="user1", username="alice")

        saved = backend.add_comment(parent.id, self._comment_by(alice, "To delete"))

        outcome = backend.delete_comment(saved.id)
        assert outcome is True

        thread = backend.get_comments(parent.id)
        assert len(thread) == 0
|
|
|
|
|
|
@pytest.mark.unit
class TestSyncOperations:
    """Exercise the synchronization hooks of the local backend."""

    @pytest.fixture
    def backend(self):
        """Yield a backend connected to a database in a temp dir; disconnect after the test."""
        with tempfile.TemporaryDirectory() as scratch_dir:
            db_file = Path(scratch_dir) / "test.db"
            store = LocalSQLiteBackend()
            store.connect({'db_path': str(db_file)})
            yield store
            store.disconnect()

    def test_finalize_sync_logs_success(self, backend):
        """finalize_sync(success=True) writes one successful row to sync_history."""
        backend.finalize_sync(success=True)

        logged = backend.connection.execute(
            "SELECT * FROM sync_history WHERE success = 1"
        ).fetchall()
        assert len(logged) == 1

    def test_get_issues_modified_since(self, backend):
        """Only the issue stamped after the cutoff is reported as modified."""
        two_hours_ago = datetime.now(timezone.utc) - timedelta(hours=2)

        # Issue stamped before the cutoff.
        stale = Issue(
            id=None,
            number=0,
            title="Old",
            description="Desc",
            state=IssueState.OPEN,
            created_at=two_hours_ago,
            updated_at=two_hours_ago,
        )
        backend.create_issue(stale)

        # Issue stamped after the cutoff.
        just_now = datetime.now(timezone.utc)
        fresh = Issue(
            id=None,
            number=0,
            title="New",
            description="Desc",
            state=IssueState.OPEN,
            created_at=just_now,
            updated_at=just_now,
        )
        backend.create_issue(fresh)

        # The cutoff sits one hour in the past, between the two timestamps.
        cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
        changed = backend.get_issues_modified_since(cutoff)

        assert len(changed) == 1
        assert changed[0].title == "New"

    def test_get_sync_conflicts_returns_empty(self, backend):
        """get_sync_conflicts returns an empty list for the local backend."""
        reported = backend.get_sync_conflicts()
        assert reported == []

    def test_prepare_for_sync(self, backend):
        """prepare_for_sync completes without raising."""
        # No assertion: the test passes as long as the call does not raise.
        backend.prepare_for_sync()
|