Complete implementation of batch processing capabilities for MarkiTect CLI: New CLI Commands: - markitect ingest-dir: Process all markdown files in directory with recursive support - markitect batch-process: Process files matching glob patterns - markitect recursive: Recursive processing with depth control Core Features: - Sophisticated batch processing engine with progress tracking - Multiple error handling strategies (stop, continue, skip) - Recursive directory traversal with configurable depth limits - Glob pattern matching for flexible file selection - Progress feedback with detailed processing statistics - Integration with existing database and caching systems Technical Implementation: - BatchProcessor class with modular architecture - ProgressTracker for real-time user feedback - Comprehensive error handling and edge case management - Support for multiple operations (ingest, status, validate) - Depth-controlled recursive search with proper boundary handling - Permission error resilience and graceful degradation Testing: - 29 comprehensive tests covering all functionality - Edge cases: empty directories, hidden files, permission errors - CLI integration tests with mocked database operations - Depth logic validation and boundary condition testing - Error handling scenarios and recovery mechanisms All acceptance criteria fulfilled: ✅ Directory and recursive processing ✅ Glob pattern support for file selection ✅ Progress tracking and user feedback ✅ Error handling with continuation options ✅ Comprehensive test coverage 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
379 lines
13 KiB
Python
379 lines
13 KiB
Python
"""
|
|
Batch Processing and Recursive Operations - Issue #17
|
|
|
|
This module provides batch processing capabilities for MarkiTect, allowing
|
|
users to process multiple files and directories recursively through CLI.
|
|
|
|
Features:
|
|
- Directory processing with recursive support
|
|
- Glob pattern matching for file selection
|
|
- Progress tracking with user feedback
|
|
- Error handling with continuation options
|
|
- Depth control for recursive operations
|
|
|
|
Commands implemented:
|
|
- ingest-dir: Process all Markdown files in directory
|
|
- batch-process: Process files matching glob pattern
|
|
- recursive operations with depth control
|
|
"""
|
|
|
|
import fnmatch
import glob
import os
import time
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import List, Optional, Dict, Any, Iterator, Callable

import click


class ProcessingMode(Enum):
    """Kinds of operation a batch run can apply to each file."""

    # Read the file and store its content in the database.
    INGEST = "ingest"

    # Report whether the file is already known to the database.
    STATUS = "status"

    # Check that the file can be read and is not empty.
    VALIDATE = "validate"

    # Declared here, but create_file_processor reports it as unsupported.
    GENERATE = "generate"


class ErrorHandling(Enum):
    """Strategies a batch run can follow when a file fails."""

    # Abort the batch as soon as one file fails.
    STOP = "stop"

    # Keep going and collect every failure for the final report.
    CONTINUE = "continue"

    # Keep going; failed files are skipped without collecting their errors.
    SKIP = "skip"


@dataclass
class ProcessingResult:
    """Outcome record for one file handled by a batch operation."""

    # File this outcome refers to.
    file_path: Path
    # True when the file was processed without error.
    success: bool
    # Human-readable summary of what happened.
    message: str
    # Error text for failed files; None otherwise.
    error: Optional[str] = None
    # Elapsed time for this file as measured by the processor; None if unmeasured.
    processing_time: Optional[float] = None


@dataclass
class BatchResult:
    """Aggregate statistics for one completed batch run."""

    # Number of files handed to the batch.
    total_files: int
    # Files that were actually run through the processor.
    processed: int
    # Processed files that reported success.
    succeeded: int
    # Processed files that reported failure.
    failed: int
    # Files that were not processed at all (e.g. no longer present).
    skipped: int
    # Per-file results of the failures that were collected.
    errors: List[ProcessingResult]
    # Elapsed time for the whole batch, in seconds.
    processing_time: float


