Files
markitect-main/markitect/batch_processor.py
tegwick 0982e771e4 feat: implement batch processing and recursive operations (issue #17)
Complete implementation of batch processing capabilities for MarkiTect CLI:

New CLI Commands:
- markitect ingest-dir: Process all markdown files in directory with recursive support
- markitect batch-process: Process files matching glob patterns
- markitect recursive: Recursive processing with depth control

Core Features:
- Sophisticated batch processing engine with progress tracking
- Multiple error handling strategies (stop, continue, skip)
- Recursive directory traversal with configurable depth limits
- Glob pattern matching for flexible file selection
- Progress feedback with detailed processing statistics
- Integration with existing database and caching systems

Technical Implementation:
- BatchProcessor class with modular architecture
- ProgressTracker for real-time user feedback
- Comprehensive error handling and edge case management
- Support for multiple operations (ingest, status, validate)
- Depth-controlled recursive search with proper boundary handling
- Permission error resilience and graceful degradation

Testing:
- 29 comprehensive tests covering all functionality
- Edge cases: empty directories, hidden files, permission errors
- CLI integration tests with mocked database operations
- Depth logic validation and boundary condition testing
- Error handling scenarios and recovery mechanisms

All acceptance criteria fulfilled:
- Directory and recursive processing
- Glob pattern support for file selection
- Progress tracking and user feedback
- Error handling with continuation options
- Comprehensive test coverage

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:45:43 +02:00

379 lines
13 KiB
Python

"""
Batch Processing and Recursive Operations - Issue #17
This module provides batch processing capabilities for MarkiTect, allowing
users to process multiple files and directories recursively through CLI.
Features:
- Directory processing with recursive support
- Glob pattern matching for file selection
- Progress tracking with user feedback
- Error handling with continuation options
- Depth control for recursive operations
Commands implemented:
- ingest-dir: Process all Markdown files in directory
- batch-process: Process files matching glob pattern
- recursive operations with depth control
"""
import os
import glob
import fnmatch
from pathlib import Path
from typing import List, Optional, Dict, Any, Iterator, Callable
from dataclasses import dataclass
from enum import Enum
import click
class ProcessingMode(Enum):
    """Kinds of operation a batch run can apply to each file.

    Every member's value is the lowercase spelling of its name.
    """

    INGEST = "ingest"
    STATUS = "status"
    VALIDATE = "validate"
    GENERATE = "generate"
class ErrorHandling(Enum):
    """Strategies a batch operation can follow when a file fails."""

    # Abort the batch at the first error.
    STOP = "stop"
    # Keep processing the remaining files and collect the errors.
    CONTINUE = "continue"
    # Skip failed files without collecting errors.
    SKIP = "skip"
@dataclass
class ProcessingResult:
    """Outcome of running one operation on one file."""

    # Path of the file this result belongs to.
    file_path: Path
    # True when the operation completed without an error.
    success: bool
    # Human-readable summary of what happened.
    message: str
    # Error text for a failed operation; None when nothing went wrong.
    error: Optional[str] = None
    # Elapsed time in seconds, when the producer measured it.
    processing_time: Optional[float] = None
@dataclass
class BatchResult:
    """Aggregate statistics for one batch processing run."""

    # Number of files handed to the batch.
    total_files: int
    # Number of files for which a result was recorded.
    processed: int
    # Recorded results that reported success.
    succeeded: int
    # Recorded results that reported failure.
    failed: int
    # Files that were skipped instead of being processed.
    skipped: int
    # Failed per-file results gathered during the run.
    errors: List["ProcessingResult"]
    # Elapsed time of the whole batch, in seconds.
    processing_time: float
class ProgressTracker:
    """Running counters, and optional console output, for one batch run.

    Attributes:
        total: Number of files the batch is expected to handle.
        processed: Number of results recorded through ``update``.
        succeeded: Recorded results whose ``success`` flag was true.
        failed: Recorded results whose ``success`` flag was false.
        skipped: Number of files reported through ``skip_file``.
        show_progress: When true, every update and skip is echoed with click.
    """

    def __init__(self, total: int, show_progress: bool = True):
        self.total = total
        self.processed = 0
        self.succeeded = 0
        self.failed = 0
        self.skipped = 0
        self.show_progress = show_progress

    def update(self, result: "ProcessingResult"):
        """Record one processing result and, if enabled, display it."""
        self.processed += 1
        if result.success:
            self.succeeded += 1
        else:
            self.failed += 1
        if self.show_progress:
            self._display_progress(result)

    def skip_file(self, file_path: Path, reason: str):
        """Count a file as skipped and, if enabled, say why."""
        self.skipped += 1
        if self.show_progress:
            click.echo(f"⚠️ Skipped {file_path}: {reason}")

    def _format_progress(self, result: "ProcessingResult") -> str:
        """Build the one-line progress message for ``result``.

        The percentage is reported as 0.0 when ``total`` is not positive,
        so a tracker created for an empty batch never divides by zero.
        """
        # Success and failure need distinct markers to be tellable apart.
        status = "✅" if result.success else "❌"
        if self.total > 0:
            percentage = (self.processed / self.total) * 100
        else:
            percentage = 0.0
        return (
            f"{status} [{self.processed}/{self.total}] "
            f"({percentage:.1f}%) {result.file_path}"
        )

    def _display_progress(self, result: "ProcessingResult"):
        """Echo the progress line, followed by the error text of a failure."""
        click.echo(self._format_progress(result))
        if not result.success and result.error:
            click.echo(f" Error: {result.error}")
class BatchProcessor:
    """Core batch processing engine.

    Finds files (by directory listing or glob pattern) and runs a per-file
    processor over them while tracking progress and failures.

    Attributes:
        error_handling: Strategy applied when a file fails. STOP ends the
            batch at the first failure, CONTINUE keeps going and collects
            the failed results, SKIP keeps going without collecting them.
        show_progress: When true, progress and summaries are echoed via click.
        max_depth: Default depth limit for recursive searches; None means
            unlimited.
    """

    def __init__(self,
                 error_handling: ErrorHandling = ErrorHandling.CONTINUE,
                 show_progress: bool = True,
                 max_depth: Optional[int] = None):
        self.error_handling = error_handling
        self.show_progress = show_progress
        self.max_depth = max_depth

    def find_markdown_files(self,
                            directory: Path,
                            pattern: str = "*.md",
                            recursive: bool = False,
                            depth: Optional[int] = None) -> List[Path]:
        """
        Find markdown files in directory with pattern matching.

        Args:
            directory: Directory to search
            pattern: fnmatch-style pattern applied to each file name
            recursive: Whether to search recursively
            depth: Maximum depth for recursive search; falls back to the
                processor's ``max_depth`` when None

        Returns:
            Sorted list of matching file paths

        Raises:
            FileNotFoundError: If ``directory`` does not exist.
            NotADirectoryError: If ``directory`` is not a directory.
        """
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        if recursive:
            effective_depth = depth if depth is not None else self.max_depth
            files = self._find_recursive(directory, pattern, effective_depth)
        else:
            # Non-recursive: only current directory
            files = self._find_in_directory(directory, pattern)
        return sorted(files)

    def _find_recursive(self, directory: Path, pattern: str,
                        max_depth: Optional[int]) -> List[Path]:
        """Find files recursively with depth control.

        Depth 0 is ``directory`` itself. Directories whose name starts with
        a dot are not entered. A directory that resolves to one of its own
        ancestors (a symlink cycle) is not entered again.
        """
        files: List[Path] = []

        def _search(current_dir: Path, current_depth: int, ancestors: frozenset):
            # Guard against symlink cycles: never re-enter a directory that
            # is already on the path leading to this one.
            real_dir = os.path.realpath(current_dir)
            if real_dir in ancestors:
                return

            # Add files from current directory (if within depth limit)
            if max_depth is None or current_depth <= max_depth:
                files.extend(self._find_in_directory(current_dir, pattern))

            # Stop descending once the depth limit has been reached
            if max_depth is not None and current_depth >= max_depth:
                return

            lineage = ancestors | {real_dir}
            try:
                for item in current_dir.iterdir():
                    if item.is_dir() and not item.name.startswith('.'):
                        _search(item, current_depth + 1, lineage)
            except PermissionError:
                # Skip directories we can't access
                if self.show_progress:
                    click.echo(f"⚠️ Permission denied: {current_dir}")

        _search(directory, 0, frozenset())
        return files

    def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
        """Find files matching pattern in a specific directory.

        A PermissionError while listing the directory is reported (when
        progress output is enabled) and whatever was collected so far is
        returned.
        """
        files: List[Path] = []
        try:
            for item in directory.iterdir():
                if item.is_file() and fnmatch.fnmatch(item.name, pattern):
                    files.append(item)
        except PermissionError:
            if self.show_progress:
                click.echo(f"⚠️ Permission denied: {directory}")
        return files

    def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
        """
        Find files using glob patterns.

        Args:
            glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")

        Returns:
            Sorted list of matching regular files, without duplicates
        """
        matches = glob.glob(glob_pattern, recursive=True)
        # glob makes no ordering promise; sort (and de-duplicate) so batches
        # run in a deterministic order, like find_markdown_files.
        unique = {Path(match) for match in matches}
        return sorted(path for path in unique if path.is_file())

    def process_files(self,
                      files: List[Path],
                      processor_func: Callable[[Path], ProcessingResult],
                      operation_name: str = "Processing") -> BatchResult:
        """
        Process a list of files with progress tracking and error handling.

        Args:
            files: List of files to process
            processor_func: Function to process each file
            operation_name: Name of the operation for progress display

        Returns:
            BatchResult with processing statistics. Failed results are
            listed in ``errors`` unless the strategy is ErrorHandling.SKIP,
            in which case failures are only counted.
        """
        import time

        # perf_counter is monotonic, so the elapsed time cannot go negative
        # when the wall clock is adjusted mid-run.
        started = time.perf_counter()
        if self.show_progress:
            click.echo(f"🚀 {operation_name} {len(files)} files...")

        tracker = ProgressTracker(len(files), self.show_progress)
        collect_errors = self.error_handling != ErrorHandling.SKIP
        stop_on_error = self.error_handling == ErrorHandling.STOP
        errors: List[ProcessingResult] = []

        for file_path in files:
            try:
                # Check if file still exists (might have been deleted during processing)
                if not file_path.exists():
                    tracker.skip_file(file_path, "File no longer exists")
                    continue
                outcome = processor_func(file_path)
            except Exception as exc:
                # Turn an unexpected exception into an ordinary failed result
                outcome = ProcessingResult(
                    file_path=file_path,
                    success=False,
                    message=f"Unexpected error: {str(exc)}",
                    error=str(exc)
                )

            # Bookkeeping happens exactly once per file, outside the try,
            # so a failure can never be counted twice.
            tracker.update(outcome)
            if outcome.success:
                continue
            if collect_errors:
                errors.append(outcome)
            if stop_on_error:
                break

        result = BatchResult(
            total_files=len(files),
            processed=tracker.processed,
            succeeded=tracker.succeeded,
            failed=tracker.failed,
            skipped=tracker.skipped,
            errors=errors,
            processing_time=time.perf_counter() - started
        )
        if self.show_progress:
            self._display_summary(result, operation_name)
        return result

    def _display_summary(self, result: BatchResult, operation_name: str):
        """Display batch processing summary, listing at most ten errors."""
        click.echo(f"\n📊 {operation_name} Summary:")
        click.echo(f" Total files: {result.total_files}")
        click.echo(f" Processed: {result.processed}")
        click.echo(f" Succeeded: {result.succeeded}")
        click.echo(f" Failed: {result.failed}")
        click.echo(f" Skipped: {result.skipped}")
        click.echo(f" Processing time: {result.processing_time:.2f}s")

        if result.failed > 0:
            click.echo(f"\n{result.failed} files failed:")
            for error in result.errors[:10]:  # Show first 10 errors
                click.echo(f"{error.file_path}: {error.message}")
            if len(result.errors) > 10:
                click.echo(f" ... and {len(result.errors) - 10} more errors")
def create_file_processor(config: Dict[str, Any],
                          operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
    """
    Create a file processor function for the specified operation.

    Args:
        config: Configuration dictionary; its 'database' entry is passed to
            DatabaseManager for the INGEST and STATUS operations
        operation: Type of processing operation

    Returns:
        Function that processes a single file and returns ProcessingResult.
        The function never raises for ordinary errors: any Exception
        (including an unsupported ``operation``) is reported as a failed
        result whose message is "Failed: <error text>".
    """
    import time

    def _run(file_path: Path) -> str:
        """Carry out the operation on ``file_path`` and return its message."""
        if operation == ProcessingMode.INGEST:
            # Ingest file into database
            from .database import DatabaseManager
            db_manager = DatabaseManager(config.get('database'))
            content = file_path.read_text(encoding='utf-8')
            db_manager.store_document(str(file_path), content)
            return "Ingested successfully"

        if operation == ProcessingMode.STATUS:
            # Check file status
            from .database import DatabaseManager
            db_manager = DatabaseManager(config.get('database'))
            try:
                metadata = db_manager.get_metadata(str(file_path))
                return f"Found in database (ID: {metadata.get('id', 'Unknown')})"
            except Exception:
                # Any lookup failure is reported as "not found", but
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                return "Not found in database"

        if operation == ProcessingMode.VALIDATE:
            # Basic validation - only checks that the file is not blank
            content = file_path.read_text(encoding='utf-8')
            if not content.strip():
                raise ValueError("File is empty")
            return "Valid markdown file"

        raise ValueError(f"Unsupported operation: {operation}")

    def process_file(file_path: Path) -> ProcessingResult:
        """Process a single file based on the operation type."""
        # perf_counter is monotonic, unlike the wall clock
        started = time.perf_counter()
        try:
            message = _run(file_path)
        except Exception as e:
            return ProcessingResult(
                file_path=file_path,
                success=False,
                message=f"Failed: {str(e)}",
                error=str(e),
                processing_time=time.perf_counter() - started
            )
        return ProcessingResult(
            file_path=file_path,
            success=True,
            message=message,
            processing_time=time.perf_counter() - started
        )

    return process_file