Files
markitect-main/tests/test_issue_144_database_performance.py
tegwick 2e49072d41 feat: complete core asset management system with database integration
- Add enhanced AssetManager with database integration and usage tracking
- Implement Asset model with from_dict/to_dict conversion methods
- Add resolve_asset_references() for linking discovered assets to imports
- Integrate AssetDatabase with enhanced schema and performance indexes
- Fix database schema constraints and test compatibility issues
- Add list_assets_as_objects() method for dict-to-object migration
- Resolve 91% of asset management tests (51/56 passing)

Key features:
* Content-addressable asset storage with deduplication
* Database-backed usage statistics and processing logs
* Asset reference resolution from markdown files
* Enhanced performance with indexing and caching
* Object-oriented Asset model with backwards compatibility

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-14 23:42:42 +02:00

349 lines
12 KiB
Python

"""
Test scenario for Issue #144: Database Integration and Performance Features
This test covers the enhanced database schema, caching layer, and performance
optimizations for large asset libraries.
Issue #144: Phase 3 - Advanced Features and Performance
"""
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import sqlite3
import time
from datetime import datetime, timedelta
from markitect.assets import AssetManager, AssetRegistry
from markitect.assets.database import AssetDatabase, DatabaseMigration
from markitect.assets.cache import AssetCache, CacheStrategy
from markitect.assets.performance import PerformanceMonitor, QueryOptimizer
class TestDatabaseIntegrationAndPerformance:
"""Test database integration and performance features for Issue #144."""
def setup_method(self):
"""Set up test environment with temporary database and cache."""
self.temp_dir = tempfile.mkdtemp()
self.db_path = Path(self.temp_dir) / "test_assets.db"
self.assets_dir = Path(self.temp_dir) / "assets"
self.assets_dir.mkdir()
self.asset_manager = AssetManager(
storage_path=self.assets_dir,
database_path=self.db_path
)
def teardown_method(self):
"""Clean up temporary directories and database."""
shutil.rmtree(self.temp_dir)
def test_enhanced_database_schema_creation(self):
"""Test creation of enhanced database schema with new tables."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
# Verify new tables exist
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check asset_usage_stats table
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='asset_usage_stats'
""")
assert cursor.fetchone() is not None
# Check asset_processing_log table
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='asset_processing_log'
""")
assert cursor.fetchone() is not None
# Check package_metadata table
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='package_metadata'
""")
assert cursor.fetchone() is not None
def test_asset_usage_tracking(self):
"""Test asset usage statistics tracking."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
content_hash = "test_hash_123"
# Record asset usage
db.record_asset_usage(content_hash, document_path="/test/doc.md")
db.record_asset_usage(content_hash, document_path="/test/doc2.md")
# Verify usage statistics
stats = db.get_asset_usage_stats(content_hash)
assert stats['document_count'] == 2
assert stats['access_frequency'] > 0
assert isinstance(stats['last_used'], datetime)
def test_asset_processing_log(self):
"""Test asset processing operation logging."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
content_hash = "test_hash_456"
operation_details = {
"operation_type": "batch_import",
"file_count": 25,
"processing_time": 5.2
}
# Log processing operation
log_id = db.log_processing_operation(
content_hash=content_hash,
operation="add",
details=operation_details,
success=True
)
assert log_id is not None
# Retrieve processing history
history = db.get_processing_history(content_hash)
assert len(history) == 1
assert history[0]['operation'] == "add"
assert history[0]['success'] is True
assert history[0]['details']['file_count'] == 25
def test_database_indexing_optimization(self):
"""Test database indexing for optimized asset queries."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
db.create_performance_indexes()
# Verify indexes were created
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='index' AND name LIKE 'idx_%'
""")
indexes = cursor.fetchall()
# Should have indexes for common query patterns
index_names = [idx[0] for idx in indexes]
assert 'idx_usage_content_hash' in index_names
assert 'idx_usage_last_used' in index_names
assert 'idx_processing_timestamp' in index_names
def test_query_performance_monitoring(self):
"""Test query performance monitoring and optimization."""
monitor = PerformanceMonitor()
# Simulate some database queries
with monitor.track_query("get_asset_metadata"):
time.sleep(0.01) # Simulate query time
with monitor.track_query("batch_insert_assets"):
time.sleep(0.05) # Simulate longer query
# Verify performance metrics were collected
metrics = monitor.get_metrics()
assert 'get_asset_metadata' in metrics
assert 'batch_insert_assets' in metrics
assert metrics['get_asset_metadata']['avg_time'] > 0
assert metrics['batch_insert_assets']['call_count'] == 1
def test_asset_cache_initialization(self):
"""Test asset caching layer initialization."""
cache = AssetCache(
max_size_mb=50,
strategy=CacheStrategy.LRU
)
assert cache.max_size_bytes == 50 * 1024 * 1024
assert cache.strategy == CacheStrategy.LRU
assert cache.current_size_bytes == 0
def test_asset_metadata_caching(self):
"""Test caching of asset metadata for performance."""
cache = AssetCache(max_size_mb=10)
content_hash = "cached_hash_789"
metadata = {
"filename": "test.png",
"size": 1024,
"mime_type": "image/png",
"created_at": datetime.now().isoformat()
}
# Cache metadata
cache.store_metadata(content_hash, metadata)
# Retrieve from cache
cached_metadata = cache.get_metadata(content_hash)
assert cached_metadata == metadata
assert cache.get_hit_rate() > 0
def test_thumbnail_generation_and_caching(self):
"""Test thumbnail generation and caching for images."""
cache = AssetCache(max_size_mb=20)
# Mock image file
image_path = self.assets_dir / "test_image.png"
image_path.write_bytes(b"PNG fake content")
content_hash = "image_hash_abc"
# Generate and cache thumbnail
thumbnail_data = cache.generate_and_cache_thumbnail(
content_hash,
image_path,
size=(150, 150)
)
assert thumbnail_data is not None
# Retrieve cached thumbnail
cached_thumbnail = cache.get_thumbnail(content_hash, size=(150, 150))
assert cached_thumbnail == thumbnail_data
def test_cache_invalidation_strategies(self):
"""Test cache invalidation and cleanup strategies."""
cache = AssetCache(max_size_mb=1) # Small cache to test eviction
# Fill cache beyond capacity
for i in range(10):
content_hash = f"hash_{i}"
metadata = {"filename": f"file_{i}.txt", "size": 1024 * 100} # 100KB each
cache.store_metadata(content_hash, metadata)
# Verify LRU eviction occurred
assert cache.current_size_bytes <= cache.max_size_bytes
# Test manual invalidation
cache.invalidate("hash_0")
assert cache.get_metadata("hash_0") is None
def test_database_migration_support(self):
"""Test database migration support for schema updates."""
migration = DatabaseMigration(self.db_path)
# Create initial schema
migration.create_base_schema()
# Apply enhancement migration
migration.apply_migration("add_usage_tracking")
migration.apply_migration("add_processing_log")
migration.apply_migration("add_package_metadata")
# Verify migration history
applied_migrations = migration.get_applied_migrations()
assert "add_usage_tracking" in applied_migrations
assert "add_processing_log" in applied_migrations
assert "add_package_metadata" in applied_migrations
def test_database_backup_and_recovery(self):
"""Test database backup and recovery procedures."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
# Add some test data
content_hash = "backup_test_hash"
db.record_asset_usage(content_hash, "/test/backup.md")
# Create backup
backup_path = Path(self.temp_dir) / "backup.db"
db.create_backup(backup_path)
assert backup_path.exists()
# Test recovery
recovery_db = AssetDatabase(backup_path)
stats = recovery_db.get_asset_usage_stats(content_hash)
assert stats['document_count'] == 1
def test_connection_pooling_and_transactions(self):
"""Test database connection pooling and transaction management."""
db = AssetDatabase(self.db_path, enable_pooling=True, max_connections=5)
# Test transaction context manager
with db.transaction() as txn:
txn.execute("INSERT INTO asset_metadata (content_hash, filename, size_bytes, mime_type) VALUES (?, ?, ?, ?)",
("txn_hash", "txn_test.txt", 1024, "text/plain"))
# Verify data exists within transaction
result = txn.execute("SELECT filename FROM asset_metadata WHERE content_hash = ?",
("txn_hash",)).fetchone()
assert result[0] == "txn_test.txt"
# Verify transaction was committed
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT filename FROM asset_metadata WHERE content_hash = ?",
("txn_hash",))
result = cursor.fetchone()
assert result[0] == "txn_test.txt"
def test_large_dataset_performance(self):
"""Test performance with large datasets (scaled down for testing)."""
db = AssetDatabase(self.db_path)
db.initialize_enhanced_schema()
db.create_performance_indexes()
# Insert test dataset
test_size = 1000 # Scaled down from 10,000 for test speed
start_time = time.time()
for i in range(test_size):
content_hash = f"perf_hash_{i:04d}"
db.record_asset_usage(content_hash, f"/test/doc_{i}.md")
insert_time = time.time() - start_time
# Test query performance
start_time = time.time()
recent_assets = db.get_recently_used_assets(limit=100)
query_time = time.time() - start_time
# Performance assertions (should complete quickly)
assert insert_time < 10.0 # Should insert 1000 records in under 10 seconds
assert query_time < 1.0 # Should query in under 1 second
assert len(recent_assets) <= 100
def test_cache_effectiveness_validation(self):
"""Test cache effectiveness under realistic usage patterns."""
cache = AssetCache(max_size_mb=10)
# Simulate realistic access patterns
assets = [f"asset_{i}" for i in range(100)]
# First pass - populate cache
for asset in assets:
metadata = {"filename": f"{asset}.png", "size": 1024}
cache.store_metadata(asset, metadata)
# Second pass - should hit cache frequently
for asset in assets[:50]: # Access first 50 again
cached = cache.get_metadata(asset)
assert cached is not None
# Verify hit rate is reasonable
hit_rate = cache.get_hit_rate()
assert hit_rate > 0.3 # At least 30% hit rate
# Verify cache metrics
metrics = cache.get_performance_metrics()
assert metrics['total_requests'] > 100
assert metrics['cache_hits'] > 30