"""
Batch asset processing functionality for Issue #144.

This module provides batch processing capabilities for importing, optimizing,
and managing multiple assets simultaneously with progress reporting and error
handling.
"""

import os
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Callable, Iterator
from dataclasses import dataclass, field
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, as_completed
import fnmatch

from .manager import AssetManager
from .exceptions import AssetError
from .utils import (
    PathUtils, ContentHasher, ProgressReporter, BaseResult,
    TimedOperation, BatchProcessor, FileValidator
)


class ConflictResolution(Enum):
    """Asset conflict resolution strategies."""
    SKIP = "skip"
    OVERWRITE = "overwrite"
    RENAME = "rename"
    INTERACTIVE = "interactive"


@dataclass
class BatchImportResult(BaseResult):
    """Result of a batch import operation.

    The counters are filled in by ``BatchAssetProcessor``. ``failed_files``
    records the paths whose import raised, so they can be handed to
    ``BatchAssetProcessor.retry_failed_imports``.
    """
    total_files: int = 0
    successful_imports: int = 0
    failed_imports: int = 0
    skipped_files: int = 0
    conflicts_resolved: int = 0
    total_size_bytes: int = 0
    imported_assets: List[Any] = field(default_factory=list)
    errors: List[Exception] = field(default_factory=list)
    was_cancelled: bool = False
    # Paths whose import raised; appended in lockstep with ``errors``.
    failed_files: List[Path] = field(default_factory=list)
    # Set to True on results produced by retry_failed_imports().
    retry_attempted: bool = False

    # Not an override: a separate, explicitly-named mirror of
    # BaseResult.processing_time (presumably seconds, per the field name).
    # Synced in __post_init__ and again by the processor once timing is known.
    processing_time_seconds: float = field(default=0.0, init=False)

    def __post_init__(self):
        super().__post_init__()
        # Sync the processing_time fields
        self.processing_time_seconds = self.processing_time

    def get_summary(self) -> str:
        """Return a human-readable, multi-line summary of the batch import."""
        success_rate = (
            self.successful_imports / self.total_files * 100
            if self.total_files > 0 else 0
        )
        summary = f"""Batch Import Summary:
  Total files processed: {self.total_files}
  Successfully imported: {self.successful_imports} ({success_rate:.1f}%)
  Failed imports: {self.failed_imports}
  Skipped files: {self.skipped_files}
  Conflicts resolved: {self.conflicts_resolved}
  Total size: {self.total_size_bytes:,} bytes
  Processing time: {self.processing_time_seconds:.2f} seconds"""
        if self.was_cancelled:
            summary += "\nOperation was cancelled"
        return summary


