""" AssetManager class for high-level asset management API coordination. This module implements the AssetManager class that provides a high-level API coordinating all asset operations, integration with existing markitect patterns, error handling and logging, and configuration management integration. """ import logging from pathlib import Path from typing import Dict, List, Optional, Any, Union from .registry import AssetRegistry from .deduplicator import AssetDeduplicator from .packager import MarkdownPackager from .exceptions import AssetError, AssetManagerError from .constants import DEFAULT_CONFIG, DEFAULT_ASSETS_DIR, DEFAULT_REGISTRY_FILENAME class AssetManager: """High-level asset management coordinator integrating all asset operations.""" def __init__(self, config: Optional[Dict[str, Any]] = None): """Initialize AssetManager with configuration. Args: config: Configuration dictionary. Uses defaults if None. Raises: AssetManagerError: If initialization fails. """ self.config = self._merge_config(config or {}) self.logger = logging.getLogger('markitect.assets') try: # Extract configuration assets_config = self.config.get('assets', {}) # Set up paths self.storage_path = Path( assets_config.get('storage_path', DEFAULT_ASSETS_DIR) ).resolve() self.registry_path = Path( assets_config.get('registry_path', DEFAULT_REGISTRY_FILENAME) ).resolve() # Configuration options self.enable_deduplication = assets_config.get('enable_deduplication', True) self.default_conflict_resolution = assets_config.get( 'default_conflict_resolution', 'backup' ) # Validate configuration self._validate_configuration() # Initialize components self.registry = AssetRegistry(self.registry_path) self.deduplicator = AssetDeduplicator(self.storage_path, self.registry) self.packager = MarkdownPackager(self.registry, self.deduplicator) self.logger.info(f"AssetManager initialized with storage: {self.storage_path}") except Exception as e: raise AssetManagerError("Failed to initialize AssetManager", cause=e) @classmethod def from_config_manager(cls) -> 'AssetManager': """Create AssetManager from ConfigurationManager. Returns: Initialized AssetManager instance. """ try: from markitect.config_manager import ConfigurationManager config_manager = ConfigurationManager() config = config_manager.get_current_config() return cls(config) except ImportError: # Fallback to default configuration return cls() except Exception as e: raise AssetManagerError("Failed to initialize from configuration manager", cause=e) def _merge_config(self, user_config: Dict[str, Any]) -> Dict[str, Any]: """Merge user configuration with defaults. Args: user_config: User-provided configuration. Returns: Merged configuration dictionary. """ config = {} # Merge assets configuration assets_config = DEFAULT_CONFIG.copy() if 'assets' in user_config: assets_config.update(user_config['assets']) config['assets'] = assets_config # Add other top-level config as-is for key, value in user_config.items(): if key != 'assets': config[key] = value return config def _validate_configuration(self) -> None: """Validate configuration values. Raises: AssetManagerError: If configuration is invalid. """ # Check if storage path is valid if self.storage_path.exists() and not self.storage_path.is_dir(): raise AssetManagerError(f"Storage path exists but is not a directory: {self.storage_path}") # Check registry path parent directory if not self.registry_path.parent.exists(): try: self.registry_path.parent.mkdir(parents=True, exist_ok=True) except PermissionError: raise AssetManagerError(f"Cannot create registry directory: {self.registry_path.parent}") def add_asset(self, file_path: Path, description: Optional[str] = None) -> Dict[str, Any]: """Add asset with automatic deduplication. Args: file_path: Path to the asset file. description: Optional description for the asset. Returns: Dictionary containing asset information and deduplication status. Raises: AssetError: If asset cannot be added. """ try: self.logger.info(f"Adding asset: {file_path}") # Store asset through deduplicator result = self.deduplicator.store_asset(file_path, description) # Log result if result.get('deduplicated'): self.logger.info(f"Asset deduplicated: {result['content_hash']}") else: self.logger.info(f"New asset stored: {result['content_hash']}") # Add friendly information result['description'] = description result['added_at'] = self.registry.get_asset(result['content_hash']).get('created_at') return result except Exception as e: self.logger.error(f"Failed to add asset {file_path}: {e}") if isinstance(e, AssetError): raise raise AssetError(f"Failed to add asset: {e}", cause=e) def get_asset_info(self, content_hash: str) -> Dict[str, Any]: """Get detailed asset information by content hash. Args: content_hash: SHA-256 hash of the asset content. Returns: Dictionary containing detailed asset information. Raises: AssetManagerError: If asset is not found. """ try: asset_info = self.registry.get_asset(content_hash) # Add additional information stored_path = Path(asset_info['path']) asset_info['file_path'] = str(stored_path) asset_info['exists'] = stored_path.exists() if stored_path.exists(): asset_info['actual_size'] = stored_path.stat().st_size # Add integrity check asset_info['integrity_valid'] = self.deduplicator.verify_asset_integrity(content_hash) return asset_info except Exception as e: if "not found" in str(e).lower(): raise AssetManagerError(f"Asset not found: {content_hash}") raise AssetManagerError(f"Failed to get asset info: {e}", cause=e) def list_assets(self) -> List[Dict[str, Any]]: """List all assets with enhanced information. Returns: List of asset information dictionaries. """ try: assets = self.registry.list_assets() # Enhance with additional information for asset in assets: stored_path = Path(asset['path']) asset['exists'] = stored_path.exists() asset['integrity_valid'] = self.deduplicator.verify_asset_integrity( asset['content_hash'] ) return assets except Exception as e: raise AssetManagerError(f"Failed to list assets: {e}", cause=e) def asset_exists(self, content_hash: str) -> bool: """Check if asset exists by content hash. Args: content_hash: SHA-256 hash of the asset content. Returns: True if asset exists, False otherwise. """ return self.registry.asset_exists(content_hash) def remove_asset(self, content_hash: str) -> Dict[str, Any]: """Remove asset by content hash. Args: content_hash: SHA-256 hash of the asset content. Returns: Dictionary with removal results. """ try: self.logger.info(f"Removing asset: {content_hash}") result = self.deduplicator.remove_stored_asset(content_hash) self.logger.info(f"Asset removed: {content_hash}") result['removed'] = result.get('registry_removed', False) return result except Exception as e: self.logger.error(f"Failed to remove asset {content_hash}: {e}") raise AssetManagerError(f"Failed to remove asset: {e}", cause=e) def create_package(self, source_dir: Path, package_path: Path, description: Optional[str] = None, exclude_patterns: Optional[List[str]] = None, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Create document package with assets. Args: source_dir: Directory containing files to package. package_path: Path for the output package file. description: Optional package description. exclude_patterns: File patterns to exclude from packaging. metadata: Optional metadata to include in package. Returns: Dictionary containing packaging results. """ try: self.logger.info(f"Creating package from {source_dir} to {package_path}") result = self.packager.create_package( source_dir, package_path, description, exclude_patterns, metadata ) self.logger.info(f"Package created: {len(result['assets'])} assets processed") return result except Exception as e: self.logger.error(f"Failed to create package: {e}") raise AssetManagerError(f"Failed to create package: {e}", cause=e) def extract_package(self, package_path: Path, extract_dir: Path, restore_assets: bool = True) -> Dict[str, Any]: """Extract package to workspace with asset restoration. Args: package_path: Path to the package file. extract_dir: Directory to extract files to. restore_assets: Whether to restore asset links. Returns: Dictionary containing extraction results. """ try: self.logger.info(f"Extracting package {package_path} to {extract_dir}") result = self.packager.extract_package( package_path, extract_dir, restore_symlinks=restore_assets ) self.logger.info(f"Package extracted: {result['extracted_files']} files") return result except Exception as e: self.logger.error(f"Failed to extract package: {e}") raise AssetManagerError(f"Failed to extract package: {e}", cause=e) def get_storage_stats(self) -> Dict[str, Any]: """Get asset storage statistics. Returns: Dictionary containing storage statistics. """ try: stats = self.deduplicator.list_stored_assets() # Add additional statistics stats['storage_path'] = str(self.storage_path) stats['registry_path'] = str(self.registry_path) stats['deduplication_enabled'] = self.enable_deduplication # Calculate storage efficiency (if deduplication is enabled) if stats['total_assets'] > 0: total_files = len(self.list_assets()) if total_files > stats['total_assets']: stats['deduplication_ratio'] = stats['total_assets'] / total_files stats['space_saved_ratio'] = 1 - stats['deduplication_ratio'] return stats except Exception as e: raise AssetManagerError(f"Failed to get storage statistics: {e}", cause=e) def verify_integrity(self, content_hash: Optional[str] = None) -> Dict[str, Any]: """Verify integrity of assets. Args: content_hash: Specific asset to verify, or None for all assets. Returns: Dictionary containing integrity check results. """ try: if content_hash: # Verify specific asset valid = self.deduplicator.verify_asset_integrity(content_hash) return { 'content_hash': content_hash, 'valid': valid, 'checked': 1 } else: # Verify all assets assets = self.list_assets() valid_count = 0 invalid_assets = [] for asset in assets: hash_val = asset['content_hash'] if self.deduplicator.verify_asset_integrity(hash_val): valid_count += 1 else: invalid_assets.append(hash_val) return { 'total_checked': len(assets), 'valid_assets': valid_count, 'invalid_assets': invalid_assets, 'integrity_valid': len(invalid_assets) == 0 } except Exception as e: raise AssetManagerError(f"Failed to verify integrity: {e}", cause=e) def cleanup_orphaned_assets(self) -> Dict[str, Any]: """Clean up orphaned assets (in storage but not in registry). Returns: Dictionary containing cleanup results. """ try: self.logger.info("Starting orphaned asset cleanup") # This would involve scanning storage directory and comparing with registry # For minimal implementation, return placeholder return { 'orphaned_files_found': 0, 'orphaned_files_removed': 0, 'space_reclaimed_bytes': 0 } except Exception as e: raise AssetManagerError(f"Failed to cleanup orphaned assets: {e}", cause=e)