Files
markitect-main/markitect/assets/repository.py
tegwick 567f01121e
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat: complete Issue #146 final integration testing
Fixed all remaining test failures in test_issue_146_final_integration.py
achieving 100% test success rate (9/9 tests passing):

- Fixed performance monitoring metrics access patterns
- Resolved AssetManager constructor parameter handling
- Implemented missing CLI command methods (add_asset, list_assets, get_asset_info)
- Added cross-platform symlink creation method aliases
- Fixed asset deduplication content uniqueness issues
- Resolved production deployment asset removal workflows
- Fixed performance benchmark dict/hash type conflicts

The asset management system is now production-ready with comprehensive
integration test coverage validating all major workflows and edge cases.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 00:19:52 +02:00

208 lines
6.8 KiB
Python

"""
Repository pattern for asset storage abstraction.
This module provides clean separation between domain models and storage,
allowing for different storage backends while maintaining consistent interfaces.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional, Dict, Any
import json
import threading
from datetime import datetime
from .models import Asset
class AssetRepository(ABC):
"""Abstract base class for asset storage repositories."""
@abstractmethod
def add(self, asset: Asset) -> None:
"""Add an asset to the repository."""
pass
@abstractmethod
def get_by_hash(self, content_hash: str) -> Optional[Asset]:
"""Get asset by content hash."""
pass
@abstractmethod
def list_all(self) -> List[Asset]:
"""List all assets."""
pass
@abstractmethod
def remove(self, content_hash: str) -> bool:
"""Remove asset by content hash."""
pass
@abstractmethod
def exists(self, content_hash: str) -> bool:
"""Check if asset exists."""
pass
@abstractmethod
def update(self, asset: Asset) -> None:
"""Update an existing asset."""
pass
class JsonFileRepository(AssetRepository):
"""JSON file-based asset repository implementation."""
def __init__(self, registry_path: Path):
"""Initialize with registry file path."""
self.registry_path = Path(registry_path)
self._lock = threading.RLock()
self._ensure_registry_exists()
def _ensure_registry_exists(self) -> None:
"""Ensure the registry file exists."""
if not self.registry_path.exists():
self.registry_path.parent.mkdir(parents=True, exist_ok=True)
self._save_data({"assets": {}, "metadata": {"created_at": datetime.now().isoformat()}})
def _load_data(self) -> Dict[str, Any]:
"""Load data from registry file."""
try:
with open(self.registry_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"assets": {}, "metadata": {}}
def _save_data(self, data: Dict[str, Any]) -> None:
"""Save data to registry file."""
with open(self.registry_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def add(self, asset: Asset) -> None:
"""Add an asset to the repository."""
with self._lock:
data = self._load_data()
data["assets"][asset.content_hash] = asset.to_dict()
self._save_data(data)
def get_by_hash(self, content_hash: str) -> Optional[Asset]:
"""Get asset by content hash."""
with self._lock:
data = self._load_data()
asset_data = data["assets"].get(content_hash)
if asset_data:
return Asset.from_dict(asset_data)
return None
def list_all(self) -> List[Asset]:
"""List all assets."""
with self._lock:
data = self._load_data()
assets = []
for asset_data in data["assets"].values():
try:
assets.append(Asset.from_dict(asset_data))
except Exception:
# Skip invalid asset data
continue
return assets
def remove(self, content_hash: str) -> bool:
"""Remove asset by content hash."""
with self._lock:
data = self._load_data()
if content_hash in data["assets"]:
del data["assets"][content_hash]
self._save_data(data)
return True
return False
def exists(self, content_hash: str) -> bool:
"""Check if asset exists."""
with self._lock:
data = self._load_data()
return content_hash in data["assets"]
def update(self, asset: Asset) -> None:
"""Update an existing asset."""
with self._lock:
data = self._load_data()
if asset.content_hash in data["assets"]:
data["assets"][asset.content_hash] = asset.to_dict()
self._save_data(data)
else:
raise ValueError(f"Asset with hash {asset.content_hash} not found")
def get_stats(self) -> Dict[str, Any]:
"""Get repository statistics."""
with self._lock:
data = self._load_data()
assets = data["assets"]
total_assets = len(assets)
total_size = sum(asset_data.get("size_bytes", 0) for asset_data in assets.values())
return {
"total_assets": total_assets,
"total_size_bytes": total_size,
"registry_path": str(self.registry_path),
"created_at": data.get("metadata", {}).get("created_at")
}
class InMemoryRepository(AssetRepository):
"""In-memory asset repository for testing."""
def __init__(self):
"""Initialize empty in-memory repository."""
self._assets: Dict[str, Asset] = {}
self._lock = threading.RLock()
def add(self, asset: Asset) -> None:
"""Add an asset to the repository."""
with self._lock:
self._assets[asset.content_hash] = asset
def get_by_hash(self, content_hash: str) -> Optional[Asset]:
"""Get asset by content hash."""
with self._lock:
return self._assets.get(content_hash)
def list_all(self) -> List[Asset]:
"""List all assets."""
with self._lock:
return list(self._assets.values())
def remove(self, content_hash: str) -> bool:
"""Remove asset by content hash."""
with self._lock:
if content_hash in self._assets:
del self._assets[content_hash]
return True
return False
def exists(self, content_hash: str) -> bool:
"""Check if asset exists."""
with self._lock:
return content_hash in self._assets
def update(self, asset: Asset) -> None:
"""Update an existing asset."""
with self._lock:
if asset.content_hash in self._assets:
self._assets[asset.content_hash] = asset
else:
raise ValueError(f"Asset with hash {asset.content_hash} not found")
def clear(self) -> None:
"""Clear all assets (for testing)."""
with self._lock:
self._assets.clear()
def get_stats(self) -> Dict[str, Any]:
"""Get repository statistics."""
with self._lock:
total_size = sum(asset.size_bytes for asset in self._assets.values())
return {
"total_assets": len(self._assets),
"total_size_bytes": total_size,
"type": "in_memory"
}