"""
Batch Processing and Recursive Operations - Issue #17

This module provides batch processing capabilities for MarkiTect, allowing
users to process multiple files and directories recursively through the CLI.

Features:
- Directory processing with recursive support
- Glob pattern matching for file selection
- Progress tracking with user feedback
- Error handling with continuation options
- Depth control for recursive operations

Commands implemented:
- ingest-dir: Process all Markdown files in a directory
- batch-process: Process files matching a glob pattern
- recursive operations with depth control
"""

import os
import glob
import fnmatch
import time
from pathlib import Path
from typing import List, Optional, Dict, Any, Iterator, Callable
from dataclasses import dataclass
from enum import Enum

import click


class ProcessingMode(Enum):
    """Modes for batch processing operations."""

    INGEST = "ingest"
    STATUS = "status"
    VALIDATE = "validate"
    GENERATE = "generate"


class ErrorHandling(Enum):
    """Error handling strategies for batch operations."""

    STOP = "stop"  # Stop on first error
    CONTINUE = "continue"  # Continue processing, collect errors
    SKIP = "skip"  # Skip failed files, no error collection


@dataclass
class ProcessingResult:
    """Result of processing a single file."""

    file_path: Path
    success: bool
    message: str
    error: Optional[str] = None
    processing_time: Optional[float] = None


@dataclass
class BatchResult:
    """Result of a batch processing operation."""

    total_files: int
    processed: int
    succeeded: int
    failed: int
    skipped: int
    errors: List[ProcessingResult]
    processing_time: float


class ProgressTracker:
    """Progress tracking for batch operations.

    Keeps running counters (``processed``, ``succeeded``, ``failed``,
    ``skipped``) and, when ``show_progress`` is true, echoes one line per
    file through ``click.echo``.
    """

    def __init__(self, total: int, show_progress: bool = True):
        """Create a tracker.

        Args:
            total: Number of files expected; used as the denominator of the
                displayed percentage.
            show_progress: Whether to echo per-file progress lines.
        """
        self.total = total
        self.processed = 0
        self.succeeded = 0
        self.failed = 0
        self.skipped = 0
        self.show_progress = show_progress

    def update(self, result: ProcessingResult):
        """Update progress with a processing result."""
        self.processed += 1
        if result.success:
            self.succeeded += 1
        else:
            self.failed += 1

        if self.show_progress:
            self._display_progress(result)

    def skip_file(self, file_path: Path, reason: str):
        """Mark a file as skipped (it does not count as processed)."""
        self.skipped += 1
        if self.show_progress:
            click.echo(f"⚠️ Skipped {file_path}: {reason}")

    def _display_progress(self, result: ProcessingResult):
        """Display progress information for one processed file."""
        status = "✅" if result.success else "❌"
        # Guard against a tracker created with total == 0: report 0.0%
        # instead of raising ZeroDivisionError.
        percentage = (self.processed / self.total) * 100 if self.total else 0.0
        click.echo(f"{status} [{self.processed}/{self.total}] ({percentage:.1f}%) {result.file_path}")
        if not result.success and result.error:
            click.echo(f" Error: {result.error}")


