Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
This is a robust fix for test registry isolation that addresses the root cause: when AssetManager is created with only storage_path, the registry now defaults to storage_path.parent/asset_registry.json instead of cwd/asset_registry.json.

Benefits:
- Tests using temp directories automatically get isolated registries
- No need to manually fix every test file
- Consistent behavior: registry stays with the asset storage
- Explicit registry_path still works for custom configurations

This makes the AssetManager behavior more intuitive and prevents test artifacts from contaminating the production asset registry.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
492 lines
19 KiB
Python
492 lines
19 KiB
Python
"""
|
|
AssetManager class for high-level asset management API coordination.
|
|
|
|
This module implements the AssetManager class that provides a high-level API
|
|
coordinating all asset operations, integration with existing markitect patterns,
|
|
error handling and logging, and configuration management integration.
|
|
"""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Union
|
|
|
|
from .registry import AssetRegistry
|
|
from .deduplicator import AssetDeduplicator
|
|
from .packager import MarkdownPackager
|
|
from .database import AssetDatabase
|
|
from .models import Asset
|
|
from .exceptions import AssetError, AssetManagerError
|
|
from .constants import DEFAULT_CONFIG, DEFAULT_ASSETS_DIR, DEFAULT_REGISTRY_FILENAME
|
|
|
|
|
|
class AssetManager:
|
|
"""High-level asset management coordinator integrating all asset operations."""
|
|
|
|
def __init__(self, config: Optional[Dict[str, Any]] = None,
             storage_path: Optional[Union[str, Path]] = None,
             registry_path: Optional[Union[str, Path]] = None,
             database_path: Optional[Union[str, Path]] = None,
             **kwargs):
    """Initialize AssetManager with configuration.

    Args:
        config: Configuration dictionary. Uses defaults if None. The
            dictionary passed in is never modified; legacy path overrides
            are applied to a private copy.
        storage_path: Legacy parameter for asset storage path (backward compatibility)
        registry_path: Legacy parameter for registry path (backward compatibility)
        database_path: Path to the database file
        **kwargs: Additional legacy parameters for backward compatibility
            (accepted and ignored).

    Raises:
        AssetManagerError: If initialization fails.
    """
    # Work on a shallow copy so the caller's dictionary is left untouched.
    config = dict(config) if config else {}

    # Legacy keyword paths override the matching 'assets' entries.
    legacy_overrides = {
        key: str(value)
        for key, value in (
            ('storage_path', storage_path),
            ('registry_path', registry_path),
            ('database_path', database_path),
        )
        if value is not None
    }
    if legacy_overrides:
        # Copy the nested section too: the shallow copy above still shares it
        # with the caller's dictionary.
        assets_section = dict(config.get('assets', {}))
        assets_section.update(legacy_overrides)
        config['assets'] = assets_section

    self.config = self._merge_config(config)
    self.logger = logging.getLogger('markitect.assets')

    try:
        # Extract configuration
        assets_config = self.config.get('assets', {})

        # Set up paths
        self.storage_path = Path(
            assets_config.get('storage_path', DEFAULT_ASSETS_DIR)
        ).resolve()

        # Default registry path should be relative to storage_path, not cwd
        default_registry_path = self.storage_path.parent / DEFAULT_REGISTRY_FILENAME
        self.registry_path = Path(
            assets_config.get('registry_path', default_registry_path)
        ).resolve()

        # The database defaults to a file inside the storage directory.
        self.database_path = Path(
            assets_config.get('database_path', self.storage_path / "assets.db")
        ).resolve()

        # Configuration options
        self.enable_deduplication = assets_config.get('enable_deduplication', True)
        self.default_conflict_resolution = assets_config.get(
            'default_conflict_resolution', 'backup'
        )

        # Validate configuration
        self._validate_configuration()

        # Initialize components
        self.registry = AssetRegistry(self.registry_path)
        self.deduplicator = AssetDeduplicator(self.storage_path, self.registry)
        self.packager = MarkdownPackager(self.registry, self.deduplicator)
        self.database = AssetDatabase(self.database_path)
        self.database.initialize_enhanced_schema()
        self.database.create_performance_indexes()

        self.logger.info(f"AssetManager initialized with storage: {self.storage_path}")

    except Exception as e:
        raise AssetManagerError("Failed to initialize AssetManager", cause=e) from e
