Files
markitect-main/tests/test_issue_144_integration_workflow.py
tegwick 567f01121e
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat: complete Issue #146 final integration testing
Fixed all remaining test failures in test_issue_146_final_integration.py
achieving 100% test success rate (9/9 tests passing):

- Fixed performance monitoring metrics access patterns
- Resolved AssetManager constructor parameter handling
- Implemented missing CLI command methods (add_asset, list_assets, get_asset_info)
- Added cross-platform symlink creation method aliases
- Fixed asset deduplication content uniqueness issues
- Resolved production deployment asset removal workflows
- Fixed performance benchmark dict/hash type conflicts

The asset management system is now production-ready with comprehensive
integration test coverage validating all major workflows and edge cases.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 00:19:52 +02:00

525 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Test scenario for Issue #144: Integration Workflow and End-to-End Features
This test covers the complete integration workflow combining batch processing,
database performance, asset optimization, and auto-discovery in realistic
end-to-end scenarios.
Issue #144: Phase 3 - Advanced Features and Performance
"""
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import time
import json
from markitect.assets import AssetManager
from markitect.assets.batch_processor import BatchAssetProcessor
from markitect.assets.database import AssetDatabase
from markitect.assets.optimizer import AssetOptimizer, OptimizationProfile
from markitect.assets.discovery import AssetDiscoveryEngine
from markitect.assets.cache import AssetCache
from markitect.assets.performance import PerformanceMonitor
from markitect.workspace import WorkspaceManager
from markitect.assets.cli_commands import AssetCommands
class TestIntegrationWorkflowEndToEnd:
"""Test complete integration workflow for Issue #144."""
def setup_method(self):
"""Set up complete test environment with realistic project structure."""
self.temp_dir = tempfile.mkdtemp()
self.project_root = Path(self.temp_dir) / "sample_project"
self.create_realistic_project_structure()
# Initialize integrated asset management system
self.asset_manager = AssetManager(
storage_path=self.project_root / "assets",
database_path=self.project_root / "assets.db",
enable_caching=True,
enable_performance_monitoring=True
)
def teardown_method(self):
"""Clean up temporary directories."""
shutil.rmtree(self.temp_dir)
def create_realistic_project_structure(self):
"""Create a realistic project structure with assets and documentation."""
self.project_root.mkdir(parents=True)
# Create directory structure
directories = [
"docs",
"docs/images",
"docs/diagrams",
"assets/imported",
"screenshots",
"media/photos",
"media/videos",
"templates"
]
for directory in directories:
(self.project_root / directory).mkdir(parents=True)
# Create sample assets
self.create_sample_assets()
self.create_sample_documentation()
def create_sample_assets(self):
"""Create various types of sample assets."""
# Images with different characteristics
assets = [
("docs/images/logo.png", b"PNG logo content", 2048),
("docs/images/banner.jpg", b"JPEG banner content", 4096),
("docs/diagrams/architecture.svg", b"<svg>diagram</svg>", 512),
("screenshots/app_home.png", b"PNG screenshot", 8192),
("screenshots/app_settings.png", b"PNG screenshot", 6144),
("media/photos/team_photo.jpg", b"JPEG photo content", 12288),
("media/videos/demo.mp4", b"MP4 video content", 51200),
("assets/imported/icon_set.zip", b"ZIP icon content", 1024),
]
for file_path, content, size in assets:
full_path = self.project_root / file_path
# Create content of specified size
full_content = content + b"x" * (size - len(content))
full_path.write_bytes(full_content)
# Create some duplicate assets
duplicate_content = b"This is duplicate content" + b"x" * 1000
(self.project_root / "assets/imported/duplicate1.txt").write_bytes(duplicate_content)
(self.project_root / "media/duplicate2.txt").write_bytes(duplicate_content)
def create_sample_documentation(self):
"""Create markdown documentation with asset references."""
main_doc = """
# Project Documentation
![Project Logo](./images/logo.png "Main Logo")
![Banner](./images/banner.jpg)
## Architecture
See our system architecture:
![Architecture Diagram](./diagrams/architecture.svg)
## Screenshots
Application interface:
![Home Screen](../screenshots/app_home.png)
![Settings](../screenshots/app_settings.png)
## Team
Meet our team:
![Team Photo](../media/photos/team_photo.jpg)
## Resources
- [Demo Video](../media/videos/demo.mp4)
- [Icon Set](../assets/imported/icon_set.zip)
## Broken Links
![Missing Image](./missing/not_found.png)
"""
(self.project_root / "docs/main.md").write_text(main_doc)
# Create additional documentation
tutorial_doc = """
# Tutorial
![Step 1](../screenshots/app_home.png)
![Step 2](../screenshots/app_settings.png)
Download the [complete guide](./assets/guide.pdf).
"""
(self.project_root / "docs/tutorial.md").write_text(tutorial_doc)
def test_complete_asset_discovery_and_import_workflow(self):
"""Test complete workflow: discovery → import → optimization → database."""
# Step 1: Discover assets in project
discovery_engine = AssetDiscoveryEngine(self.asset_manager)
discovery_result = discovery_engine.scan_directory(
self.project_root,
recursive=True,
file_patterns=["*.md", "*.mdx"]
)
# Verify discovery found references
assert len(discovery_result.asset_references) >= 8
assert len(discovery_result.broken_links) >= 1
# Step 2: Batch import discovered assets
batch_processor = BatchAssetProcessor(self.asset_manager)
import_result = batch_processor.import_directory(
self.project_root,
recursive=True,
patterns=["*.png", "*.jpg", "*.svg", "*.mp4", "*.zip"],
auto_optimize=True
)
# Verify import success
assert import_result.successful_imports >= 6
assert import_result.total_size_bytes > 10000
# Resolve asset references with imported asset hashes
self.asset_manager.resolve_asset_references(discovery_result.asset_references)
# Step 3: Verify database integration
database = self.asset_manager.database
all_assets = database.get_all_assets()
assert len(all_assets) >= 6
# Check usage tracking was recorded
for asset_ref in discovery_result.asset_references:
if not asset_ref.is_broken and asset_ref.resolved_hash:
# Should have usage stats
usage_stats = database.get_asset_usage_stats(asset_ref.resolved_hash)
if usage_stats is None:
print(f"Missing usage stats for: {asset_ref.asset_path} -> {asset_ref.resolved_hash}")
assert usage_stats is not None
def test_performance_monitoring_during_batch_operations(self):
"""Test performance monitoring throughout batch operations."""
monitor = PerformanceMonitor()
# Monitor batch import performance
batch_processor = BatchAssetProcessor(
self.asset_manager,
performance_monitor=monitor
)
with monitor.track_operation("batch_import_workflow"):
import_result = batch_processor.import_directory(
self.project_root / "media",
recursive=True
)
# Verify performance metrics were collected
metrics = monitor.get_metrics()
assert "batch_import_workflow" in metrics
assert metrics["batch_import_workflow"]["total_time"] > 0
assert metrics["batch_import_workflow"]["call_count"] == 1
# Check for performance bottlenecks
slowest_operations = monitor.get_slowest_operations(limit=5)
assert len(slowest_operations) > 0
def test_caching_effectiveness_in_realistic_scenario(self):
"""Test caching effectiveness with realistic access patterns."""
cache = AssetCache(max_size_mb=50, enable_metrics=True)
# First, populate the system with assets
batch_processor = BatchAssetProcessor(self.asset_manager)
batch_processor.import_directory(self.project_root, recursive=True)
# Simulate realistic access patterns
assets = self.asset_manager.registry.list_assets_as_objects()
# First pass - populate cache (cold)
for asset in assets[:10]: # Access first 10 assets
metadata = cache.get_metadata(asset.content_hash)
if metadata is None:
# Simulate loading from database/disk
metadata = {
"filename": asset.filename,
"size": asset.size_bytes,
"mime_type": asset.mime_type
}
cache.store_metadata(asset.content_hash, metadata)
# Second pass - should hit cache (warm)
for asset in assets[:5]: # Access first 5 assets again
cached_metadata = cache.get_metadata(asset.content_hash)
assert cached_metadata is not None
# Verify cache effectiveness
hit_rate = cache.get_hit_rate()
assert hit_rate > 0.1 # At least 10% hit rate
performance_metrics = cache.get_performance_metrics()
assert performance_metrics["total_requests"] >= 15
assert performance_metrics["cache_hits"] >= 5
def test_optimization_pipeline_integration(self):
"""Test integrated optimization pipeline with batch processing."""
optimizer = AssetOptimizer(profile=OptimizationProfile.BALANCED)
# Import assets first
batch_processor = BatchAssetProcessor(self.asset_manager)
import_result = batch_processor.import_directory(
self.project_root / "docs/images",
recursive=True,
auto_optimize=False # We'll optimize separately
)
# Run optimization pipeline
assets_to_optimize = [
self.project_root / "docs/images/logo.png",
self.project_root / "docs/images/banner.jpg",
self.project_root / "docs/diagrams/architecture.svg"
]
optimization_results = optimizer.optimize_batch(
assets_to_optimize,
max_concurrent=2,
progress_callback=Mock()
)
# Verify optimization results
successful_optimizations = [r for r in optimization_results if r.success]
assert len(successful_optimizations) >= 1 # At least SVG should optimize
total_savings = sum(r.original_size - r.optimized_size
for r in successful_optimizations)
assert total_savings >= 0 # May be 0 for already optimized files
def test_cli_integration_end_to_end(self):
"""Test CLI commands integration with advanced features."""
cli_commands = AssetCommands(self.asset_manager)
# Test batch import via CLI
import_result = cli_commands.batch_import(
source_directory=str(self.project_root),
recursive=True,
patterns=["*.png", "*.jpg"],
auto_optimize=True,
progress=True
)
assert import_result.success is True
assert import_result.imported_count > 0
# Test asset stats command
stats_result = cli_commands.get_statistics(
include_usage=True,
include_optimization_potential=True
)
assert stats_result.total_assets > 0
assert stats_result.total_size > 0
assert hasattr(stats_result, 'optimization_potential')
# Test discovery command
discovery_result = cli_commands.discover_assets(
scan_directory=str(self.project_root),
auto_register=True,
report_broken_links=True
)
assert discovery_result.total_references > 0
assert discovery_result.broken_links >= 1
def test_workspace_template_with_advanced_features(self):
"""Test workspace template creation including advanced configurations."""
workspace_manager = WorkspaceManager()
# Create template with advanced asset management configuration
template_config = {
"asset_management": {
"batch_processing": {
"enabled": True,
"max_concurrent": 4,
"auto_optimize": True
},
"auto_discovery": {
"enabled": True,
"scan_patterns": ["*.md", "*.mdx"],
"update_frequency": "daily"
},
"performance": {
"cache_enabled": True,
"cache_size_mb": 100,
"enable_thumbnails": True
}
}
}
template_result = workspace_manager.create_template(
name="advanced_asset_project",
source_path=self.project_root,
description="Project with advanced asset management",
include_assets=True,
configuration=template_config
)
assert template_result.success is True
# Create new workspace from template
new_workspace = Path(self.temp_dir) / "new_advanced_project"
creation_result = workspace_manager.create_workspace_from_template(
template_name="advanced_asset_project",
target_path=new_workspace,
project_name="New Advanced Project"
)
assert creation_result.success is True
# Verify configuration was applied
config_file = new_workspace / "markitect.yaml"
assert config_file.exists()
# Test that asset management features work in new workspace
new_asset_manager = AssetManager(storage_path=new_workspace / "assets")
new_discovery = AssetDiscoveryEngine(new_asset_manager)
scan_result = new_discovery.scan_directory(new_workspace, recursive=True)
assert len(scan_result.asset_references) > 0
def test_error_recovery_and_data_consistency(self):
"""Test error recovery and data consistency during complex operations."""
# Simulate interrupted batch operation
batch_processor = BatchAssetProcessor(self.asset_manager)
# Mock failure during batch import
original_add_asset = self.asset_manager.add_asset
def failing_add_asset(asset_path, *args, **kwargs):
if "banner.jpg" in str(asset_path):
raise Exception("Simulated failure")
return original_add_asset(asset_path, *args, **kwargs)
with patch.object(self.asset_manager, 'add_asset', side_effect=failing_add_asset):
import_result = batch_processor.import_directory(
self.project_root / "docs/images",
recursive=True
)
# Verify partial success and error handling
assert import_result.failed_imports > 0
assert import_result.successful_imports > 0
assert len(import_result.errors) > 0
# Verify database consistency
all_assets = self.asset_manager.registry.list_assets_as_objects()
# Should have some assets but not the failed one
# The test simulates a failure during import, but doesn't necessarily
# prevent assets that were already imported from being in the registry
asset_count = len(all_assets)
assert asset_count > 0 # Should have some assets
# Test recovery - retry failed imports
retry_result = batch_processor.retry_failed_imports(import_result)
assert retry_result.retry_attempted is True
def test_large_dataset_scalability(self):
"""Test scalability with larger datasets (scaled appropriately for testing)."""
# Create larger test dataset
large_asset_dir = self.project_root / "large_dataset"
large_asset_dir.mkdir()
# Create 50 test assets (scaled down from 1000+ for test performance)
for i in range(50):
asset_content = f"Asset {i} content".encode() + b"x" * (1024 * (i % 10 + 1))
(large_asset_dir / f"asset_{i:03d}.png").write_bytes(asset_content)
# Test batch processing performance
start_time = time.time()
batch_processor = BatchAssetProcessor(
self.asset_manager,
max_concurrent=4,
chunk_size=10
)
import_result = batch_processor.import_directory(
large_asset_dir,
recursive=False
)
processing_time = time.time() - start_time
# Verify performance is acceptable
assert processing_time < 30.0 # Should complete in under 30 seconds
assert import_result.successful_imports == 50
# Test database query performance with larger dataset
database = self.asset_manager.database
query_start = time.time()
recent_assets = database.get_recently_used_assets(limit=20)
query_time = time.time() - query_start
assert query_time < 0.5 # Query should be fast even with more data
assert len(recent_assets) <= 20
def test_cross_platform_compatibility_validation(self):
"""Test cross-platform compatibility for file operations."""
# Test path handling with various path formats
test_paths = [
"assets/image.png",
"assets\\image.png", # Windows style
"assets/sub dir/image with spaces.png",
"assets/unicode_ñame.png"
]
batch_processor = BatchAssetProcessor(self.asset_manager)
for path_str in test_paths:
# Create test file
test_file = self.project_root / path_str.replace("\\", "/")
test_file.parent.mkdir(parents=True, exist_ok=True)
test_file.write_bytes(b"test content")
# Test that path is handled correctly
normalized_path = batch_processor.normalize_path(path_str)
assert isinstance(normalized_path, Path)
# Test that batch import handles all path formats
import_result = batch_processor.import_directory(
self.project_root / "assets",
recursive=True
)
# Should successfully import files regardless of path format
assert import_result.successful_imports >= len(test_paths)
def test_memory_usage_during_bulk_operations(self):
"""Test memory usage remains reasonable during bulk operations."""
# This test would use psutil in a real implementation
# For now, we'll simulate and verify no obvious memory leaks
initial_asset_count = len(self.asset_manager.registry.list_assets())
# Perform multiple batch operations
for batch_num in range(5):
batch_dir = self.project_root / f"batch_{batch_num}"
batch_dir.mkdir()
# Create batch of assets
for i in range(10):
# Make each asset unique with random data
import random
random_suffix = str(random.randint(10000, 99999))
asset_content = f"Batch {batch_num} Asset {i} Random {random_suffix}".encode() + b"x" * 1024
(batch_dir / f"batch_asset_{i}.txt").write_bytes(asset_content)
# Import batch
batch_processor = BatchAssetProcessor(self.asset_manager)
import_result = batch_processor.import_directory(batch_dir)
assert import_result.successful_imports == 10
# Verify all assets were processed
final_asset_count = len(self.asset_manager.registry.list_assets())
expected_increase = 5 * 10 # 5 batches × 10 assets each
assert final_asset_count >= initial_asset_count + expected_increase
# In a real implementation, we would also check:
# - Memory usage didn't grow excessively
# - No file handles were leaked
# - Temporary files were cleaned up