feat: implement batch processing and recursive operations (issue #17)
Complete implementation of batch processing capabilities for MarkiTect CLI: New CLI Commands: - markitect ingest-dir: Process all markdown files in directory with recursive support - markitect batch-process: Process files matching glob patterns - markitect recursive: Recursive processing with depth control Core Features: - Sophisticated batch processing engine with progress tracking - Multiple error handling strategies (stop, continue, skip) - Recursive directory traversal with configurable depth limits - Glob pattern matching for flexible file selection - Progress feedback with detailed processing statistics - Integration with existing database and caching systems Technical Implementation: - BatchProcessor class with modular architecture - ProgressTracker for real-time user feedback - Comprehensive error handling and edge case management - Support for multiple operations (ingest, status, validate) - Depth-controlled recursive search with proper boundary handling - Permission error resilience and graceful degradation Testing: - 29 comprehensive tests covering all functionality - Edge cases: empty directories, hidden files, permission errors - CLI integration tests with mocked database operations - Depth logic validation and boundary condition testing - Error handling scenarios and recovery mechanisms All acceptance criteria fulfilled: ✅ Directory and recursive processing ✅ Glob pattern support for file selection ✅ Progress tracking and user feedback ✅ Error handling with continuation options ✅ Comprehensive test coverage 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
379
markitect/batch_processor.py
Normal file
379
markitect/batch_processor.py
Normal file
@@ -0,0 +1,379 @@
|
||||
"""
|
||||
Batch Processing and Recursive Operations - Issue #17
|
||||
|
||||
This module provides batch processing capabilities for MarkiTect, allowing
|
||||
users to process multiple files and directories recursively through CLI.
|
||||
|
||||
Features:
|
||||
- Directory processing with recursive support
|
||||
- Glob pattern matching for file selection
|
||||
- Progress tracking with user feedback
|
||||
- Error handling with continuation options
|
||||
- Depth control for recursive operations
|
||||
|
||||
Commands implemented:
|
||||
- ingest-dir: Process all Markdown files in directory
|
||||
- batch-process: Process files matching glob pattern
|
||||
- recursive operations with depth control
|
||||
"""
|
||||
|
||||
import os
|
||||
import glob
|
||||
import fnmatch
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any, Iterator, Callable
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import click
|
||||
|
||||
|
||||
class ProcessingMode(Enum):
|
||||
"""Modes for batch processing operations."""
|
||||
INGEST = "ingest"
|
||||
STATUS = "status"
|
||||
VALIDATE = "validate"
|
||||
GENERATE = "generate"
|
||||
|
||||
|
||||
class ErrorHandling(Enum):
|
||||
"""Error handling strategies for batch operations."""
|
||||
STOP = "stop" # Stop on first error
|
||||
CONTINUE = "continue" # Continue processing, collect errors
|
||||
SKIP = "skip" # Skip failed files, no error collection
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessingResult:
|
||||
"""Result of processing a single file."""
|
||||
file_path: Path
|
||||
success: bool
|
||||
message: str
|
||||
error: Optional[str] = None
|
||||
processing_time: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchResult:
|
||||
"""Result of a batch processing operation."""
|
||||
total_files: int
|
||||
processed: int
|
||||
succeeded: int
|
||||
failed: int
|
||||
skipped: int
|
||||
errors: List[ProcessingResult]
|
||||
processing_time: float
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
"""Progress tracking for batch operations."""
|
||||
|
||||
def __init__(self, total: int, show_progress: bool = True):
|
||||
self.total = total
|
||||
self.processed = 0
|
||||
self.succeeded = 0
|
||||
self.failed = 0
|
||||
self.skipped = 0
|
||||
self.show_progress = show_progress
|
||||
|
||||
def update(self, result: ProcessingResult):
|
||||
"""Update progress with a processing result."""
|
||||
self.processed += 1
|
||||
if result.success:
|
||||
self.succeeded += 1
|
||||
else:
|
||||
self.failed += 1
|
||||
|
||||
if self.show_progress:
|
||||
self._display_progress(result)
|
||||
|
||||
def skip_file(self, file_path: Path, reason: str):
|
||||
"""Mark a file as skipped."""
|
||||
self.skipped += 1
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Skipped {file_path}: {reason}")
|
||||
|
||||
def _display_progress(self, result: ProcessingResult):
|
||||
"""Display progress information."""
|
||||
status = "✅" if result.success else "❌"
|
||||
percentage = (self.processed / self.total) * 100
|
||||
|
||||
click.echo(f"{status} [{self.processed}/{self.total}] ({percentage:.1f}%) {result.file_path}")
|
||||
|
||||
if not result.success and result.error:
|
||||
click.echo(f" Error: {result.error}")
|
||||
|
||||
|
||||
class BatchProcessor:
|
||||
"""Core batch processing engine."""
|
||||
|
||||
def __init__(self,
|
||||
error_handling: ErrorHandling = ErrorHandling.CONTINUE,
|
||||
show_progress: bool = True,
|
||||
max_depth: Optional[int] = None):
|
||||
self.error_handling = error_handling
|
||||
self.show_progress = show_progress
|
||||
self.max_depth = max_depth
|
||||
|
||||
def find_markdown_files(self,
|
||||
directory: Path,
|
||||
pattern: str = "*.md",
|
||||
recursive: bool = False,
|
||||
depth: Optional[int] = None) -> List[Path]:
|
||||
"""
|
||||
Find markdown files in directory with pattern matching.
|
||||
|
||||
Args:
|
||||
directory: Directory to search
|
||||
pattern: Glob pattern for file matching
|
||||
recursive: Whether to search recursively
|
||||
depth: Maximum depth for recursive search
|
||||
|
||||
Returns:
|
||||
List of matching file paths
|
||||
"""
|
||||
files = []
|
||||
|
||||
if not directory.exists():
|
||||
raise FileNotFoundError(f"Directory not found: {directory}")
|
||||
|
||||
if not directory.is_dir():
|
||||
raise NotADirectoryError(f"Path is not a directory: {directory}")
|
||||
|
||||
if recursive:
|
||||
effective_depth = depth if depth is not None else self.max_depth
|
||||
files.extend(self._find_recursive(directory, pattern, effective_depth))
|
||||
else:
|
||||
# Non-recursive: only current directory
|
||||
files.extend(self._find_in_directory(directory, pattern))
|
||||
|
||||
return sorted(files)
|
||||
|
||||
def _find_recursive(self, directory: Path, pattern: str, max_depth: Optional[int]) -> List[Path]:
|
||||
"""Find files recursively with depth control."""
|
||||
files = []
|
||||
|
||||
def _search(current_dir: Path, current_depth: int):
|
||||
# Add files from current directory (if within depth limit)
|
||||
if max_depth is None or current_depth <= max_depth:
|
||||
files.extend(self._find_in_directory(current_dir, pattern))
|
||||
|
||||
# Recurse into subdirectories (if we haven't reached depth limit)
|
||||
if max_depth is None or current_depth < max_depth:
|
||||
try:
|
||||
for item in current_dir.iterdir():
|
||||
if item.is_dir() and not item.name.startswith('.'):
|
||||
_search(item, current_depth + 1)
|
||||
except PermissionError:
|
||||
# Skip directories we can't access
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Permission denied: {current_dir}")
|
||||
|
||||
_search(directory, 0)
|
||||
return files
|
||||
|
||||
def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
|
||||
"""Find files matching pattern in a specific directory."""
|
||||
files = []
|
||||
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
if item.is_file() and fnmatch.fnmatch(item.name, pattern):
|
||||
files.append(item)
|
||||
except PermissionError:
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Permission denied: {directory}")
|
||||
|
||||
return files
|
||||
|
||||
def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
|
||||
"""
|
||||
Find files using glob patterns.
|
||||
|
||||
Args:
|
||||
glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")
|
||||
|
||||
Returns:
|
||||
List of matching file paths
|
||||
"""
|
||||
matches = glob.glob(glob_pattern, recursive=True)
|
||||
return [Path(match) for match in matches if Path(match).is_file()]
|
||||
|
||||
def process_files(self,
|
||||
files: List[Path],
|
||||
processor_func: Callable[[Path], ProcessingResult],
|
||||
operation_name: str = "Processing") -> BatchResult:
|
||||
"""
|
||||
Process a list of files with progress tracking and error handling.
|
||||
|
||||
Args:
|
||||
files: List of files to process
|
||||
processor_func: Function to process each file
|
||||
operation_name: Name of the operation for progress display
|
||||
|
||||
Returns:
|
||||
BatchResult with processing statistics
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
if self.show_progress:
|
||||
click.echo(f"🚀 {operation_name} {len(files)} files...")
|
||||
|
||||
tracker = ProgressTracker(len(files), self.show_progress)
|
||||
errors = []
|
||||
|
||||
for file_path in files:
|
||||
try:
|
||||
# Check if file still exists (might have been deleted during processing)
|
||||
if not file_path.exists():
|
||||
tracker.skip_file(file_path, "File no longer exists")
|
||||
continue
|
||||
|
||||
# Process the file
|
||||
result = processor_func(file_path)
|
||||
tracker.update(result)
|
||||
|
||||
if not result.success:
|
||||
errors.append(result)
|
||||
|
||||
# Handle errors based on strategy
|
||||
if self.error_handling == ErrorHandling.STOP:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
# Handle unexpected errors
|
||||
error_result = ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message=f"Unexpected error: {str(e)}",
|
||||
error=str(e)
|
||||
)
|
||||
tracker.update(error_result)
|
||||
errors.append(error_result)
|
||||
|
||||
if self.error_handling == ErrorHandling.STOP:
|
||||
break
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
|
||||
result = BatchResult(
|
||||
total_files=len(files),
|
||||
processed=tracker.processed,
|
||||
succeeded=tracker.succeeded,
|
||||
failed=tracker.failed,
|
||||
skipped=tracker.skipped,
|
||||
errors=errors,
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
if self.show_progress:
|
||||
self._display_summary(result, operation_name)
|
||||
|
||||
return result
|
||||
|
||||
def _display_summary(self, result: BatchResult, operation_name: str):
|
||||
"""Display batch processing summary."""
|
||||
click.echo(f"\n📊 {operation_name} Summary:")
|
||||
click.echo(f" Total files: {result.total_files}")
|
||||
click.echo(f" Processed: {result.processed}")
|
||||
click.echo(f" Succeeded: {result.succeeded}")
|
||||
click.echo(f" Failed: {result.failed}")
|
||||
click.echo(f" Skipped: {result.skipped}")
|
||||
click.echo(f" Processing time: {result.processing_time:.2f}s")
|
||||
|
||||
if result.failed > 0:
|
||||
click.echo(f"\n❌ {result.failed} files failed:")
|
||||
for error in result.errors[:10]: # Show first 10 errors
|
||||
click.echo(f" • {error.file_path}: {error.message}")
|
||||
|
||||
if len(result.errors) > 10:
|
||||
click.echo(f" ... and {len(result.errors) - 10} more errors")
|
||||
|
||||
|
||||
def create_file_processor(config: Dict[str, Any],
|
||||
operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
|
||||
"""
|
||||
Create a file processor function for the specified operation.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary
|
||||
operation: Type of processing operation
|
||||
|
||||
Returns:
|
||||
Function that processes a single file and returns ProcessingResult
|
||||
"""
|
||||
import time
|
||||
|
||||
def process_file(file_path: Path) -> ProcessingResult:
|
||||
"""Process a single file based on the operation type."""
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
if operation == ProcessingMode.INGEST:
|
||||
# Ingest file into database
|
||||
from .database import DatabaseManager
|
||||
db_manager = DatabaseManager(config.get('database'))
|
||||
|
||||
# Read file content
|
||||
content = file_path.read_text(encoding='utf-8')
|
||||
|
||||
# Store in database
|
||||
db_manager.store_document(str(file_path), content)
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Ingested successfully",
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
elif operation == ProcessingMode.STATUS:
|
||||
# Check file status
|
||||
from .database import DatabaseManager
|
||||
db_manager = DatabaseManager(config.get('database'))
|
||||
|
||||
try:
|
||||
metadata = db_manager.get_metadata(str(file_path))
|
||||
message = f"Found in database (ID: {metadata.get('id', 'Unknown')})"
|
||||
except:
|
||||
message = "Not found in database"
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message=message,
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
elif operation == ProcessingMode.VALIDATE:
|
||||
# Validate file format/content
|
||||
content = file_path.read_text(encoding='utf-8')
|
||||
|
||||
# Basic validation - check if it's valid markdown
|
||||
if not content.strip():
|
||||
raise ValueError("File is empty")
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Valid markdown file",
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unsupported operation: {operation}")
|
||||
|
||||
except Exception as e:
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message=f"Failed: {str(e)}",
|
||||
error=str(e),
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
return process_file
|
||||
Reference in New Issue
Block a user