class BatchProcessor:
    """Core batch processing engine.

    Discovers files (by directory scan or glob pattern) and runs a
    per-file processor function over them while tracking progress and
    applying the configured error handling strategy.
    """

    def __init__(self,
                 error_handling: ErrorHandling = ErrorHandling.CONTINUE,
                 show_progress: bool = True,
                 max_depth: Optional[int] = None):
        """Create a processor.

        Args:
            error_handling: Strategy applied when a file fails.
            show_progress: Whether to echo progress, warnings and summary.
            max_depth: Default depth limit for recursive searches
                (``None`` means unlimited).
        """
        self.error_handling = error_handling
        self.show_progress = show_progress
        self.max_depth = max_depth

    def find_markdown_files(self,
                            directory: Path,
                            pattern: str = "*.md",
                            recursive: bool = False,
                            depth: Optional[int] = None) -> List[Path]:
        """
        Find markdown files in directory with pattern matching.

        Args:
            directory: Directory to search
            pattern: Glob pattern matched against each file name
            recursive: Whether to search recursively
            depth: Maximum depth for recursive search; when ``None`` the
                processor's ``max_depth`` is used. Depth 0 is ``directory``
                itself.

        Returns:
            Sorted list of matching file paths

        Raises:
            FileNotFoundError: If ``directory`` does not exist.
            NotADirectoryError: If ``directory`` is not a directory.
        """
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        if recursive:
            effective_depth = depth if depth is not None else self.max_depth
            files = self._find_recursive(directory, pattern, effective_depth)
        else:
            # Non-recursive: only current directory
            files = self._find_in_directory(directory, pattern)

        return sorted(files)

    def _find_recursive(self, directory: Path, pattern: str,
                        max_depth: Optional[int]) -> List[Path]:
        """Find files recursively with depth control.

        Sub-directories whose name starts with ``.`` are not descended into.
        """
        files: List[Path] = []

        def _search(current_dir: Path, current_depth: int):
            # Add files from current directory (if within depth limit)
            if max_depth is None or current_depth <= max_depth:
                files.extend(self._find_in_directory(current_dir, pattern))

            # Recurse into subdirectories (if we haven't reached depth limit)
            if max_depth is None or current_depth < max_depth:
                try:
                    for item in current_dir.iterdir():
                        if item.is_dir() and not item.name.startswith('.'):
                            _search(item, current_depth + 1)
                except PermissionError:
                    # Skip directories we can't access
                    if self.show_progress:
                        click.echo(f"⚠️ Permission denied: {current_dir}")

        _search(directory, 0)
        return files

    def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
        """Find files matching pattern in a specific directory.

        A ``PermissionError`` while listing the directory is reported (when
        progress output is enabled) and whatever was collected so far is
        returned.
        """
        files: List[Path] = []
        try:
            for item in directory.iterdir():
                if item.is_file() and fnmatch.fnmatch(item.name, pattern):
                    files.append(item)
        except PermissionError:
            if self.show_progress:
                click.echo(f"⚠️ Permission denied: {directory}")
        return files

    def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
        """
        Find files using glob patterns.

        Args:
            glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")

        Returns:
            Sorted list of matching file paths (directories are excluded)
        """
        candidates = (Path(match) for match in glob.glob(glob_pattern, recursive=True))
        # glob.glob gives no ordering guarantee; sort so results are
        # deterministic and consistent with find_markdown_files().
        return sorted(path for path in candidates if path.is_file())

    def process_files(self,
                      files: List[Path],
                      processor_func: Callable[[Path], ProcessingResult],
                      operation_name: str = "Processing") -> BatchResult:
        """
        Process a list of files with progress tracking and error handling.

        Error handling strategies:
        - ``STOP``: record the failure and stop at the first failed file.
        - ``CONTINUE``: keep going and collect every failure in ``errors``.
        - ``SKIP``: keep going; failures are counted in ``failed`` but are
          not collected in ``errors``.

        Args:
            files: List of files to process
            processor_func: Function to process each file
            operation_name: Name of the operation for progress display

        Returns:
            BatchResult with processing statistics
        """
        start_time = time.perf_counter()

        if self.show_progress:
            click.echo(f"🚀 {operation_name} {len(files)} files...")

        tracker = ProgressTracker(len(files), self.show_progress)
        errors: List[ProcessingResult] = []
        collect_errors = self.error_handling is not ErrorHandling.SKIP

        for file_path in files:
            try:
                # Check if file still exists (might have been deleted during processing)
                if not file_path.exists():
                    tracker.skip_file(file_path, "File no longer exists")
                    continue

                # Process the file
                result = processor_func(file_path)
                tracker.update(result)
            except Exception as e:
                # Handle unexpected errors raised by the processor itself
                result = ProcessingResult(
                    file_path=file_path,
                    success=False,
                    message=f"Unexpected error: {str(e)}",
                    error=str(e)
                )
                tracker.update(result)

            if result.success:
                continue

            # Handle errors based on strategy
            if collect_errors:
                errors.append(result)
            if self.error_handling == ErrorHandling.STOP:
                break

        batch_result = BatchResult(
            total_files=len(files),
            processed=tracker.processed,
            succeeded=tracker.succeeded,
            failed=tracker.failed,
            skipped=tracker.skipped,
            errors=errors,
            processing_time=time.perf_counter() - start_time
        )

        if self.show_progress:
            self._display_summary(batch_result, operation_name)

        return batch_result

    def _display_summary(self, result: BatchResult, operation_name: str):
        """Display batch processing summary."""
        click.echo(f"\n📊 {operation_name} Summary:")
        click.echo(f" Total files: {result.total_files}")
        click.echo(f" Processed: {result.processed}")
        click.echo(f" Succeeded: {result.succeeded}")
        click.echo(f" Failed: {result.failed}")
        click.echo(f" Skipped: {result.skipped}")
        click.echo(f" Processing time: {result.processing_time:.2f}s")

        # Only list failures when some were collected (none are under SKIP).
        if result.failed > 0 and result.errors:
            click.echo(f"\n❌ {result.failed} files failed:")
            for error in result.errors[:10]:  # Show first 10 errors
                click.echo(f" • {error.file_path}: {error.message}")
            if len(result.errors) > 10:
                click.echo(f" ... and {len(result.errors) - 10} more errors")


