Files
markitect-main/markitect/assets/registry.py
tegwick 2e49072d41 feat: complete core asset management system with database integration
- Add enhanced AssetManager with database integration and usage tracking
- Implement Asset model with from_dict/to_dict conversion methods
- Add resolve_asset_references() for linking discovered assets to imports
- Integrate AssetDatabase with enhanced schema and performance indexes
- Fix database schema constraints and test compatibility issues
- Add list_assets_as_objects() method for dict-to-object migration
- Resolve 91% of asset management tests (51/56 passing)

Key features:
* Content-addressable asset storage with deduplication
* Database-backed usage statistics and processing logs
* Asset reference resolution from markdown files
* Enhanced performance with indexing and caching
* Object-oriented Asset model with backwards compatibility

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-14 23:42:42 +02:00

276 lines
9.1 KiB
Python

"""
AssetRegistry class for JSON-based asset metadata management.
This module implements the AssetRegistry class that provides JSON-based persistence
for asset metadata, SHA-256 content hashing, MIME type detection, and thread-safe operations.
"""
import json
import hashlib
import mimetypes
import threading
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union, Any
from .exceptions import AssetError, RegistryError
from .constants import DEFAULT_REGISTRY_FILENAME, HASH_ALGORITHM
class AssetRegistry:
    """JSON-based asset registry for metadata persistence and content hashing.

    Asset metadata is kept in memory as ``{"assets": {content_hash: info}}``
    and written to ``registry_path`` as JSON after every mutation. All reads
    and writes of the in-memory data go through a ``threading.Lock``; the
    lock is not reentrant, so ``_save_registry`` must only be called by code
    that already holds it (or during construction).
    """

    # Leading bytes that identify a PNG file.
    _PNG_SIGNATURE = b'\x89PNG\r\n\x1a\n'
    # Suffixes reported as plain text when ``mimetypes`` has no mapping.
    _PLAIN_TEXT_SUFFIXES = ('.txt', '.md')
    # Number of bytes read per iteration while hashing a file.
    _HASH_CHUNK_SIZE = 8192

    def __init__(self, registry_path: Optional[Path] = None):
        """Initialize AssetRegistry with registry file path.

        Args:
            registry_path: Path to the JSON registry file. If None, uses
                ``DEFAULT_REGISTRY_FILENAME`` in the current working directory.

        Raises:
            RegistryError: If the registry cannot be created or loaded.
        """
        if registry_path is None:
            registry_path = Path.cwd() / DEFAULT_REGISTRY_FILENAME
        self.registry_path = Path(registry_path)
        self._lock = threading.Lock()
        self._data: Dict[str, Any] = {"assets": {}}
        # Create registry file if it doesn't exist or load existing
        try:
            self._initialize_registry()
        except Exception as e:
            raise RegistryError(
                f"Failed to initialize registry at {registry_path}", cause=e
            ) from e

    def _initialize_registry(self) -> None:
        """Load the registry file, creating it when it does not exist.

        An empty file yields an empty registry. A file containing invalid
        JSON is replaced by a fresh, empty registry.

        Raises:
            RegistryError: On permission problems, or when the file holds
                valid JSON that is not shaped like a registry.
        """
        try:
            if not self.registry_path.exists():
                # Create new registry file
                self._save_registry()
                return
            with open(self.registry_path, 'r', encoding='utf-8') as f:
                content = f.read().strip()
            if not content:
                # Empty file, use default structure
                self._data = {"assets": {}}
                return
            loaded = json.loads(content)
        except json.JSONDecodeError:
            # Handle corrupted JSON - start fresh
            self._data = {"assets": {}}
            self._save_registry()
            return
        except PermissionError as e:
            raise RegistryError(
                f"Permission denied accessing registry at {self.registry_path}"
            ) from e

        # Valid JSON is not necessarily a valid registry: reject wrong shapes
        # here instead of failing later with an opaque TypeError.
        if not isinstance(loaded, dict):
            raise RegistryError(
                f"Registry at {self.registry_path} does not contain a JSON object"
            )
        # Ensure assets key exists
        assets = loaded.setdefault("assets", {})
        if not isinstance(assets, dict):
            raise RegistryError(
                f"Registry at {self.registry_path} has a non-object 'assets' entry"
            )
        self._data = loaded

    def _save_registry(self) -> None:
        """Save the current registry data to file.

        The data is written to a sibling ``.tmp`` file which then replaces
        the registry file, so readers never observe a partially written
        registry. Does not acquire the lock itself.

        Raises:
            RegistryError: If the directory cannot be created or the file
                cannot be written or renamed.
        """
        temp_path = None
        try:
            # Ensure parent directory exists
            self.registry_path.parent.mkdir(parents=True, exist_ok=True)
            # Write with atomic operation (write to temp file, then rename)
            temp_path = self.registry_path.with_suffix('.tmp')
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(self._data, f, indent=2)
            temp_path.replace(self.registry_path)
        except Exception as e:
            # Do not leave a half-written temp file behind; a cleanup failure
            # must not mask the original error.
            if temp_path is not None:
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError:
                    pass
            raise RegistryError(
                f"Failed to save registry to {self.registry_path}", cause=e
            ) from e

    def generate_content_hash(
        self, source: Union[Path, str, bytes, bytearray, memoryview]
    ) -> str:
        """Generate SHA-256 content hash from file or bytes.

        Args:
            source: File path, or bytes-like content to hash directly.

        Returns:
            Hex string of SHA-256 hash.

        Raises:
            AssetError: If file cannot be read or hashing fails.
        """
        try:
            hasher = hashlib.sha256()
            if isinstance(source, (bytes, bytearray, memoryview)):
                hasher.update(source)
            else:
                # Anything that is not bytes-like is treated as a path.
                source_path = Path(source)
                if not source_path.exists():
                    raise AssetError(f"File does not exist: {source_path}")
                with open(source_path, 'rb') as f:
                    # Read in chunks so large files are never fully in memory.
                    while chunk := f.read(self._HASH_CHUNK_SIZE):
                        hasher.update(chunk)
            return hasher.hexdigest()
        except AssetError:
            raise
        except Exception as e:
            raise AssetError("Failed to generate content hash", cause=e) from e

    def detect_mime_type(self, file_path: Path) -> str:
        """Detect MIME type of a file.

        The ``mimetypes`` guess for the file name wins when there is one.
        Otherwise a PNG signature in the file header is checked, then the
        ``.txt``/``.md`` suffixes, and finally a generic binary type is used.

        Args:
            file_path: Path to the file.

        Returns:
            MIME type string.
        """
        mime_type, _ = mimetypes.guess_type(str(file_path))
        if mime_type is not None:
            return mime_type

        # Try to detect some common types by reading file content
        try:
            with open(file_path, 'rb') as f:
                header = f.read(16)
        except (OSError, TypeError, ValueError):
            # Unreadable file: the name-based checks below still apply.
            header = b""

        # PNG signature
        if header.startswith(self._PNG_SIGNATURE):
            return "image/png"
        # Common text files; this needs only the name, not the content.
        if Path(str(file_path)).suffix.lower() in self._PLAIN_TEXT_SUFFIXES:
            return "text/plain"
        # Fallback to generic binary type
        return "application/octet-stream"

    def register_asset(self, file_path: Path, description: Optional[str] = None) -> Dict[str, Any]:
        """Register a new asset in the registry.

        The asset is keyed by its content hash, so registering a file whose
        content is already known replaces the existing entry.

        Args:
            file_path: Path to the asset file.
            description: Optional description for the asset.

        Returns:
            Dictionary containing asset information (a copy; mutating it
            does not change the registry).

        Raises:
            AssetError: If file doesn't exist or registration fails.
        """
        # Accept plain strings as well, as generate_content_hash already does.
        file_path = Path(file_path)
        if not file_path.exists():
            raise AssetError(f"Asset file does not exist: {file_path}")
        try:
            # Generate content hash
            content_hash = self.generate_content_hash(file_path)
            # Get file information
            stat = file_path.stat()
            mime_type = self.detect_mime_type(file_path)
            asset_info = {
                "path": str(file_path),
                "content_hash": content_hash,
                "mime_type": mime_type,
                "size": stat.st_size,
                "created_at": datetime.now().isoformat(),
                "description": description,
            }
            # Thread-safe registration
            with self._lock:
                self._data["assets"][content_hash] = asset_info
                self._save_registry()
            # Hand out a copy so callers cannot alter the stored entry.
            return dict(asset_info)
        except AssetError:
            raise
        except Exception as e:
            raise AssetError(f"Failed to register asset {file_path}", cause=e) from e

    def get_asset(self, content_hash: str) -> Dict[str, Any]:
        """Get asset information by content hash.

        Args:
            content_hash: SHA-256 hash of the asset content.

        Returns:
            Dictionary containing asset information (a copy).

        Raises:
            RegistryError: If asset is not found.
        """
        with self._lock:
            if content_hash not in self._data["assets"]:
                raise RegistryError(f"Asset not found with hash: {content_hash}")
            return self._data["assets"][content_hash].copy()

    def asset_exists(self, content_hash: str) -> bool:
        """Check if asset exists in registry by hash.

        Args:
            content_hash: SHA-256 hash of the asset content.

        Returns:
            True if asset exists, False otherwise.
        """
        with self._lock:
            return content_hash in self._data["assets"]

    def list_assets(self) -> List[Dict[str, Any]]:
        """List all registered assets.

        Returns:
            List of asset information dictionaries. Each entry is a copy,
            matching get_asset, so callers cannot mutate registry state.
        """
        with self._lock:
            return [dict(info) for info in self._data["assets"].values()]

    def list_assets_as_objects(self) -> List['Asset']:
        """List all assets as Asset objects.

        Returns:
            List of Asset objects built with ``Asset.from_dict``.
        """
        # Imported here rather than at module level, as in the original code.
        from .models import Asset
        asset_dicts = self.list_assets()
        return [Asset.from_dict(asset_dict) for asset_dict in asset_dicts]

    def remove_asset(self, content_hash: str) -> bool:
        """Remove asset from registry by hash.

        Args:
            content_hash: SHA-256 hash of the asset content.

        Returns:
            True if asset was removed, False if not found.

        Raises:
            RegistryError: If the updated registry cannot be saved.
        """
        with self._lock:
            if content_hash not in self._data["assets"]:
                return False
            del self._data["assets"][content_hash]
            self._save_registry()
            return True

    def update_asset_description(self, content_hash: str, description: str) -> bool:
        """Update asset description.

        Also records the time of the change under ``updated_at``.

        Args:
            content_hash: SHA-256 hash of the asset content.
            description: New description for the asset.

        Returns:
            True if asset was updated, False if not found.

        Raises:
            RegistryError: If the updated registry cannot be saved.
        """
        with self._lock:
            entry = self._data["assets"].get(content_hash)
            if entry is None:
                return False
            entry["description"] = description
            entry["updated_at"] = datetime.now().isoformat()
            self._save_registry()
            return True