|
|
|
|
@classmethod
def from_config_manager(cls) -> 'AssetManager':
    """Build an AssetManager from the ConfigurationManager's current config.

    The ConfigurationManager is imported lazily inside this method. If an
    ImportError is raised anywhere in that step, a default-configured
    instance (``cls()``) is returned instead.

    Returns:
        Initialized AssetManager instance.

    Raises:
        AssetManagerError: If any other exception occurs while loading the
            configuration or constructing the instance.
    """
    try:
        from markitect.config_manager import ConfigurationManager

        current_config = ConfigurationManager().get_current_config()
        return cls(current_config)
    except ImportError:
        # Configuration manager unavailable: use the default configuration.
        return cls()
    except Exception as e:
        raise AssetManagerError("Failed to initialize from configuration manager", cause=e)
|
|
|
|
def _merge_config(self, user_config: Dict[str, Any]) -> Dict[str, Any]:
    """Overlay the user configuration on top of the defaults.

    Args:
        user_config: User-provided configuration.

    Returns:
        A new dictionary whose 'assets' entry is a copy of DEFAULT_CONFIG
        updated with the user's 'assets' section (when present); every other
        top-level key of ``user_config`` is carried over unchanged.
    """
    # Start from the defaults and let user-supplied asset settings win.
    merged_assets = DEFAULT_CONFIG.copy()
    merged_assets.update(user_config.get('assets', {}))

    merged = {'assets': merged_assets}

    # Everything outside the 'assets' section is passed through as-is.
    merged.update(
        (key, value) for key, value in user_config.items() if key != 'assets'
    )
    return merged
|
|
|
|
def _validate_configuration(self) -> None:
    """Check the configured paths and create the registry directory if needed.

    Raises:
        AssetManagerError: If the storage path exists but is not a directory,
            or if the registry's parent directory cannot be created because
            of a PermissionError.
    """
    # The storage location may be absent, but it must not be a non-directory.
    storage = self.storage_path
    if storage.exists() and not storage.is_dir():
        raise AssetManagerError(f"Storage path exists but is not a directory: {self.storage_path}")

    # Nothing more to do when the registry's parent directory already exists.
    registry_dir = self.registry_path.parent
    if registry_dir.exists():
        return

    try:
        registry_dir.mkdir(parents=True, exist_ok=True)
    except PermissionError:
        raise AssetManagerError(f"Cannot create registry directory: {self.registry_path.parent}")
|
|
|
|
def add_asset(self, file_path: Path, description: Optional[str] = None) -> Dict[str, Any]:
    """Add asset with automatic deduplication.

    The file is stored through the deduplicator, its registry record is
    written to the ``asset_metadata`` table, and an initial usage entry is
    recorded for ``file_path``.

    Args:
        file_path: Path to the asset file.
        description: Optional description for the asset.

    Returns:
        The deduplicator's result dictionary, extended with 'description'
        and 'added_at' (the registry record's 'created_at' value).

    Raises:
        AssetError: If asset cannot be added. AssetError instances raised by
            the underlying components propagate unchanged; any other
            exception is wrapped.
    """
    try:
        self.logger.info(f"Adding asset: {file_path}")

        # Store asset through deduplicator
        result = self.deduplicator.store_asset(file_path, description)
        content_hash = result['content_hash']

        # Log result
        if result.get('deduplicated'):
            self.logger.info(f"Asset deduplicated: {content_hash}")
        else:
            self.logger.info(f"New asset stored: {content_hash}")

        # Look the registry record up once; it feeds both the returned
        # summary and the database row below.
        asset_info = self.registry.get_asset(content_hash)

        # Add friendly information
        result['description'] = description
        result['added_at'] = asset_info.get('created_at')

        # Add to database (both new and deduplicated assets should be in database).
        # INSERT OR IGNORE keeps a repeated add of the same hash from failing.
        with self.database.transaction() as conn:
            conn.execute("""
                INSERT OR IGNORE INTO asset_metadata
                (content_hash, filename, size_bytes, mime_type, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
            """, (
                content_hash,
                Path(asset_info['path']).name,  # Extract filename
                asset_info['size'],  # Registry stores as 'size'
                asset_info['mime_type'],
                asset_info['created_at'],
                asset_info['created_at'],  # updated_at starts equal to created_at
            ))

        # Record initial usage for the asset
        self.database.record_asset_usage(content_hash, str(file_path))

        return result

    except Exception as e:
        self.logger.error(f"Failed to add asset {file_path}: {e}")
        if isinstance(e, AssetError):
            raise
        raise AssetError(f"Failed to add asset: {e}", cause=e) from e