class ProgressTracker:
    """Running tally of a batch run, optionally echoed to the terminal.

    Counts processed, succeeded, failed and skipped files. When
    ``show_progress`` is true, every update is also printed with
    ``click.echo``.
    """

    def __init__(self, total: int, show_progress: bool = True):
        """
        Args:
            total: Number of files the batch is expected to handle.
            show_progress: Whether to print a line for every update.
        """
        self.total = total
        self.processed = 0
        self.succeeded = 0
        self.failed = 0
        self.skipped = 0
        self.show_progress = show_progress

    def update(self, result: "ProcessingResult") -> None:
        """Record the outcome of one processed file.

        Args:
            result: Outcome to count; only ``success``, ``error`` and
                ``file_path`` are read.
        """
        self.processed += 1
        if result.success:
            self.succeeded += 1
        else:
            self.failed += 1

        if self.show_progress:
            self._display_progress(result)

    def skip_file(self, file_path: Path, reason: str) -> None:
        """Record a file that was skipped without being processed.

        Skipped files do not count towards ``processed``.
        """
        self.skipped += 1
        if self.show_progress:
            click.echo(f"⚠️ Skipped {file_path}: {reason}")

    def _percent_complete(self) -> float:
        """Return processed/total as a percentage, or 0.0 when total is not positive.

        Guarding here keeps progress output from raising ZeroDivisionError
        when the tracker was created for an empty batch.
        """
        if self.total <= 0:
            return 0.0
        return (self.processed / self.total) * 100

    def _display_progress(self, result: "ProcessingResult") -> None:
        """Print a one-line status for ``result``, plus its error if it failed."""
        status = "✅" if result.success else "❌"
        percentage = self._percent_complete()

        click.echo(f"{status} [{self.processed}/{self.total}] ({percentage:.1f}%) {result.file_path}")

        if not result.success and result.error:
            click.echo(f" Error: {result.error}")