def create_file_processor(config: Dict[str, Any],
                          operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
    """
    Create a file processor function for the specified operation.

    Supported operations are ``INGEST``, ``STATUS`` and ``VALIDATE``; any
    other mode makes the returned function report a failed result with an
    "Unsupported operation" message.

    Args:
        config: Configuration dictionary; its ``'database'`` entry is passed
            to ``DatabaseManager`` for the database-backed operations
        operation: Type of processing operation

    Returns:
        Function that processes a single file and returns ProcessingResult.
        The function does not raise for ordinary errors: they are reported
        as a result with ``success=False``.
    """

    def process_file(file_path: Path) -> ProcessingResult:
        """Process a single file based on the operation type."""
        start_time = time.perf_counter()

        try:
            if operation == ProcessingMode.INGEST:
                # Ingest file into database (imported lazily so the database
                # layer is only needed for database-backed operations)
                from .database import DatabaseManager
                db_manager = DatabaseManager(config.get('database'))

                content = file_path.read_text(encoding='utf-8')
                db_manager.store_document(str(file_path), content)
                message = "Ingested successfully"

            elif operation == ProcessingMode.STATUS:
                # Check file status
                from .database import DatabaseManager
                db_manager = DatabaseManager(config.get('database'))

                try:
                    metadata = db_manager.get_metadata(str(file_path))
                    message = f"Found in database (ID: {metadata.get('id', 'Unknown')})"
                except Exception:
                    # Any lookup failure is reported as "not found"; unlike a
                    # bare except this lets KeyboardInterrupt/SystemExit through.
                    message = "Not found in database"

            elif operation == ProcessingMode.VALIDATE:
                # Basic validation - the file must be readable, non-blank UTF-8
                content = file_path.read_text(encoding='utf-8')
                if not content.strip():
                    raise ValueError("File is empty")
                message = "Valid markdown file"

            else:
                raise ValueError(f"Unsupported operation: {operation}")

        except Exception as e:
            return ProcessingResult(
                file_path=file_path,
                success=False,
                message=f"Failed: {str(e)}",
                error=str(e),
                processing_time=time.perf_counter() - start_time
            )

        return ProcessingResult(
            file_path=file_path,
            success=True,
            message=message,
            processing_time=time.perf_counter() - start_time
        )

    return process_file