"""
Integration tests for document repository with real database.

Demonstrates:
- Real database integration testing
- Transaction testing
- Performance validation
- Error scenario handling
"""

import ast
import asyncio
import json
import shutil
import sqlite3
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import pytest

from tests.fixtures.markdown_samples import MarkdownDocumentBuilder, SAMPLE_COMPLEX_DOCUMENT
from tests.utils.assertions import assert_file_exists, assert_performance_within_bounds


def _serialize_ast(ast_data: Optional[dict]) -> str:
    """Encode AST data as JSON text for the ``ast_data`` column."""
    return json.dumps(ast_data)


def _deserialize_ast(raw: Optional[str]) -> Optional[dict]:
    """Decode the ``ast_data`` column back into a Python object.

    Rows are written as JSON.  Rows written by an earlier version of this
    module used ``str(dict)``; those are parsed with ``ast.literal_eval``,
    which accepts only Python literals and never executes code (the previous
    implementation used ``eval`` on the stored text).

    Returns an empty dict when the column is NULL or empty.
    """
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        # Legacy ``str(dict)`` payload (single-quoted keys, True/False/None).
        return ast.literal_eval(raw)


class MockDocument:
    """Mock document model for testing."""

    def __init__(self, filename: str, content: str, ast_data: Optional[dict] = None):
        """Create a document stamped with the current UTC time.

        Args:
            filename: Name used as the unique key in the repository.
            content: Raw document text.
            ast_data: Optional parsed representation; a falsy value is
                replaced by a fresh empty dict.
        """
        self.filename = filename
        self.content = content
        self.ast_data = ast_data or {}
        self.created_at = datetime.now(timezone.utc)
        self.updated_at = datetime.now(timezone.utc)


