""" Test scenario for Issue #144: Database Integration and Performance Features This test covers the enhanced database schema, caching layer, and performance optimizations for large asset libraries. Issue #144: Phase 3 - Advanced Features and Performance """ import pytest import tempfile import shutil from pathlib import Path from unittest.mock import Mock, patch, MagicMock import sqlite3 import time from datetime import datetime, timedelta from markitect.assets import AssetManager, AssetRegistry from markitect.assets.database import AssetDatabase, DatabaseMigration from markitect.assets.cache import AssetCache, CacheStrategy from markitect.assets.performance import PerformanceMonitor, QueryOptimizer class TestDatabaseIntegrationAndPerformance: """Test database integration and performance features for Issue #144.""" def setup_method(self): """Set up test environment with temporary database and cache.""" self.temp_dir = tempfile.mkdtemp() self.db_path = Path(self.temp_dir) / "test_assets.db" self.assets_dir = Path(self.temp_dir) / "assets" self.assets_dir.mkdir() self.asset_manager = AssetManager( storage_path=self.assets_dir, database_path=self.db_path ) def teardown_method(self): """Clean up temporary directories and database.""" shutil.rmtree(self.temp_dir) def test_enhanced_database_schema_creation(self): """Test creation of enhanced database schema with new tables.""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() # Verify new tables exist with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Check asset_usage_stats table cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='asset_usage_stats' """) assert cursor.fetchone() is not None # Check asset_processing_log table cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='asset_processing_log' """) assert cursor.fetchone() is not None # Check package_metadata table cursor.execute(""" SELECT name FROM sqlite_master WHERE type='table' AND name='package_metadata' """) assert cursor.fetchone() is not None def test_asset_usage_tracking(self): """Test asset usage statistics tracking.""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() content_hash = "test_hash_123" # Record asset usage db.record_asset_usage(content_hash, document_path="/test/doc.md") db.record_asset_usage(content_hash, document_path="/test/doc2.md") # Verify usage statistics stats = db.get_asset_usage_stats(content_hash) assert stats['document_count'] == 2 assert stats['access_frequency'] > 0 assert isinstance(stats['last_used'], datetime) def test_asset_processing_log(self): """Test asset processing operation logging.""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() content_hash = "test_hash_456" operation_details = { "operation_type": "batch_import", "file_count": 25, "processing_time": 5.2 } # Log processing operation log_id = db.log_processing_operation( content_hash=content_hash, operation="add", details=operation_details, success=True ) assert log_id is not None # Retrieve processing history history = db.get_processing_history(content_hash) assert len(history) == 1 assert history[0]['operation'] == "add" assert history[0]['success'] is True assert history[0]['details']['file_count'] == 25 def test_database_indexing_optimization(self): """Test database indexing for optimized asset queries.""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() db.create_performance_indexes() # Verify indexes were created with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" SELECT name FROM sqlite_master WHERE type='index' AND name LIKE 'idx_%' """) indexes = cursor.fetchall() # Should have indexes for common query patterns index_names = [idx[0] for idx in indexes] assert 'idx_usage_content_hash' in index_names assert 'idx_usage_last_used' in index_names assert 'idx_processing_timestamp' in index_names def test_query_performance_monitoring(self): """Test query performance monitoring and optimization.""" monitor = PerformanceMonitor() # Simulate some database queries with monitor.track_query("get_asset_metadata"): time.sleep(0.01) # Simulate query time with monitor.track_query("batch_insert_assets"): time.sleep(0.05) # Simulate longer query # Verify performance metrics were collected metrics = monitor.get_metrics() assert 'get_asset_metadata' in metrics assert 'batch_insert_assets' in metrics assert metrics['get_asset_metadata']['avg_time'] > 0 assert metrics['batch_insert_assets']['call_count'] == 1 def test_asset_cache_initialization(self): """Test asset caching layer initialization.""" cache = AssetCache( max_size_mb=50, strategy=CacheStrategy.LRU ) assert cache.max_size_bytes == 50 * 1024 * 1024 assert cache.strategy == CacheStrategy.LRU assert cache.current_size_bytes == 0 def test_asset_metadata_caching(self): """Test caching of asset metadata for performance.""" cache = AssetCache(max_size_mb=10) content_hash = "cached_hash_789" metadata = { "filename": "test.png", "size": 1024, "mime_type": "image/png", "created_at": datetime.now().isoformat() } # Cache metadata cache.store_metadata(content_hash, metadata) # Retrieve from cache cached_metadata = cache.get_metadata(content_hash) assert cached_metadata == metadata assert cache.get_hit_rate() > 0 def test_thumbnail_generation_and_caching(self): """Test thumbnail generation and caching for images.""" cache = AssetCache(max_size_mb=20) # Mock image file image_path = self.assets_dir / "test_image.png" image_path.write_bytes(b"PNG fake content") content_hash = "image_hash_abc" # Generate and cache thumbnail thumbnail_data = cache.generate_and_cache_thumbnail( content_hash, image_path, size=(150, 150) ) assert thumbnail_data is not None # Retrieve cached thumbnail cached_thumbnail = cache.get_thumbnail(content_hash, size=(150, 150)) assert cached_thumbnail == thumbnail_data def test_cache_invalidation_strategies(self): """Test cache invalidation and cleanup strategies.""" cache = AssetCache(max_size_mb=1) # Small cache to test eviction # Fill cache beyond capacity for i in range(10): content_hash = f"hash_{i}" metadata = {"filename": f"file_{i}.txt", "size": 1024 * 100} # 100KB each cache.store_metadata(content_hash, metadata) # Verify LRU eviction occurred assert cache.current_size_bytes <= cache.max_size_bytes # Test manual invalidation cache.invalidate("hash_0") assert cache.get_metadata("hash_0") is None def test_database_migration_support(self): """Test database migration support for schema updates.""" migration = DatabaseMigration(self.db_path) # Create initial schema migration.create_base_schema() # Apply enhancement migration migration.apply_migration("add_usage_tracking") migration.apply_migration("add_processing_log") migration.apply_migration("add_package_metadata") # Verify migration history applied_migrations = migration.get_applied_migrations() assert "add_usage_tracking" in applied_migrations assert "add_processing_log" in applied_migrations assert "add_package_metadata" in applied_migrations def test_database_backup_and_recovery(self): """Test database backup and recovery procedures.""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() # Add some test data content_hash = "backup_test_hash" db.record_asset_usage(content_hash, "/test/backup.md") # Create backup backup_path = Path(self.temp_dir) / "backup.db" db.create_backup(backup_path) assert backup_path.exists() # Test recovery recovery_db = AssetDatabase(backup_path) stats = recovery_db.get_asset_usage_stats(content_hash) assert stats['document_count'] == 1 def test_connection_pooling_and_transactions(self): """Test database connection pooling and transaction management.""" db = AssetDatabase(self.db_path, enable_pooling=True, max_connections=5) # Test transaction context manager with db.transaction() as txn: txn.execute("INSERT INTO asset_metadata (content_hash, filename, size_bytes, mime_type) VALUES (?, ?, ?, ?)", ("txn_hash", "txn_test.txt", 1024, "text/plain")) # Verify data exists within transaction result = txn.execute("SELECT filename FROM asset_metadata WHERE content_hash = ?", ("txn_hash",)).fetchone() assert result[0] == "txn_test.txt" # Verify transaction was committed with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT filename FROM asset_metadata WHERE content_hash = ?", ("txn_hash",)) result = cursor.fetchone() assert result[0] == "txn_test.txt" def test_large_dataset_performance(self): """Test performance with large datasets (scaled down for testing).""" db = AssetDatabase(self.db_path) db.initialize_enhanced_schema() db.create_performance_indexes() # Insert test dataset test_size = 1000 # Scaled down from 10,000 for test speed start_time = time.time() for i in range(test_size): content_hash = f"perf_hash_{i:04d}" db.record_asset_usage(content_hash, f"/test/doc_{i}.md") insert_time = time.time() - start_time # Test query performance start_time = time.time() recent_assets = db.get_recently_used_assets(limit=100) query_time = time.time() - start_time # Performance assertions (should complete quickly) assert insert_time < 10.0 # Should insert 1000 records in under 10 seconds assert query_time < 1.0 # Should query in under 1 second assert len(recent_assets) <= 100 def test_cache_effectiveness_validation(self): """Test cache effectiveness under realistic usage patterns.""" cache = AssetCache(max_size_mb=10) # Simulate realistic access patterns assets = [f"asset_{i}" for i in range(100)] # First pass - populate cache for asset in assets: metadata = {"filename": f"{asset}.png", "size": 1024} cache.store_metadata(asset, metadata) # Second pass - should hit cache frequently for asset in assets[:50]: # Access first 50 again cached = cache.get_metadata(asset) assert cached is not None # Verify hit rate is reasonable hit_rate = cache.get_hit_rate() assert hit_rate > 0.3 # At least 30% hit rate # Verify cache metrics metrics = cache.get_performance_metrics() assert metrics['total_requests'] > 100 assert metrics['cache_hits'] > 30