class BatchProcessor:
    """Core batch processing engine.

    Finds files (single directory, recursive scan with a depth limit, or glob
    pattern) and runs a per-file processor over them, tracking progress and
    applying the configured error-handling strategy.
    """

    # Maximum number of collected failures listed individually in the summary.
    _MAX_ERRORS_SHOWN = 10

    def __init__(self,
                 error_handling: ErrorHandling = ErrorHandling.CONTINUE,
                 show_progress: bool = True,
                 max_depth: Optional[int] = None):
        """
        Args:
            error_handling: Strategy applied when a file fails.
            show_progress: Whether to print progress, warnings and the summary.
            max_depth: Default depth limit for recursive searches; None means
                unlimited.
        """
        self.error_handling = error_handling
        self.show_progress = show_progress
        self.max_depth = max_depth

    def find_markdown_files(self,
                            directory: Path,
                            pattern: str = "*.md",
                            recursive: bool = False,
                            depth: Optional[int] = None) -> List[Path]:
        """
        Find markdown files in directory with pattern matching.

        Args:
            directory: Directory to search
            pattern: fnmatch-style pattern applied to each file name
            recursive: Whether to search recursively
            depth: Maximum depth for recursive search; falls back to the
                processor's ``max_depth`` when None

        Returns:
            Sorted list of matching file paths

        Raises:
            FileNotFoundError: If ``directory`` does not exist.
            NotADirectoryError: If ``directory`` is not a directory.
        """
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")

        if not directory.is_dir():
            raise NotADirectoryError(f"Path is not a directory: {directory}")

        if recursive:
            effective_depth = depth if depth is not None else self.max_depth
            files = self._find_recursive(directory, pattern, effective_depth)
        else:
            # Non-recursive: only current directory
            files = self._find_in_directory(directory, pattern)

        return sorted(files)

    def _find_recursive(self, directory: Path, pattern: str, max_depth: Optional[int]) -> List[Path]:
        """Find files recursively with depth control.

        ``directory`` itself is depth 0. Files are collected from directories
        at depth <= ``max_depth``; a negative limit therefore yields nothing.
        Hidden sub-directories (names starting with ".") are not entered.
        A sub-directory that resolves to a directory already on the current
        traversal path (a symlink cycle) is not entered again.
        """
        matches: List[Path] = []

        def _walk(current_dir: Path, current_depth: int, active: frozenset) -> None:
            # Only look for sub-directories if their depth is still allowed.
            descend = max_depth is None or current_depth < max_depth
            files, subdirs = self._scan_directory(current_dir, pattern, descend)
            matches.extend(files)

            for subdir in subdirs:
                real = self._resolve_quietly(subdir)
                # Unresolvable, or an ancestor reached again through a symlink.
                if real is None or real in active:
                    continue
                _walk(subdir, current_depth + 1, active | {real})

        if max_depth is None or max_depth >= 0:
            root_real = self._resolve_quietly(directory)
            start = frozenset() if root_real is None else frozenset({root_real})
            _walk(directory, 0, start)

        return matches

    def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
        """Find files matching pattern in a specific directory."""
        files, _ = self._scan_directory(directory, pattern, False)
        return files

    def _scan_directory(self, directory: Path, pattern: str, collect_subdirs: bool) -> tuple:
        """List ``directory`` once, returning ``(matching_files, subdirectories)``.

        Sub-directories are only gathered when ``collect_subdirs`` is true, and
        hidden ones are left out. A PermissionError ends the scan early with a
        single warning; entries seen before the error are kept.
        """
        files: List[Path] = []
        subdirs: List[Path] = []

        try:
            for entry in directory.iterdir():
                if entry.is_file():
                    if fnmatch.fnmatch(entry.name, pattern):
                        files.append(entry)
                elif collect_subdirs and not entry.name.startswith('.') and entry.is_dir():
                    subdirs.append(entry)
        except PermissionError:
            # Skip directories we can't access
            if self.show_progress:
                click.echo(f"⚠️ Permission denied: {directory}")

        return files, subdirs

    @staticmethod
    def _resolve_quietly(path: Path) -> Optional[Path]:
        """Return ``path.resolve()``, or None if the path cannot be resolved."""
        try:
            return path.resolve()
        except (OSError, RuntimeError):
            return None

    def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
        """
        Find files using glob patterns.

        Args:
            glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")

        Returns:
            Sorted list of matching file paths (directories are excluded)
        """
        candidates = (Path(match) for match in glob.glob(glob_pattern, recursive=True))
        # Sorted so processing order is deterministic, as in find_markdown_files.
        return sorted(path for path in candidates if path.is_file())

    def process_files(self,
                      files: List[Path],
                      processor_func: Callable[[Path], ProcessingResult],
                      operation_name: str = "Processing") -> BatchResult:
        """
        Process a list of files with progress tracking and error handling.

        Failures are handled according to ``self.error_handling``:
        STOP ends the batch after the first failure, CONTINUE collects every
        failure in ``BatchResult.errors``, and SKIP counts failures but does
        not collect them.

        Args:
            files: List of files to process
            processor_func: Function to process each file
            operation_name: Name of the operation for progress display

        Returns:
            BatchResult with processing statistics
        """
        started = time.perf_counter()

        if self.show_progress:
            click.echo(f"🚀 {operation_name} {len(files)} files...")

        tracker = ProgressTracker(len(files), self.show_progress)
        errors: List[ProcessingResult] = []

        for file_path in files:
            outcome = self._safe_process(processor_func, file_path)

            if outcome is None:
                # File might have been deleted while the batch was running.
                tracker.skip_file(file_path, "File no longer exists")
                continue

            tracker.update(outcome)
            if outcome.success:
                continue

            if self.error_handling != ErrorHandling.SKIP:
                errors.append(outcome)
            if self.error_handling == ErrorHandling.STOP:
                break

        summary = BatchResult(
            total_files=len(files),
            processed=tracker.processed,
            succeeded=tracker.succeeded,
            failed=tracker.failed,
            skipped=tracker.skipped,
            errors=errors,
            processing_time=time.perf_counter() - started,
        )

        if self.show_progress:
            self._display_summary(summary, operation_name)

        return summary

    def _safe_process(self,
                      processor_func: Callable[[Path], ProcessingResult],
                      file_path: Path) -> Optional[ProcessingResult]:
        """Run ``processor_func`` on one file, turning exceptions into a failed result.

        Returns None when the file no longer exists. Keeping the tracker update
        out of this try block means a misbehaving processor cannot cause the
        same file to be counted twice.
        """
        try:
            if not file_path.exists():
                return None

            outcome = processor_func(file_path)
            if outcome is None:
                raise TypeError("processor returned no result")
            return outcome
        except Exception as e:
            # Handle unexpected errors
            return ProcessingResult(
                file_path=file_path,
                success=False,
                message=f"Unexpected error: {str(e)}",
                error=str(e),
            )

    def _display_summary(self, result: BatchResult, operation_name: str):
        """Display batch processing summary."""
        click.echo(f"\n📊 {operation_name} Summary:")
        click.echo(f" Total files: {result.total_files}")
        click.echo(f" Processed: {result.processed}")
        click.echo(f" Succeeded: {result.succeeded}")
        click.echo(f" Failed: {result.failed}")
        click.echo(f" Skipped: {result.skipped}")
        click.echo(f" Processing time: {result.processing_time:.2f}s")

        if result.failed > 0:
            click.echo(f"\n❌ {result.failed} files failed:")

            limit = self._MAX_ERRORS_SHOWN
            # Only the first few collected errors are listed individually.
            for error in result.errors[:limit]:
                click.echo(f" • {error.file_path}: {error.message}")

            if len(result.errors) > limit:
                click.echo(f" ... and {len(result.errors) - limit} more errors")