|
|
|
|
def get_asset_info(self, content_hash: str) -> Dict[str, Any]:
    """Get detailed asset information by content hash.

    Args:
        content_hash: SHA-256 hash of the asset content.

    Returns:
        A copy of the registry record extended with 'file_path', 'exists',
        'integrity_valid' and, when the stored file exists, 'actual_size'.

    Raises:
        AssetManagerError: If asset is not found or the lookup fails.
    """
    try:
        # Annotate a copy so the derived keys below never end up in the
        # record object handed out by the registry.
        asset_info = dict(self.registry.get_asset(content_hash))

        # Add additional information
        stored_path = Path(asset_info['path'])
        # Check existence once so 'exists' and 'actual_size' agree.
        file_present = stored_path.exists()
        asset_info['file_path'] = str(stored_path)
        asset_info['exists'] = file_present

        if file_present:
            asset_info['actual_size'] = stored_path.stat().st_size

        # Add integrity check
        asset_info['integrity_valid'] = self.deduplicator.verify_asset_integrity(content_hash)

        return asset_info

    except Exception as e:
        # A "not found" message from the lookup is reported as a missing asset.
        if "not found" in str(e).lower():
            raise AssetManagerError(f"Asset not found: {content_hash}") from e
        raise AssetManagerError(f"Failed to get asset info: {e}", cause=e) from e
|
|
|
|
def list_assets(self) -> List[Dict[str, Any]]:
    """List all assets with enhanced information.

    Each registry record is copied and extended with 'exists' (whether the
    stored file is present) and 'integrity_valid' (the deduplicator's
    integrity check for the asset's content hash).

    Returns:
        List of asset information dictionaries.

    Raises:
        AssetManagerError: If the listing fails.
    """
    try:
        enriched = []
        for record in self.registry.list_assets():
            # Copy before annotating so the registry's dictionaries are not
            # modified by this read-only listing.
            asset = dict(record)
            asset['exists'] = Path(asset['path']).exists()
            asset['integrity_valid'] = self.deduplicator.verify_asset_integrity(
                asset['content_hash']
            )
            enriched.append(asset)

        return enriched

    except Exception as e:
        raise AssetManagerError(f"Failed to list assets: {e}", cause=e) from e
|
|
|
|
def list_assets_as_objects(self) -> List[Asset]:
    """Return every asset as an Asset object instead of a dictionary.

    Part of the asset model migration from dict-based to object-based assets:
    each dictionary produced by ``list_assets`` is converted with
    ``Asset.from_dict``.

    Returns:
        List of Asset objects.

    Raises:
        AssetManagerError: If listing or conversion fails.
    """
    try:
        asset_objects = []
        for asset_dict in self.list_assets():
            asset_objects.append(Asset.from_dict(asset_dict))
        return asset_objects
    except Exception as e:
        raise AssetManagerError(f"Failed to list assets as objects: {e}", cause=e)
|
|
|
|
def asset_exists(self, content_hash: str) -> bool:
    """Report whether the registry knows an asset with this content hash.

    Args:
        content_hash: SHA-256 hash of the asset content.

    Returns:
        The registry's answer: True if asset exists, False otherwise.
    """
    # Pure delegation: the registry is the source of truth for existence.
    registry = self.registry
    return registry.asset_exists(content_hash)