class MockDocumentRepository:
    """Mock document repository that simulates real database operations.

    Every operation opens its own short-lived SQLite connection against
    ``db_path`` and closes it before returning.  The ``asyncio.sleep`` calls
    only simulate asynchronous latency; the SQLite calls themselves are
    synchronous.
    """

    def __init__(self, db_path: Path):
        """Remember the database location and create the schema if needed."""
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Initialize database schema."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    filename TEXT UNIQUE NOT NULL,
                    content TEXT NOT NULL,
                    ast_data TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_documents_filename
                ON documents(filename)
            """)
            conn.commit()
        finally:
            conn.close()

    async def store_document(self, document: MockDocument) -> int:
        """Store a document in the database.

        Args:
            document: The document to insert.

        Returns:
            The row id assigned to the new document.

        Raises:
            ValueError: If the insert violates an integrity constraint
                (reported as a duplicate filename).
        """
        await asyncio.sleep(0.001)  # Simulate async database operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("""
                INSERT INTO documents (filename, content, ast_data, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (
                document.filename,
                document.content,
                _serialize_ast(document.ast_data),
                document.created_at.isoformat(),
                document.updated_at.isoformat(),
            ))
            document_id = cursor.lastrowid
            conn.commit()
            return document_id
        except sqlite3.IntegrityError as e:
            conn.rollback()
            raise ValueError(
                f"Document with filename '{document.filename}' already exists"
            ) from e
        finally:
            conn.close()

    async def get_document(self, document_id: int) -> MockDocument:
        """Retrieve a document by ID.

        Args:
            document_id: Primary key of the document.

        Returns:
            A ``MockDocument`` rebuilt from the stored row, including its
            stored creation and update timestamps.

        Raises:
            ValueError: If no row has the given id.
        """
        await asyncio.sleep(0.001)  # Simulate async database operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT filename, content, ast_data, created_at, updated_at
                FROM documents
                WHERE id = ?
            """, (document_id,))
            row = cursor.fetchone()
            if not row:
                raise ValueError(f"Document with ID {document_id} not found")

            filename, content, ast_data, created_at, updated_at = row
            # Parse the stored payload as data; never evaluate it as code.
            document = MockDocument(filename, content, _deserialize_ast(ast_data))
            document.created_at = datetime.fromisoformat(created_at)
            document.updated_at = datetime.fromisoformat(updated_at)
            return document
        finally:
            conn.close()

    async def update_document(self, document_id: int, content: str, ast_data: dict) -> None:
        """Update document content and AST data.

        Also sets ``updated_at`` to the current UTC time.

        Raises:
            ValueError: If no row has the given id.  Nothing is committed in
                that case.
        """
        await asyncio.sleep(0.001)  # Simulate async database operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("""
                UPDATE documents
                SET content = ?, ast_data = ?, updated_at = ?
                WHERE id = ?
            """, (
                content,
                _serialize_ast(ast_data),
                datetime.now(timezone.utc).isoformat(),
                document_id,
            ))
            if cursor.rowcount == 0:
                raise ValueError(f"Document with ID {document_id} not found")
            conn.commit()
        finally:
            conn.close()

    async def delete_document(self, document_id: int) -> None:
        """Delete a document.

        Raises:
            ValueError: If no row has the given id.
        """
        await asyncio.sleep(0.001)  # Simulate async database operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("DELETE FROM documents WHERE id = ?", (document_id,))
            if cursor.rowcount == 0:
                raise ValueError(f"Document with ID {document_id} not found")
            conn.commit()
        finally:
            conn.close()

    async def list_all_documents(self):
        """List all documents.

        Returns:
            A list of dicts with keys ``id``, ``filename``, ``created_at``
            and ``updated_at`` (timestamps as stored ISO strings), ordered by
            ``created_at`` descending.
        """
        await asyncio.sleep(0.001)  # Simulate async database operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT id, filename, created_at, updated_at
                FROM documents
                ORDER BY created_at DESC
            """)
            rows = cursor.fetchall()
            return [
                {
                    "id": row[0],
                    "filename": row[1],
                    "created_at": row[2],
                    "updated_at": row[3],
                }
                for row in rows
            ]
        finally:
            conn.close()

    async def search_content(self, search_term: str):
        """Search documents by content.

        The term is matched with SQL ``LIKE '%term%'``, so ``%`` and ``_``
        inside ``search_term`` act as wildcards.

        Returns:
            A list of dicts with keys ``id``, ``filename`` and ``content``,
            ordered by filename.
        """
        await asyncio.sleep(0.005)  # Simulate more expensive search operation

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT id, filename, content
                FROM documents
                WHERE content LIKE ?
                ORDER BY filename
            """, (f"%{search_term}%",))
            rows = cursor.fetchall()
            return [
                {
                    "id": row[0],
                    "filename": row[1],
                    "content": row[2],
                }
                for row in rows
            ]
        finally:
            conn.close()

    def close(self):
        """Close repository (cleanup).

        A no-op: connections are opened and closed per operation.
        """
        pass


@pytest.fixture
def test_db_path(test_workspace):
    """Provide test database path.

    ``test_workspace`` is a fixture defined outside this module; it is used
    here as a directory-like path object.
    """
    return test_workspace / "integration_test.db"


@pytest.fixture
def document_repository(test_db_path):
    """Provide document repository with real database.

    Declared as a regular (synchronous) generator fixture because neither
    setup nor teardown awaits anything.  An ``async def`` generator under a
    plain ``@pytest.fixture`` is not resolved by pytest-asyncio in strict
    mode, which would hand the tests an async-generator object instead of
    the repository.
    """
    repo = MockDocumentRepository(test_db_path)
    yield repo
    repo.close()


@pytest.mark.integration
class TestDocumentRepositoryIntegration:
    """Integration tests for document repository with real database."""

    @pytest.mark.asyncio
    async def test_store_and_retrieve_document(self, document_repository, test_db_path):
        """Test storing and retrieving a document."""
        # Arrange
        assert_file_exists(test_db_path)
        document = MockDocument(
            filename="test.md",
            content="# Test Document\nThis is a test.",
            ast_data={"type": "document", "children": []},
        )

        # Act
        document_id = await document_repository.store_document(document)
        retrieved = await document_repository.get_document(document_id)

        # Assert
        assert isinstance(document_id, int)
        assert document_id > 0
        assert retrieved.filename == "test.md"
        assert retrieved.content == "# Test Document\nThis is a test."
        assert retrieved.ast_data["type"] == "document"

    @pytest.mark.asyncio
    async def test_store_duplicate_filename_raises_error(self, document_repository):
        """Test that storing duplicate filename raises error."""
        # Arrange
        document1 = MockDocument("duplicate.md", "Content 1")
        document2 = MockDocument("duplicate.md", "Content 2")

        # Act
        await document_repository.store_document(document1)

        # Assert
        with pytest.raises(ValueError, match="already exists"):
            await document_repository.store_document(document2)

    @pytest.mark.asyncio
    async def test_update_document_content(self, document_repository):
        """Test updating document content and AST."""
        # Arrange
        document = MockDocument("update.md", "Original content")
        document_id = await document_repository.store_document(document)

        # Act
        new_content = "Updated content"
        new_ast = {"type": "document", "updated": True}
        await document_repository.update_document(document_id, new_content, new_ast)

        # Verify
        updated = await document_repository.get_document(document_id)
        assert updated.content == "Updated content"
        assert updated.ast_data["updated"] is True

    @pytest.mark.asyncio
    async def test_delete_document(self, document_repository):
        """Test deleting a document."""
        # Arrange
        document = MockDocument("delete.md", "To be deleted")
        document_id = await document_repository.store_document(document)

        # Verify document exists
        retrieved = await document_repository.get_document(document_id)
        assert retrieved.filename == "delete.md"

        # Act
        await document_repository.delete_document(document_id)

        # Assert
        with pytest.raises(ValueError, match="not found"):
            await document_repository.get_document(document_id)

    @pytest.mark.asyncio
    async def test_list_all_documents(self, document_repository):
        """Test listing all documents."""
        # Arrange - Store multiple documents
        documents = [
            MockDocument("doc1.md", "Content 1"),
            MockDocument("doc2.md", "Content 2"),
            MockDocument("doc3.md", "Content 3"),
        ]
        for doc in documents:
            await document_repository.store_document(doc)

        # Act
        all_docs = await document_repository.list_all_documents()

        # Assert
        assert len(all_docs) == 3
        filenames = {doc["filename"] for doc in all_docs}
        expected_filenames = {"doc1.md", "doc2.md", "doc3.md"}
        assert filenames == expected_filenames

    @pytest.mark.asyncio
    async def test_search_content(self, document_repository):
        """Test content search functionality."""
        # Arrange
        documents = [
            MockDocument("api.md", "API documentation for REST endpoints"),
            MockDocument("guide.md", "User guide for getting started"),
            MockDocument("readme.md", "Project README with API examples"),
        ]
        for doc in documents:
            await document_repository.store_document(doc)

        # Act
        api_results = await document_repository.search_content("API")
        guide_results = await document_repository.search_content("guide")

        # Assert
        assert len(api_results) == 2  # api.md and readme.md
        api_filenames = {result["filename"] for result in api_results}
        assert api_filenames == {"api.md", "readme.md"}

        assert len(guide_results) == 1  # guide.md only
        assert guide_results[0]["filename"] == "guide.md"

    @pytest.mark.asyncio
    async def test_bulk_operations_performance(self, document_repository, performance_timer):
        """Test performance of bulk operations."""
        # Arrange
        documents = []
        for i in range(50):
            content = (MarkdownDocumentBuilder()
                       .with_heading(f"Document {i}")
                       .with_paragraph(f"Content for document {i}")
                       .build())
            documents.append(MockDocument(f"bulk_{i}.md", content))

        # Act - Bulk storage
        performance_timer.start()
        document_ids = []
        for doc in documents:
            doc_id = await document_repository.store_document(doc)
            document_ids.append(doc_id)
        performance_timer.stop()

        # Assert
        assert len(document_ids) == 50
        assert_performance_within_bounds(performance_timer.elapsed, 5.0, "bulk document storage")

        # Act - Bulk retrieval
        performance_timer.start()
        retrieved_docs = []
        for doc_id in document_ids:
            doc = await document_repository.get_document(doc_id)
            retrieved_docs.append(doc)
        performance_timer.stop()

        # Assert
        assert len(retrieved_docs) == 50
        assert_performance_within_bounds(performance_timer.elapsed, 3.0, "bulk document retrieval")

    @pytest.mark.asyncio
    async def test_concurrent_operations(self, document_repository):
        """Test concurrent database operations."""
        # Arrange
        async def store_document(index):
            content = f"# Document {index}\nContent for document {index}"
            doc = MockDocument(f"concurrent_{index}.md", content)
            return await document_repository.store_document(doc)

        # Act - Concurrent storage
        tasks = [store_document(i) for i in range(20)]
        document_ids = await asyncio.gather(*tasks)

        # Assert
        assert len(document_ids) == 20
        assert len(set(document_ids)) == 20  # All IDs should be unique

        # Verify all documents are accessible
        all_docs = await document_repository.list_all_documents()
        assert len(all_docs) == 20

    @pytest.mark.asyncio
    async def test_transaction_like_behavior(self, document_repository):
        """Test error handling doesn't leave database in inconsistent state."""
        # Arrange - Store initial document
        doc1 = MockDocument("initial.md", "Initial content")
        doc_id = await document_repository.store_document(doc1)

        # Act - Try to update with invalid ID (should fail)
        with pytest.raises(ValueError, match="not found"):
            await document_repository.update_document(99999, "Invalid update", {})

        # Assert - Original document should be unchanged
        retrieved = await document_repository.get_document(doc_id)
        assert retrieved.content == "Initial content"

    @pytest.mark.asyncio
    async def test_large_document_handling(self, document_repository, performance_timer):
        """Test handling of large documents."""
        # Arrange - Create large document content
        from tests.fixtures.markdown_samples import LargeMarkdownGenerator

        generator = LargeMarkdownGenerator(seed=42)
        large_content = generator.generate_document(size="100kb")
        document = MockDocument("large.md", large_content)

        # Act
        performance_timer.start()
        document_id = await document_repository.store_document(document)
        retrieved = await document_repository.get_document(document_id)
        performance_timer.stop()

        # Assert
        assert document_id > 0
        assert len(retrieved.content) > 100000  # At least 100KB
        assert retrieved.content == large_content
        assert_performance_within_bounds(performance_timer.elapsed, 1.0, "large document operations")

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_search_performance_with_large_dataset(self, document_repository, performance_timer):
        """Test search performance with large dataset."""
        # Arrange - Create many documents with searchable content
        search_terms = ["API", "database", "testing", "performance", "integration"]
        documents = []
        for i in range(100):
            term = search_terms[i % len(search_terms)]
            content = (MarkdownDocumentBuilder()
                       .with_heading(f"Document {i}")
                       .with_paragraph(f"This document covers {term} functionality in detail.")
                       .with_paragraph("Additional content for search testing.")
                       .build())
            documents.append(MockDocument(f"search_{i}.md", content))

        # Store all documents
        for doc in documents:
            await document_repository.store_document(doc)

        # Act - Perform searches
        performance_timer.start()
        api_results = await document_repository.search_content("API")
        database_results = await document_repository.search_content("database")
        performance_timer.stop()

        # Assert
        assert len(api_results) >= 20  # Should find multiple documents
        assert len(database_results) >= 20
        assert_performance_within_bounds(performance_timer.elapsed, 2.0, "search operations")