def create_file_processor(config: Dict[str, Any],
                          operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
    """
    Create a file processor function for the specified operation.

    The returned function never raises for ordinary errors: any Exception
    raised while handling a file is reported as a failed ProcessingResult.
    An operation without a handler (e.g. ProcessingMode.GENERATE) is
    reported the same way, once per file.

    Args:
        config: Configuration dictionary; ``config.get('database')`` is passed
            to DatabaseManager for the INGEST and STATUS operations
        operation: Type of processing operation

    Returns:
        Function that processes a single file and returns ProcessingResult
    """

    def _ingest(file_path: Path) -> str:
        """Store the file's UTF-8 text in the database."""
        # Imported lazily so that VALIDATE runs do not import the database module.
        from .database import DatabaseManager
        # NOTE(review): a manager is built for every file; reusing one may be
        # possible but depends on DatabaseManager semantics - confirm.
        db_manager = DatabaseManager(config.get('database'))
        content = file_path.read_text(encoding='utf-8')
        db_manager.store_document(str(file_path), content)
        return "Ingested successfully"

    def _status(file_path: Path) -> str:
        """Describe whether the database has metadata for the file."""
        from .database import DatabaseManager
        db_manager = DatabaseManager(config.get('database'))
        try:
            metadata = db_manager.get_metadata(str(file_path))
            return f"Found in database (ID: {metadata.get('id', 'Unknown')})"
        except Exception:
            # Any lookup failure is reported as "not found"; unlike a bare
            # except, this lets KeyboardInterrupt and SystemExit propagate.
            return "Not found in database"

    def _validate(file_path: Path) -> str:
        """Check that the file is readable as UTF-8 and not blank."""
        content = file_path.read_text(encoding='utf-8')
        if not content.strip():
            raise ValueError("File is empty")
        return "Valid markdown file"

    handlers = {
        ProcessingMode.INGEST: _ingest,
        ProcessingMode.STATUS: _status,
        ProcessingMode.VALIDATE: _validate,
    }

    def process_file(file_path: Path) -> ProcessingResult:
        """Process a single file based on the operation type."""
        started = time.perf_counter()

        try:
            handler = handlers.get(operation)
            if handler is None:
                raise ValueError(f"Unsupported operation: {operation}")
            message = handler(file_path)
        except Exception as e:
            return ProcessingResult(
                file_path=file_path,
                success=False,
                message=f"Failed: {str(e)}",
                error=str(e),
                processing_time=time.perf_counter() - started,
            )

        return ProcessingResult(
            file_path=file_path,
            success=True,
            message=message,
            processing_time=time.perf_counter() - started,
        )

    return process_file