Files
tegwick 4ceb6cce42
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
fix: make AssetManager registry path relative to storage_path by default
This is a robust fix for test registry isolation that addresses the root cause:
when AssetManager is created with only storage_path, the registry now defaults
to storage_path.parent/asset_registry.json instead of cwd/asset_registry.json.

Benefits:
- Tests using temp directories automatically get isolated registries
- No need to manually fix every test file
- Consistent behavior: registry stays with the asset storage
- Explicit registry_path still works for custom configurations

This makes the AssetManager behavior more intuitive and prevents test
artifacts from contaminating the production asset registry.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-24 20:08:48 +02:00

492 lines
19 KiB
Python

"""
AssetManager class for high-level asset management API coordination.
This module implements the AssetManager class that provides a high-level API
coordinating all asset operations, integration with existing markitect patterns,
error handling and logging, and configuration management integration.
"""
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
from .registry import AssetRegistry
from .deduplicator import AssetDeduplicator
from .packager import MarkdownPackager
from .database import AssetDatabase
from .models import Asset
from .exceptions import AssetError, AssetManagerError
from .constants import DEFAULT_CONFIG, DEFAULT_ASSETS_DIR, DEFAULT_REGISTRY_FILENAME
class AssetManager:
"""High-level asset management coordinator integrating all asset operations."""
def __init__(self, config: Optional[Dict[str, Any]] = None,
storage_path: Optional[Union[str, Path]] = None,
registry_path: Optional[Union[str, Path]] = None,
database_path: Optional[Union[str, Path]] = None,
**kwargs):
"""Initialize AssetManager with configuration.
Args:
config: Configuration dictionary. Uses defaults if None.
storage_path: Legacy parameter for asset storage path (backward compatibility)
registry_path: Legacy parameter for registry path (backward compatibility)
database_path: Path to the database file
**kwargs: Additional legacy parameters for backward compatibility
Raises:
AssetManagerError: If initialization fails.
"""
# Handle legacy parameter support for backward compatibility
config = config or {}
if storage_path is not None or registry_path is not None or database_path is not None:
# Create config from legacy parameters
if 'assets' not in config:
config['assets'] = {}
if storage_path is not None:
config['assets']['storage_path'] = str(storage_path)
if registry_path is not None:
config['assets']['registry_path'] = str(registry_path)
if database_path is not None:
config['assets']['database_path'] = str(database_path)
self.config = self._merge_config(config)
self.logger = logging.getLogger('markitect.assets')
try:
# Extract configuration
assets_config = self.config.get('assets', {})
# Set up paths
self.storage_path = Path(
assets_config.get('storage_path', DEFAULT_ASSETS_DIR)
).resolve()
# Default registry path should be relative to storage_path, not cwd
default_registry_path = self.storage_path.parent / DEFAULT_REGISTRY_FILENAME
self.registry_path = Path(
assets_config.get('registry_path', default_registry_path)
).resolve()
self.database_path = Path(
assets_config.get('database_path', self.storage_path / "assets.db")
).resolve()
# Configuration options
self.enable_deduplication = assets_config.get('enable_deduplication', True)
self.default_conflict_resolution = assets_config.get(
'default_conflict_resolution', 'backup'
)
# Validate configuration
self._validate_configuration()
# Initialize components
self.registry = AssetRegistry(self.registry_path)
self.deduplicator = AssetDeduplicator(self.storage_path, self.registry)
self.packager = MarkdownPackager(self.registry, self.deduplicator)
self.database = AssetDatabase(self.database_path)
self.database.initialize_enhanced_schema()
self.database.create_performance_indexes()
self.logger.info(f"AssetManager initialized with storage: {self.storage_path}")
except Exception as e:
raise AssetManagerError("Failed to initialize AssetManager", cause=e)
@classmethod
def from_config_manager(cls) -> 'AssetManager':
"""Create AssetManager from ConfigurationManager.
Returns:
Initialized AssetManager instance.
"""
try:
from markitect.config_manager import ConfigurationManager
config_manager = ConfigurationManager()
config = config_manager.get_current_config()
return cls(config)
except ImportError:
# Fallback to default configuration
return cls()
except Exception as e:
raise AssetManagerError("Failed to initialize from configuration manager", cause=e)
def _merge_config(self, user_config: Dict[str, Any]) -> Dict[str, Any]:
"""Merge user configuration with defaults.
Args:
user_config: User-provided configuration.
Returns:
Merged configuration dictionary.
"""
config = {}
# Merge assets configuration
assets_config = DEFAULT_CONFIG.copy()
if 'assets' in user_config:
assets_config.update(user_config['assets'])
config['assets'] = assets_config
# Add other top-level config as-is
for key, value in user_config.items():
if key != 'assets':
config[key] = value
return config
def _validate_configuration(self) -> None:
"""Validate configuration values.
Raises:
AssetManagerError: If configuration is invalid.
"""
# Check if storage path is valid
if self.storage_path.exists() and not self.storage_path.is_dir():
raise AssetManagerError(f"Storage path exists but is not a directory: {self.storage_path}")
# Check registry path parent directory
if not self.registry_path.parent.exists():
try:
self.registry_path.parent.mkdir(parents=True, exist_ok=True)
except PermissionError:
raise AssetManagerError(f"Cannot create registry directory: {self.registry_path.parent}")
def add_asset(self, file_path: Path, description: Optional[str] = None) -> Dict[str, Any]:
"""Add asset with automatic deduplication.
Args:
file_path: Path to the asset file.
description: Optional description for the asset.
Returns:
Dictionary containing asset information and deduplication status.
Raises:
AssetError: If asset cannot be added.
"""
try:
self.logger.info(f"Adding asset: {file_path}")
# Store asset through deduplicator
result = self.deduplicator.store_asset(file_path, description)
# Log result
if result.get('deduplicated'):
self.logger.info(f"Asset deduplicated: {result['content_hash']}")
else:
self.logger.info(f"New asset stored: {result['content_hash']}")
# Add friendly information
result['description'] = description
result['added_at'] = self.registry.get_asset(result['content_hash']).get('created_at')
# Add to database (both new and deduplicated assets should be in database)
asset_info = self.registry.get_asset(result['content_hash'])
# Insert into database with proper field names using INSERT OR IGNORE for dedup safety
with self.database.transaction() as conn:
conn.execute("""
INSERT OR IGNORE INTO asset_metadata
(content_hash, filename, size_bytes, mime_type, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (
result['content_hash'],
Path(asset_info['path']).name, # Extract filename
asset_info['size'], # Registry stores as 'size'
asset_info['mime_type'],
asset_info['created_at'],
asset_info['created_at']
))
# Record initial usage for the asset
self.database.record_asset_usage(result['content_hash'], str(file_path))
return result
except Exception as e:
self.logger.error(f"Failed to add asset {file_path}: {e}")
if isinstance(e, AssetError):
raise
raise AssetError(f"Failed to add asset: {e}", cause=e)
def get_asset_info(self, content_hash: str) -> Dict[str, Any]:
"""Get detailed asset information by content hash.
Args:
content_hash: SHA-256 hash of the asset content.
Returns:
Dictionary containing detailed asset information.
Raises:
AssetManagerError: If asset is not found.
"""
try:
asset_info = self.registry.get_asset(content_hash)
# Add additional information
stored_path = Path(asset_info['path'])
asset_info['file_path'] = str(stored_path)
asset_info['exists'] = stored_path.exists()
if stored_path.exists():
asset_info['actual_size'] = stored_path.stat().st_size
# Add integrity check
asset_info['integrity_valid'] = self.deduplicator.verify_asset_integrity(content_hash)
return asset_info
except Exception as e:
if "not found" in str(e).lower():
raise AssetManagerError(f"Asset not found: {content_hash}")
raise AssetManagerError(f"Failed to get asset info: {e}", cause=e)
def list_assets(self) -> List[Dict[str, Any]]:
"""List all assets with enhanced information.
Returns:
List of asset information dictionaries.
"""
try:
assets = self.registry.list_assets()
# Enhance with additional information
for asset in assets:
stored_path = Path(asset['path'])
asset['exists'] = stored_path.exists()
asset['integrity_valid'] = self.deduplicator.verify_asset_integrity(
asset['content_hash']
)
return assets
except Exception as e:
raise AssetManagerError(f"Failed to list assets: {e}", cause=e)
def list_assets_as_objects(self) -> List[Asset]:
"""List all assets as Asset objects.
This method implements the asset model migration from dict-based to object-based assets.
Returns:
List of Asset objects.
"""
try:
asset_dicts = self.list_assets()
return [Asset.from_dict(asset_dict) for asset_dict in asset_dicts]
except Exception as e:
raise AssetManagerError(f"Failed to list assets as objects: {e}", cause=e)
def asset_exists(self, content_hash: str) -> bool:
"""Check if asset exists by content hash.
Args:
content_hash: SHA-256 hash of the asset content.
Returns:
True if asset exists, False otherwise.
"""
return self.registry.asset_exists(content_hash)
def remove_asset(self, content_hash: str) -> Dict[str, Any]:
"""Remove asset by content hash.
Args:
content_hash: SHA-256 hash of the asset content.
Returns:
Dictionary with removal results.
"""
try:
self.logger.info(f"Removing asset: {content_hash}")
result = self.deduplicator.remove_stored_asset(content_hash)
self.logger.info(f"Asset removed: {content_hash}")
result['removed'] = result.get('registry_removed', False)
return result
except Exception as e:
self.logger.error(f"Failed to remove asset {content_hash}: {e}")
raise AssetManagerError(f"Failed to remove asset: {e}", cause=e)
def create_package(self, source_dir: Path, package_path: Path,
description: Optional[str] = None,
exclude_patterns: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Create document package with assets.
Args:
source_dir: Directory containing files to package.
package_path: Path for the output package file.
description: Optional package description.
exclude_patterns: File patterns to exclude from packaging.
metadata: Optional metadata to include in package.
Returns:
Dictionary containing packaging results.
"""
try:
self.logger.info(f"Creating package from {source_dir} to {package_path}")
result = self.packager.create_package(
source_dir, package_path, description, exclude_patterns, metadata
)
self.logger.info(f"Package created: {len(result['assets'])} assets processed")
return result
except Exception as e:
self.logger.error(f"Failed to create package: {e}")
raise AssetManagerError(f"Failed to create package: {e}", cause=e)
def extract_package(self, package_path: Path, extract_dir: Path,
restore_assets: bool = True) -> Dict[str, Any]:
"""Extract package to workspace with asset restoration.
Args:
package_path: Path to the package file.
extract_dir: Directory to extract files to.
restore_assets: Whether to restore asset links.
Returns:
Dictionary containing extraction results.
"""
try:
self.logger.info(f"Extracting package {package_path} to {extract_dir}")
result = self.packager.extract_package(
package_path, extract_dir, restore_symlinks=restore_assets
)
self.logger.info(f"Package extracted: {result['extracted_files']} files")
return result
except Exception as e:
self.logger.error(f"Failed to extract package: {e}")
raise AssetManagerError(f"Failed to extract package: {e}", cause=e)
def get_storage_stats(self) -> Dict[str, Any]:
"""Get asset storage statistics.
Returns:
Dictionary containing storage statistics.
"""
try:
stats = self.deduplicator.list_stored_assets()
# Add additional statistics
stats['storage_path'] = str(self.storage_path)
stats['registry_path'] = str(self.registry_path)
stats['deduplication_enabled'] = self.enable_deduplication
# Calculate storage efficiency (if deduplication is enabled)
if stats['total_assets'] > 0:
total_files = len(self.list_assets())
if total_files > stats['total_assets']:
stats['deduplication_ratio'] = stats['total_assets'] / total_files
stats['space_saved_ratio'] = 1 - stats['deduplication_ratio']
return stats
except Exception as e:
raise AssetManagerError(f"Failed to get storage statistics: {e}", cause=e)
def verify_integrity(self, content_hash: Optional[str] = None) -> Dict[str, Any]:
"""Verify integrity of assets.
Args:
content_hash: Specific asset to verify, or None for all assets.
Returns:
Dictionary containing integrity check results.
"""
try:
if content_hash:
# Verify specific asset
valid = self.deduplicator.verify_asset_integrity(content_hash)
return {
'content_hash': content_hash,
'valid': valid,
'checked': 1
}
else:
# Verify all assets
assets = self.list_assets()
valid_count = 0
invalid_assets = []
for asset in assets:
hash_val = asset['content_hash']
if self.deduplicator.verify_asset_integrity(hash_val):
valid_count += 1
else:
invalid_assets.append(hash_val)
return {
'total_checked': len(assets),
'valid_assets': valid_count,
'invalid_assets': invalid_assets,
'integrity_valid': len(invalid_assets) == 0
}
except Exception as e:
raise AssetManagerError(f"Failed to verify integrity: {e}", cause=e)
def cleanup_orphaned_assets(self) -> Dict[str, Any]:
"""Clean up orphaned assets (in storage but not in registry).
Returns:
Dictionary containing cleanup results.
"""
try:
self.logger.info("Starting orphaned asset cleanup")
# This would involve scanning storage directory and comparing with registry
# For minimal implementation, return placeholder
return {
'orphaned_files_found': 0,
'orphaned_files_removed': 0,
'space_reclaimed_bytes': 0
}
except Exception as e:
raise AssetManagerError(f"Failed to cleanup orphaned assets: {e}", cause=e)
def resolve_asset_references(self, asset_references: List) -> None:
"""Update asset references with resolved hashes for imported assets.
Args:
asset_references: List of AssetReference objects to update
"""
resolved_count = 0
for ref in asset_references:
if not ref.is_broken:
# First resolve the path from relative to absolute
if not ref.resolved_path and ref.asset_path:
# Convert relative path to absolute based on source file location
source_dir = ref.source_file.parent
potential_path = (source_dir / ref.asset_path).resolve()
if potential_path.exists():
ref.resolved_path = potential_path
if ref.resolved_path:
# Try to find the asset hash by checking if file was imported
try:
content_hash = self.registry.generate_content_hash(ref.resolved_path)
if self.registry.asset_exists(content_hash):
ref.resolved_hash = content_hash
# Also record usage for this reference
self.database.record_asset_usage(content_hash, str(ref.source_file))
resolved_count += 1
except Exception as e:
self.logger.warning(f"Failed to resolve reference {ref.asset_path}: {e}")
self.logger.info(f"Resolved {resolved_count} asset references")