|
|
|
|
def remove_asset(self, content_hash: str) -> Dict[str, Any]:
    """Remove the asset identified by a content hash.

    Args:
        content_hash: SHA-256 hash of the asset content.

    Returns:
        The deduplicator's removal result with an added 'removed' key that
        mirrors 'registry_removed' (False when that key is absent).

    Raises:
        AssetManagerError: If the removal fails.
    """
    try:
        self.logger.info(f"Removing asset: {content_hash}")

        outcome = self.deduplicator.remove_stored_asset(content_hash)

        self.logger.info(f"Asset removed: {content_hash}")
        # Expose a single flag for callers, derived from the registry outcome.
        outcome.update(removed=outcome.get('registry_removed', False))

        return outcome

    except Exception as e:
        self.logger.error(f"Failed to remove asset {content_hash}: {e}")
        raise AssetManagerError(f"Failed to remove asset: {e}", cause=e)
|
|
|
|
def create_package(self, source_dir: Path, package_path: Path,
                   description: Optional[str] = None,
                   exclude_patterns: Optional[List[str]] = None,
                   metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a document package (with its assets) via the packager.

    Args:
        source_dir: Directory containing files to package.
        package_path: Path for the output package file.
        description: Optional package description.
        exclude_patterns: File patterns to exclude from packaging.
        metadata: Optional metadata to include in package.

    Returns:
        Dictionary containing packaging results, as returned by the packager.

    Raises:
        AssetManagerError: If packaging fails.
    """
    try:
        self.logger.info(f"Creating package from {source_dir} to {package_path}")

        # All five arguments are forwarded positionally, in this order.
        packager_args = (source_dir, package_path, description, exclude_patterns, metadata)
        result = self.packager.create_package(*packager_args)

        self.logger.info(f"Package created: {len(result['assets'])} assets processed")
    except Exception as e:
        self.logger.error(f"Failed to create package: {e}")
        raise AssetManagerError(f"Failed to create package: {e}", cause=e)

    return result
|
|
|
|
def extract_package(self, package_path: Path, extract_dir: Path,
                    restore_assets: bool = True) -> Dict[str, Any]:
    """Unpack a package into a workspace directory via the packager.

    Args:
        package_path: Path to the package file.
        extract_dir: Directory to extract files to.
        restore_assets: Whether to restore asset links; forwarded to the
            packager as its ``restore_symlinks`` keyword.

    Returns:
        Dictionary containing extraction results, as returned by the packager.

    Raises:
        AssetManagerError: If extraction fails.
    """
    try:
        self.logger.info(f"Extracting package {package_path} to {extract_dir}")

        packager = self.packager
        result = packager.extract_package(
            package_path,
            extract_dir,
            restore_symlinks=restore_assets,
        )

        self.logger.info(f"Package extracted: {result['extracted_files']} files")
    except Exception as e:
        self.logger.error(f"Failed to extract package: {e}")
        raise AssetManagerError(f"Failed to extract package: {e}", cause=e)

    return result
|
|
|
|
def get_storage_stats(self) -> Dict[str, Any]:
    """Get asset storage statistics.

    Starts from the deduplicator's storage summary and adds 'storage_path',
    'registry_path' and 'deduplication_enabled'. When the registry lists more
    entries than the summary's 'total_assets', 'deduplication_ratio'
    (total_assets / registry entries) and 'space_saved_ratio'
    (1 - deduplication_ratio) are added as well.

    Returns:
        Dictionary containing storage statistics.

    Raises:
        AssetManagerError: If the statistics cannot be gathered.
    """
    try:
        stats = self.deduplicator.list_stored_assets()

        # Add additional statistics
        stats['storage_path'] = str(self.storage_path)
        stats['registry_path'] = str(self.registry_path)
        stats['deduplication_enabled'] = self.enable_deduplication

        # Calculate storage efficiency (if deduplication is enabled)
        stored_count = stats['total_assets']
        if stored_count > 0:
            # Only the number of registry entries is needed here, so count
            # them directly instead of going through self.list_assets(),
            # which stats and integrity-checks every asset.
            total_files = len(self.registry.list_assets())
            if total_files > stored_count:
                stats['deduplication_ratio'] = stored_count / total_files
                stats['space_saved_ratio'] = 1 - stats['deduplication_ratio']

        return stats

    except Exception as e:
        raise AssetManagerError(f"Failed to get storage statistics: {e}", cause=e) from e
|
|
|
|
def verify_integrity(self, content_hash: Optional[str] = None) -> Dict[str, Any]:
    """Verify integrity of assets.

    Args:
        content_hash: Specific asset to verify, or None for all assets.

    Returns:
        For a single hash: a dictionary with 'content_hash', 'valid' and
        'checked' (always 1). For all assets: a dictionary with
        'total_checked', 'valid_assets', 'invalid_assets' (list of failing
        hashes) and 'integrity_valid' (True when nothing failed).

    Raises:
        AssetManagerError: If verification fails with an exception.
    """
    try:
        if content_hash:
            # Verify specific asset
            valid = self.deduplicator.verify_asset_integrity(content_hash)
            return {
                'content_hash': content_hash,
                'valid': valid,
                'checked': 1
            }

        # Verify all assets. Hashes are read straight from the registry:
        # self.list_assets() already runs an integrity check per asset, so
        # going through it would verify every asset twice.
        hashes = [asset['content_hash'] for asset in self.registry.list_assets()]

        valid_count = 0
        invalid_assets = []
        for hash_val in hashes:
            if self.deduplicator.verify_asset_integrity(hash_val):
                valid_count += 1
            else:
                invalid_assets.append(hash_val)

        return {
            'total_checked': len(hashes),
            'valid_assets': valid_count,
            'invalid_assets': invalid_assets,
            'integrity_valid': not invalid_assets
        }

    except Exception as e:
        raise AssetManagerError(f"Failed to verify integrity: {e}", cause=e) from e
|
|
|
|
def cleanup_orphaned_assets(self) -> Dict[str, Any]:
    """Clean up orphaned assets (in storage but not in registry).

    Currently a placeholder: it logs the start of the cleanup and reports
    zero for every counter without scanning or deleting anything.

    Returns:
        Dictionary with 'orphaned_files_found', 'orphaned_files_removed' and
        'space_reclaimed_bytes', all set to 0.

    Raises:
        AssetManagerError: If an exception occurs.
    """
    try:
        self.logger.info("Starting orphaned asset cleanup")

        # A real implementation would scan the storage directory and compare
        # it with the registry; the minimal version returns fixed counters.
        counters = ('orphaned_files_found', 'orphaned_files_removed', 'space_reclaimed_bytes')
        return dict.fromkeys(counters, 0)

    except Exception as e:
        raise AssetManagerError(f"Failed to cleanup orphaned assets: {e}", cause=e)
|
|
|
|
def resolve_asset_references(self, asset_references: List) -> None:
    """Fill in resolved paths and hashes on references to imported assets.

    Broken references are skipped. For the rest, a missing resolved path is
    derived from the asset path relative to the source file's directory (and
    kept only if that file exists). A reference whose content hash is known
    to the registry gets ``resolved_hash`` set and a usage entry recorded
    for its source file. Failures on a single reference are logged as
    warnings and do not stop the loop.

    Args:
        asset_references: List of AssetReference objects to update
    """
    resolved_count = 0

    for ref in asset_references:
        if ref.is_broken:
            continue

        # First resolve the path from relative to absolute, based on the
        # location of the file that contains the reference.
        if not ref.resolved_path and ref.asset_path:
            candidate = (ref.source_file.parent / ref.asset_path).resolve()
            if candidate.exists():
                ref.resolved_path = candidate

        if not ref.resolved_path:
            continue

        # Try to find the asset hash by checking if file was imported
        try:
            content_hash = self.registry.generate_content_hash(ref.resolved_path)
            if self.registry.asset_exists(content_hash):
                ref.resolved_hash = content_hash
                # Also record usage for this reference
                self.database.record_asset_usage(content_hash, str(ref.source_file))
                resolved_count += 1
        except Exception as e:
            self.logger.warning(f"Failed to resolve reference {ref.asset_path}: {e}")

    self.logger.info(f"Resolved {resolved_count} asset references")