class BatchAssetProcessor(BatchProcessor):
    """Batch processor for asset operations."""

    def __init__(self, asset_manager: AssetManager, max_concurrent: int = 4,
                 chunk_size: int = 50,
                 progress_reporter: Optional[ProgressReporter] = None,
                 performance_monitor: Optional[Any] = None):
        """Initialize batch processor.

        Args:
            asset_manager: Manager used to register assets (``add_asset``) and
                to query existing ones (``registry.list_assets``).
            max_concurrent: Forwarded to ``BatchProcessor``.
            chunk_size: Forwarded to ``BatchProcessor``.
            progress_reporter: Optional reporter receiving ``start``/``update``/
                ``finish`` calls during an import run.
            performance_monitor: Stored for callers; not used in this module.
        """
        super().__init__(max_concurrent, chunk_size)
        self.asset_manager = asset_manager
        self.progress_reporter = progress_reporter
        self.performance_monitor = performance_monitor

    def import_directory(self, source_path: Path, recursive: bool = False,
                         patterns: Optional[List[str]] = None,
                         conflict_resolution: ConflictResolution = ConflictResolution.SKIP,
                         auto_optimize: bool = False,
                         cancellation_token: Optional[Any] = None) -> BatchImportResult:
        """Import all matching assets from a directory.

        Args:
            source_path: Directory to scan; normalized with
                ``PathUtils.normalize_path`` first.
            recursive: Descend into subdirectories when True.
            patterns: Optional ``fnmatch`` patterns matched against file names.
                ``None`` or empty means every file.
            conflict_resolution: With ``SKIP``, files whose content hash is
                already registered are skipped. With any other value they are
                imported anyway and counted in ``conflicts_resolved``.
            auto_optimize: Accepted for API compatibility; not acted on here.
            cancellation_token: Optional object exposing ``is_cancelled()``,
                checked before each file.

        Returns:
            A populated ``BatchImportResult``. If ``source_path`` is not an
            existing directory, the result has ``success=False`` and a
            ``ValueError`` in ``error``.
        """
        # Normalize and validate input path. is_dir() is False for missing
        # paths as well, so one check covers both failure cases.
        source_path = PathUtils.normalize_path(source_path)
        if not source_path.is_dir():
            error = ValueError(f"Source path {source_path} does not exist or is not a directory")
            return BatchImportResult(success=False, error=error)

        # NOTE(review): auto_optimize is currently ignored, and OVERWRITE /
        # RENAME / INTERACTIVE all perform a plain add_asset(). Confirm the
        # intended AssetManager APIs before implementing them.
        with TimedOperation("directory import") as timer:
            files_to_process = self._find_files(source_path, recursive, patterns)
            result = self._import_files(files_to_process, conflict_resolution,
                                        cancellation_token)

        # Read the timer after the context has exited so the value is final.
        result.processing_time = timer.elapsed_time
        result.processing_time_seconds = timer.elapsed_time
        return result

    def _import_files(self, files: List[Path],
                      conflict_resolution: ConflictResolution,
                      cancellation_token: Optional[Any]) -> BatchImportResult:
        """Run the per-file import pipeline over *files*, reporting progress."""
        result = BatchImportResult()
        result.total_files = len(files)

        reporter = self.progress_reporter
        if reporter:
            reporter.start(result.total_files)
        try:
            for processed_count, file_path in enumerate(files, start=1):
                if cancellation_token is not None and cancellation_token.is_cancelled():
                    result.was_cancelled = True
                    break
                self._import_one(file_path, conflict_resolution, result)
                # Every visited file counts toward progress, including skipped ones.
                if reporter:
                    reporter.update(processed_count, str(file_path))
        finally:
            # Always close out the reporter, even if the loop raised.
            if reporter:
                reporter.finish()
        return result

    def _import_one(self, file_path: Path,
                    conflict_resolution: ConflictResolution,
                    result: BatchImportResult) -> None:
        """Validate, conflict-check and import one file, updating *result*."""
        if not (FileValidator.is_safe_file_type(file_path)
                and FileValidator.is_readable_file(file_path)):
            result.skipped_files += 1
            return

        try:
            # Check existence once, BEFORE importing. After add_asset() the
            # hash is registered, so a post-import check would always be True.
            already_exists = self._asset_exists(file_path)
            if already_exists and conflict_resolution is ConflictResolution.SKIP:
                result.skipped_files += 1
                return
            # Read the size first so a stat() failure cannot double-count a
            # file as both imported and failed.
            size = file_path.stat().st_size
            imported = self.asset_manager.add_asset(file_path)
        except Exception as e:
            result.failed_imports += 1
            result.failed_files.append(file_path)
            result.errors.append(e)
            self.logger.error(f"Failed to import {file_path}: {e}")
            return

        result.imported_assets.append(imported)
        result.successful_imports += 1
        result.total_size_bytes += size
        if already_exists:
            result.conflicts_resolved += 1

    def _find_files(self, source_path: Path, recursive: bool,
                    patterns: Optional[List[str]]) -> List[Path]:
        """Return the files under *source_path* matching *patterns*, sorted.

        Non-recursive mode lists only regular files directly inside
        *source_path*. Recursive mode includes every name ``os.walk``
        reports as a non-directory. Sorting makes the processing order
        deterministic across filesystems.
        """
        if recursive:
            candidates = (
                Path(root) / filename
                for root, _dirs, filenames in os.walk(source_path)
                for filename in filenames
            )
        else:
            candidates = (p for p in source_path.iterdir() if p.is_file())
        return sorted(p for p in candidates if self._matches_patterns(p, patterns))

    def _matches_patterns(self, file_path: Path, patterns: Optional[List[str]]) -> bool:
        """Return True if the file name matches any pattern, or no patterns are given."""
        if not patterns:
            return True
        filename = file_path.name
        return any(fnmatch.fnmatch(filename, pattern) for pattern in patterns)

    def _asset_exists(self, file_path: Path) -> bool:
        """Return True if a registered asset has the same content hash as *file_path*.

        Best-effort: any error while hashing or listing the registry is logged
        at debug level and treated as "not present".
        """
        try:
            content_hash = ContentHasher.hash_file(file_path)
            # NOTE(review): scans the whole registry on every call. Consider
            # caching a hash set per batch if registries grow large.
            return any(asset.content_hash == content_hash
                       for asset in self.asset_manager.registry.list_assets())
        except Exception as e:
            self.logger.debug(f"Failed to check asset existence for {file_path}: {e}")
            return False

    def retry_failed_imports(self, previous_result: BatchImportResult,
                             conflict_resolution: ConflictResolution = ConflictResolution.SKIP,
                             cancellation_token: Optional[Any] = None) -> BatchImportResult:
        """Retry the files that failed in a previous batch operation.

        Args:
            previous_result: Result whose ``failed_files`` should be re-imported.
            conflict_resolution: Strategy applied to the retried files.
            cancellation_token: Optional object exposing ``is_cancelled()``.

        Returns:
            A new ``BatchImportResult`` with ``retry_attempted=True``.
        """
        failed = list(getattr(previous_result, "failed_files", []))
        with TimedOperation("retry failed imports") as timer:
            retry_result = self._import_files(failed, conflict_resolution,
                                              cancellation_token)
        retry_result.retry_attempted = True
        retry_result.processing_time = timer.elapsed_time
        retry_result.processing_time_seconds = timer.elapsed_time
        return retry_result

    def normalize_path(self, path_str: str) -> Path:
        """Normalize path strings to Path objects."""
        return PathUtils.normalize_path(path_str)