Compare commits
4 Commits
818d8346ad
...
f6c285b774
| Author | SHA1 | Date | |
|---|---|---|---|
| f6c285b774 | |||
| 0982e771e4 | |||
| a4805812f3 | |||
| 77db9f6231 |
8
Makefile
8
Makefile
@@ -254,11 +254,11 @@ release-dry-run:
|
||||
# Chaos Engineering targets
|
||||
chaos-validate:
|
||||
@echo "🔥 Running architectural independence validation..."
|
||||
$(VENV_PYTHON) chaos_test_runner.py validate-independence
|
||||
$(VENV_PYTHON) tools/chaos_test_runner.py validate-independence
|
||||
|
||||
chaos-matrix:
|
||||
@echo "🏗️ Showing architectural dependency matrix..."
|
||||
$(VENV_PYTHON) chaos_test_runner.py dependency-matrix
|
||||
$(VENV_PYTHON) tools/chaos_test_runner.py dependency-matrix
|
||||
|
||||
chaos-inject:
|
||||
@echo "💥 Injecting chaos into layer..."
|
||||
@@ -266,11 +266,11 @@ chaos-inject:
|
||||
echo "❌ Usage: make chaos-inject LAYER=L1_Presentation TYPE=import_failure"; \
|
||||
exit 1; \
|
||||
fi
|
||||
$(VENV_PYTHON) chaos_test_runner.py inject-layer-failure --layer $(LAYER) $(if $(TYPE),--injection-type $(TYPE))
|
||||
$(VENV_PYTHON) tools/chaos_test_runner.py inject-layer-failure --layer $(LAYER) $(if $(TYPE),--injection-type $(TYPE))
|
||||
|
||||
chaos-report:
|
||||
@echo "📄 Generating chaos engineering report..."
|
||||
$(VENV_PYTHON) chaos_test_runner.py chaos-report
|
||||
$(VENV_PYTHON) tools/chaos_test_runner.py chaos-report
|
||||
|
||||
# Code linting
|
||||
lint: $(VENV)/bin/activate
|
||||
|
||||
379
markitect/batch_processor.py
Normal file
379
markitect/batch_processor.py
Normal file
@@ -0,0 +1,379 @@
|
||||
"""
|
||||
Batch Processing and Recursive Operations - Issue #17
|
||||
|
||||
This module provides batch processing capabilities for MarkiTect, allowing
|
||||
users to process multiple files and directories recursively through CLI.
|
||||
|
||||
Features:
|
||||
- Directory processing with recursive support
|
||||
- Glob pattern matching for file selection
|
||||
- Progress tracking with user feedback
|
||||
- Error handling with continuation options
|
||||
- Depth control for recursive operations
|
||||
|
||||
Commands implemented:
|
||||
- ingest-dir: Process all Markdown files in directory
|
||||
- batch-process: Process files matching glob pattern
|
||||
- recursive operations with depth control
|
||||
"""
|
||||
|
||||
import os
|
||||
import glob
|
||||
import fnmatch
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any, Iterator, Callable
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import click
|
||||
|
||||
|
||||
class ProcessingMode(Enum):
|
||||
"""Modes for batch processing operations."""
|
||||
INGEST = "ingest"
|
||||
STATUS = "status"
|
||||
VALIDATE = "validate"
|
||||
GENERATE = "generate"
|
||||
|
||||
|
||||
class ErrorHandling(Enum):
|
||||
"""Error handling strategies for batch operations."""
|
||||
STOP = "stop" # Stop on first error
|
||||
CONTINUE = "continue" # Continue processing, collect errors
|
||||
SKIP = "skip" # Skip failed files, no error collection
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProcessingResult:
|
||||
"""Result of processing a single file."""
|
||||
file_path: Path
|
||||
success: bool
|
||||
message: str
|
||||
error: Optional[str] = None
|
||||
processing_time: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchResult:
|
||||
"""Result of a batch processing operation."""
|
||||
total_files: int
|
||||
processed: int
|
||||
succeeded: int
|
||||
failed: int
|
||||
skipped: int
|
||||
errors: List[ProcessingResult]
|
||||
processing_time: float
|
||||
|
||||
|
||||
class ProgressTracker:
|
||||
"""Progress tracking for batch operations."""
|
||||
|
||||
def __init__(self, total: int, show_progress: bool = True):
|
||||
self.total = total
|
||||
self.processed = 0
|
||||
self.succeeded = 0
|
||||
self.failed = 0
|
||||
self.skipped = 0
|
||||
self.show_progress = show_progress
|
||||
|
||||
def update(self, result: ProcessingResult):
|
||||
"""Update progress with a processing result."""
|
||||
self.processed += 1
|
||||
if result.success:
|
||||
self.succeeded += 1
|
||||
else:
|
||||
self.failed += 1
|
||||
|
||||
if self.show_progress:
|
||||
self._display_progress(result)
|
||||
|
||||
def skip_file(self, file_path: Path, reason: str):
|
||||
"""Mark a file as skipped."""
|
||||
self.skipped += 1
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Skipped {file_path}: {reason}")
|
||||
|
||||
def _display_progress(self, result: ProcessingResult):
|
||||
"""Display progress information."""
|
||||
status = "✅" if result.success else "❌"
|
||||
percentage = (self.processed / self.total) * 100
|
||||
|
||||
click.echo(f"{status} [{self.processed}/{self.total}] ({percentage:.1f}%) {result.file_path}")
|
||||
|
||||
if not result.success and result.error:
|
||||
click.echo(f" Error: {result.error}")
|
||||
|
||||
|
||||
class BatchProcessor:
|
||||
"""Core batch processing engine."""
|
||||
|
||||
def __init__(self,
|
||||
error_handling: ErrorHandling = ErrorHandling.CONTINUE,
|
||||
show_progress: bool = True,
|
||||
max_depth: Optional[int] = None):
|
||||
self.error_handling = error_handling
|
||||
self.show_progress = show_progress
|
||||
self.max_depth = max_depth
|
||||
|
||||
def find_markdown_files(self,
|
||||
directory: Path,
|
||||
pattern: str = "*.md",
|
||||
recursive: bool = False,
|
||||
depth: Optional[int] = None) -> List[Path]:
|
||||
"""
|
||||
Find markdown files in directory with pattern matching.
|
||||
|
||||
Args:
|
||||
directory: Directory to search
|
||||
pattern: Glob pattern for file matching
|
||||
recursive: Whether to search recursively
|
||||
depth: Maximum depth for recursive search
|
||||
|
||||
Returns:
|
||||
List of matching file paths
|
||||
"""
|
||||
files = []
|
||||
|
||||
if not directory.exists():
|
||||
raise FileNotFoundError(f"Directory not found: {directory}")
|
||||
|
||||
if not directory.is_dir():
|
||||
raise NotADirectoryError(f"Path is not a directory: {directory}")
|
||||
|
||||
if recursive:
|
||||
effective_depth = depth if depth is not None else self.max_depth
|
||||
files.extend(self._find_recursive(directory, pattern, effective_depth))
|
||||
else:
|
||||
# Non-recursive: only current directory
|
||||
files.extend(self._find_in_directory(directory, pattern))
|
||||
|
||||
return sorted(files)
|
||||
|
||||
def _find_recursive(self, directory: Path, pattern: str, max_depth: Optional[int]) -> List[Path]:
|
||||
"""Find files recursively with depth control."""
|
||||
files = []
|
||||
|
||||
def _search(current_dir: Path, current_depth: int):
|
||||
# Add files from current directory (if within depth limit)
|
||||
if max_depth is None or current_depth <= max_depth:
|
||||
files.extend(self._find_in_directory(current_dir, pattern))
|
||||
|
||||
# Recurse into subdirectories (if we haven't reached depth limit)
|
||||
if max_depth is None or current_depth < max_depth:
|
||||
try:
|
||||
for item in current_dir.iterdir():
|
||||
if item.is_dir() and not item.name.startswith('.'):
|
||||
_search(item, current_depth + 1)
|
||||
except PermissionError:
|
||||
# Skip directories we can't access
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Permission denied: {current_dir}")
|
||||
|
||||
_search(directory, 0)
|
||||
return files
|
||||
|
||||
def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
|
||||
"""Find files matching pattern in a specific directory."""
|
||||
files = []
|
||||
|
||||
try:
|
||||
for item in directory.iterdir():
|
||||
if item.is_file() and fnmatch.fnmatch(item.name, pattern):
|
||||
files.append(item)
|
||||
except PermissionError:
|
||||
if self.show_progress:
|
||||
click.echo(f"⚠️ Permission denied: {directory}")
|
||||
|
||||
return files
|
||||
|
||||
def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
|
||||
"""
|
||||
Find files using glob patterns.
|
||||
|
||||
Args:
|
||||
glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")
|
||||
|
||||
Returns:
|
||||
List of matching file paths
|
||||
"""
|
||||
matches = glob.glob(glob_pattern, recursive=True)
|
||||
return [Path(match) for match in matches if Path(match).is_file()]
|
||||
|
||||
def process_files(self,
|
||||
files: List[Path],
|
||||
processor_func: Callable[[Path], ProcessingResult],
|
||||
operation_name: str = "Processing") -> BatchResult:
|
||||
"""
|
||||
Process a list of files with progress tracking and error handling.
|
||||
|
||||
Args:
|
||||
files: List of files to process
|
||||
processor_func: Function to process each file
|
||||
operation_name: Name of the operation for progress display
|
||||
|
||||
Returns:
|
||||
BatchResult with processing statistics
|
||||
"""
|
||||
import time
|
||||
start_time = time.time()
|
||||
|
||||
if self.show_progress:
|
||||
click.echo(f"🚀 {operation_name} {len(files)} files...")
|
||||
|
||||
tracker = ProgressTracker(len(files), self.show_progress)
|
||||
errors = []
|
||||
|
||||
for file_path in files:
|
||||
try:
|
||||
# Check if file still exists (might have been deleted during processing)
|
||||
if not file_path.exists():
|
||||
tracker.skip_file(file_path, "File no longer exists")
|
||||
continue
|
||||
|
||||
# Process the file
|
||||
result = processor_func(file_path)
|
||||
tracker.update(result)
|
||||
|
||||
if not result.success:
|
||||
errors.append(result)
|
||||
|
||||
# Handle errors based on strategy
|
||||
if self.error_handling == ErrorHandling.STOP:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
# Handle unexpected errors
|
||||
error_result = ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message=f"Unexpected error: {str(e)}",
|
||||
error=str(e)
|
||||
)
|
||||
tracker.update(error_result)
|
||||
errors.append(error_result)
|
||||
|
||||
if self.error_handling == ErrorHandling.STOP:
|
||||
break
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
|
||||
result = BatchResult(
|
||||
total_files=len(files),
|
||||
processed=tracker.processed,
|
||||
succeeded=tracker.succeeded,
|
||||
failed=tracker.failed,
|
||||
skipped=tracker.skipped,
|
||||
errors=errors,
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
if self.show_progress:
|
||||
self._display_summary(result, operation_name)
|
||||
|
||||
return result
|
||||
|
||||
def _display_summary(self, result: BatchResult, operation_name: str):
|
||||
"""Display batch processing summary."""
|
||||
click.echo(f"\n📊 {operation_name} Summary:")
|
||||
click.echo(f" Total files: {result.total_files}")
|
||||
click.echo(f" Processed: {result.processed}")
|
||||
click.echo(f" Succeeded: {result.succeeded}")
|
||||
click.echo(f" Failed: {result.failed}")
|
||||
click.echo(f" Skipped: {result.skipped}")
|
||||
click.echo(f" Processing time: {result.processing_time:.2f}s")
|
||||
|
||||
if result.failed > 0:
|
||||
click.echo(f"\n❌ {result.failed} files failed:")
|
||||
for error in result.errors[:10]: # Show first 10 errors
|
||||
click.echo(f" • {error.file_path}: {error.message}")
|
||||
|
||||
if len(result.errors) > 10:
|
||||
click.echo(f" ... and {len(result.errors) - 10} more errors")
|
||||
|
||||
|
||||
def create_file_processor(config: Dict[str, Any],
|
||||
operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
|
||||
"""
|
||||
Create a file processor function for the specified operation.
|
||||
|
||||
Args:
|
||||
config: Configuration dictionary
|
||||
operation: Type of processing operation
|
||||
|
||||
Returns:
|
||||
Function that processes a single file and returns ProcessingResult
|
||||
"""
|
||||
import time
|
||||
|
||||
def process_file(file_path: Path) -> ProcessingResult:
|
||||
"""Process a single file based on the operation type."""
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
if operation == ProcessingMode.INGEST:
|
||||
# Ingest file into database
|
||||
from .database import DatabaseManager
|
||||
db_manager = DatabaseManager(config.get('database'))
|
||||
|
||||
# Read file content
|
||||
content = file_path.read_text(encoding='utf-8')
|
||||
|
||||
# Store in database
|
||||
db_manager.store_document(str(file_path), content)
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Ingested successfully",
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
elif operation == ProcessingMode.STATUS:
|
||||
# Check file status
|
||||
from .database import DatabaseManager
|
||||
db_manager = DatabaseManager(config.get('database'))
|
||||
|
||||
try:
|
||||
metadata = db_manager.get_metadata(str(file_path))
|
||||
message = f"Found in database (ID: {metadata.get('id', 'Unknown')})"
|
||||
except:
|
||||
message = "Not found in database"
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message=message,
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
elif operation == ProcessingMode.VALIDATE:
|
||||
# Validate file format/content
|
||||
content = file_path.read_text(encoding='utf-8')
|
||||
|
||||
# Basic validation - check if it's valid markdown
|
||||
if not content.strip():
|
||||
raise ValueError("File is empty")
|
||||
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Valid markdown file",
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unsupported operation: {operation}")
|
||||
|
||||
except Exception as e:
|
||||
processing_time = time.time() - start_time
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message=f"Failed: {str(e)}",
|
||||
error=str(e),
|
||||
processing_time=processing_time
|
||||
)
|
||||
|
||||
return process_file
|
||||
491
markitect/cli.py
491
markitect/cli.py
@@ -28,6 +28,8 @@ import builtins
|
||||
from .database import DatabaseManager
|
||||
from .legacy_compat import LegacyMode, emit_deprecation_warning, legacy_switch_option
|
||||
from .__version__ import get_version_info, get_release_info
|
||||
from .batch_processor import BatchProcessor, ProcessingMode, ErrorHandling, create_file_processor
|
||||
from .config_manager import ConfigurationManager
|
||||
|
||||
# Import legacy system components for advanced management
|
||||
try:
|
||||
@@ -4549,6 +4551,495 @@ def perf_history(config, limit, trend_days, output_format, output):
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# Batch Processing Commands - Issue #17
|
||||
|
||||
|
||||
@cli.command(name='ingest-dir')
|
||||
@click.argument('directory', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
|
||||
@click.option('--recursive', '-r', is_flag=True, help='Process directories recursively')
|
||||
@click.option('--depth', type=int, help='Maximum depth for recursive processing')
|
||||
@click.option('--pattern', default='*.md', help='File pattern to match (default: *.md)')
|
||||
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
|
||||
default='continue', help='Error handling strategy')
|
||||
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
|
||||
@pass_config
|
||||
def ingest_dir(config, directory, recursive, depth, pattern, error_handling, quiet):
|
||||
"""Process all Markdown files in directory.
|
||||
|
||||
Ingests all markdown files found in the specified directory into the database.
|
||||
Supports recursive processing with depth control and flexible error handling.
|
||||
|
||||
Examples:
|
||||
markitect ingest-dir ./docs
|
||||
markitect ingest-dir ./content --recursive --depth 3
|
||||
markitect ingest-dir ./articles --pattern "*.markdown" --error-handling stop
|
||||
"""
|
||||
try:
|
||||
# Convert error handling string to enum
|
||||
error_strategy = ErrorHandling[error_handling.upper()]
|
||||
|
||||
# Initialize batch processor
|
||||
processor = BatchProcessor(
|
||||
error_handling=error_strategy,
|
||||
show_progress=not quiet,
|
||||
max_depth=depth
|
||||
)
|
||||
|
||||
# Find files to process
|
||||
if not quiet:
|
||||
click.echo(f"🔍 Searching for files in {directory}...")
|
||||
|
||||
files = processor.find_markdown_files(
|
||||
directory=directory,
|
||||
pattern=pattern,
|
||||
recursive=recursive,
|
||||
depth=depth
|
||||
)
|
||||
|
||||
if not files:
|
||||
click.echo(f"📭 No files found matching pattern '{pattern}' in {directory}")
|
||||
return
|
||||
|
||||
# Create file processor for ingestion
|
||||
file_processor = create_file_processor(config, ProcessingMode.INGEST)
|
||||
|
||||
# Process files
|
||||
result = processor.process_files(files, file_processor, "Ingesting")
|
||||
|
||||
# Exit with error code if there were failures
|
||||
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"Directory ingestion failed: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='batch-process')
|
||||
@click.argument('pattern', type=str)
|
||||
@click.option('--operation', type=click.Choice(['ingest', 'status', 'validate']),
|
||||
default='ingest', help='Operation to perform on matched files')
|
||||
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
|
||||
default='continue', help='Error handling strategy')
|
||||
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
|
||||
@pass_config
|
||||
def batch_process(config, pattern, operation, error_handling, quiet):
|
||||
"""Process files matching glob pattern.
|
||||
|
||||
Uses glob patterns to find and process files. Supports various operations
|
||||
including ingestion, status checking, and validation.
|
||||
|
||||
Examples:
|
||||
markitect batch-process "**/*.md" --operation ingest
|
||||
markitect batch-process "docs/**/*.markdown" --operation status
|
||||
markitect batch-process "./content/*.md" --operation validate --error-handling stop
|
||||
"""
|
||||
try:
|
||||
# Convert strings to enums
|
||||
error_strategy = ErrorHandling[error_handling.upper()]
|
||||
processing_mode = ProcessingMode[operation.upper()]
|
||||
|
||||
# Initialize batch processor
|
||||
processor = BatchProcessor(
|
||||
error_handling=error_strategy,
|
||||
show_progress=not quiet
|
||||
)
|
||||
|
||||
# Find files using glob pattern
|
||||
if not quiet:
|
||||
click.echo(f"🔍 Searching for files matching '{pattern}'...")
|
||||
|
||||
files = processor.find_files_by_glob(pattern)
|
||||
|
||||
if not files:
|
||||
click.echo(f"📭 No files found matching pattern '{pattern}'")
|
||||
return
|
||||
|
||||
# Create file processor for the specified operation
|
||||
file_processor = create_file_processor(config, processing_mode)
|
||||
|
||||
# Process files
|
||||
operation_name = f"{operation.title()}ing"
|
||||
result = processor.process_files(files, file_processor, operation_name)
|
||||
|
||||
# Exit with error code if there were failures
|
||||
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"Batch processing failed: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='recursive')
|
||||
@click.argument('directory', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
|
||||
@click.option('--depth', type=int, default=None, help='Maximum recursion depth')
|
||||
@click.option('--operation', type=click.Choice(['ingest', 'status', 'validate']),
|
||||
default='status', help='Operation to perform')
|
||||
@click.option('--pattern', default='*.md', help='File pattern to match (default: *.md)')
|
||||
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
|
||||
default='continue', help='Error handling strategy')
|
||||
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
|
||||
@pass_config
|
||||
def recursive(config, directory, depth, operation, pattern, error_handling, quiet):
|
||||
"""Recursive processing with depth control.
|
||||
|
||||
Performs recursive operations on directory trees with configurable depth limits.
|
||||
This command provides fine-grained control over recursive processing behavior.
|
||||
|
||||
Examples:
|
||||
markitect recursive ./docs --depth 2 --operation ingest
|
||||
markitect recursive ./content --depth 5 --operation status --pattern "*.markdown"
|
||||
markitect recursive ./src --operation validate --error-handling stop
|
||||
"""
|
||||
try:
|
||||
# Convert strings to enums
|
||||
error_strategy = ErrorHandling[error_handling.upper()]
|
||||
processing_mode = ProcessingMode[operation.upper()]
|
||||
|
||||
# Initialize batch processor with depth control
|
||||
processor = BatchProcessor(
|
||||
error_handling=error_strategy,
|
||||
show_progress=not quiet,
|
||||
max_depth=depth
|
||||
)
|
||||
|
||||
# Find files recursively
|
||||
if not quiet:
|
||||
depth_str = f" (max depth: {depth})" if depth else ""
|
||||
click.echo(f"🔍 Recursively searching {directory}{depth_str}...")
|
||||
|
||||
files = processor.find_markdown_files(
|
||||
directory=directory,
|
||||
pattern=pattern,
|
||||
recursive=True,
|
||||
depth=depth
|
||||
)
|
||||
|
||||
if not files:
|
||||
click.echo(f"📭 No files found matching pattern '{pattern}' in {directory}")
|
||||
return
|
||||
|
||||
# Create file processor for the specified operation
|
||||
file_processor = create_file_processor(config, processing_mode)
|
||||
|
||||
# Process files
|
||||
operation_name = f"Recursively {operation}ing"
|
||||
result = processor.process_files(files, file_processor, operation_name)
|
||||
|
||||
# Exit with error code if there were failures
|
||||
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"Recursive processing failed: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# Configuration Management Commands - Issue #18
|
||||
|
||||
|
||||
@cli.command(name='config-show')
|
||||
@click.option('--format', 'output_format', type=click.Choice(['yaml', 'json', 'simple']),
|
||||
default='yaml', help='Output format for configuration display')
|
||||
@click.option('--show-sensitive', is_flag=True, help='Show sensitive values (tokens, passwords)')
|
||||
@pass_config
|
||||
def config_show(config, output_format, show_sensitive):
|
||||
"""Display current configuration.
|
||||
|
||||
Shows comprehensive configuration information including current settings,
|
||||
file sources, environment variables, and workspace information.
|
||||
|
||||
Examples:
|
||||
markitect config-show
|
||||
markitect config-show --format json
|
||||
markitect config-show --format simple --show-sensitive
|
||||
"""
|
||||
try:
|
||||
config_manager = ConfigurationManager()
|
||||
output = config_manager.display_config(
|
||||
show_sensitive=show_sensitive,
|
||||
output_format=output_format
|
||||
)
|
||||
click.echo(output)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"Failed to display configuration: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='config-set')
|
||||
@click.argument('key', type=str)
|
||||
@click.argument('value', type=str)
|
||||
@click.option('--config-file', type=click.Path(), help='Target configuration file')
|
||||
@click.option('--validate/--no-validate', default=True, help='Validate configuration after setting')
|
||||
@pass_config
|
||||
def config_set(config, key, value, config_file, validate):
|
||||
"""Set configuration values.
|
||||
|
||||
Sets a configuration value and persists it to a configuration file.
|
||||
Supports nested keys using dot notation (e.g., 'gitea.url').
|
||||
|
||||
Examples:
|
||||
markitect config-set gitea_url http://localhost:3000
|
||||
markitect config-set repo_owner myorganization
|
||||
markitect config-set api_token abc123def456
|
||||
markitect config-set workspace.dir ./my_workspace
|
||||
"""
|
||||
try:
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Set the configuration value
|
||||
success = config_manager.set_config_value(key, value, config_file)
|
||||
|
||||
if success:
|
||||
click.echo(f"✅ Configuration updated: {key} = {value}")
|
||||
|
||||
# Show which file was updated
|
||||
target_file = config_manager._get_target_config_file(config_file)
|
||||
click.echo(f"📁 Updated file: {target_file}")
|
||||
|
||||
# Validate configuration if requested
|
||||
if validate:
|
||||
validation_results = config_manager.validate_configuration()
|
||||
errors = [r for r in validation_results if r['status'] == 'error']
|
||||
if errors:
|
||||
click.echo("⚠️ Configuration validation warnings:")
|
||||
for error in errors:
|
||||
click.echo(f" • {error['key']}: {error['message']}")
|
||||
|
||||
else:
|
||||
click.echo(f"❌ Failed to set configuration: {key}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
except ValueError as e:
|
||||
click.echo(f"❌ Configuration error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Failed to set configuration: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='config-init')
|
||||
@click.option('--project-dir', type=click.Path(path_type=Path), help='Target project directory')
|
||||
@click.option('--interactive/--no-interactive', default=True, help='Interactive configuration setup')
|
||||
@click.option('--force', is_flag=True, help='Overwrite existing configuration')
|
||||
@pass_config
|
||||
def config_init(config, project_dir, interactive, force):
|
||||
"""Initialize configuration for new project.
|
||||
|
||||
Creates a new configuration file with sensible defaults and sets up
|
||||
the necessary directory structure for a MarkiTect project.
|
||||
|
||||
Examples:
|
||||
markitect config-init
|
||||
markitect config-init --project-dir ./my-project
|
||||
markitect config-init --no-interactive --force
|
||||
"""
|
||||
try:
|
||||
target_dir = project_dir or Path.cwd()
|
||||
config_file = target_dir / '.markitect.yml'
|
||||
|
||||
# Check if configuration already exists
|
||||
if config_file.exists() and not force:
|
||||
click.echo(f"❌ Configuration file already exists: {config_file}")
|
||||
click.echo(" Use --force to overwrite or choose a different directory")
|
||||
sys.exit(1)
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Interactive setup if requested
|
||||
initial_config = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_owner': '',
|
||||
'repo_name': target_dir.name,
|
||||
'workspace_dir': '.markitect_workspace',
|
||||
'cache_dir': '.ast_cache',
|
||||
'tests_dir': 'tests',
|
||||
'test_file_pattern': 'test_issue_{issue_num}_{scenario}.py',
|
||||
'claude_code_command': 'claude'
|
||||
}
|
||||
|
||||
if interactive:
|
||||
click.echo("🔧 Interactive MarkiTect configuration setup")
|
||||
click.echo(f"📁 Target directory: {target_dir}")
|
||||
click.echo()
|
||||
|
||||
# Prompt for each configuration value
|
||||
initial_config['gitea_url'] = click.prompt(
|
||||
'Gitea server URL',
|
||||
default=initial_config['gitea_url']
|
||||
)
|
||||
initial_config['repo_owner'] = click.prompt(
|
||||
'Repository owner/organization',
|
||||
default=initial_config['repo_owner']
|
||||
)
|
||||
initial_config['repo_name'] = click.prompt(
|
||||
'Repository name',
|
||||
default=initial_config['repo_name']
|
||||
)
|
||||
|
||||
if click.confirm('Configure API token now?', default=False):
|
||||
initial_config['api_token'] = click.prompt(
|
||||
'API token',
|
||||
default='',
|
||||
hide_input=True
|
||||
)
|
||||
|
||||
initial_config['workspace_dir'] = click.prompt(
|
||||
'Workspace directory',
|
||||
default=initial_config['workspace_dir']
|
||||
)
|
||||
initial_config['tests_dir'] = click.prompt(
|
||||
'Tests directory',
|
||||
default=initial_config['tests_dir']
|
||||
)
|
||||
|
||||
# Initialize the project
|
||||
result = config_manager.initialize_project_config(target_dir, interactive=False)
|
||||
|
||||
# Update with interactive values if provided
|
||||
if interactive:
|
||||
config_manager._save_config_file(initial_config, config_file)
|
||||
result['config'] = initial_config
|
||||
|
||||
click.echo("✅ MarkiTect project initialized successfully!")
|
||||
click.echo(f"📄 Configuration file: {result['config_file']}")
|
||||
click.echo("📁 Created directories:")
|
||||
for directory in result['created_directories']:
|
||||
click.echo(f" • {directory}")
|
||||
|
||||
# Show validation results
|
||||
validation_results = config_manager.validate_configuration(result['config'])
|
||||
warnings = [r for r in validation_results if r['status'] == 'warning']
|
||||
errors = [r for r in validation_results if r['status'] == 'error']
|
||||
|
||||
if warnings:
|
||||
click.echo("⚠️ Configuration warnings:")
|
||||
for warning in warnings:
|
||||
click.echo(f" • {warning['message']}")
|
||||
|
||||
if errors:
|
||||
click.echo("❌ Configuration errors:")
|
||||
for error in errors:
|
||||
click.echo(f" • {error['message']}")
|
||||
else:
|
||||
click.echo("🎉 Configuration validation passed!")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Failed to initialize configuration: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='config-validate')
|
||||
@click.option('--verbose', '-v', is_flag=True, help='Show detailed validation information')
|
||||
@pass_config
|
||||
def config_validate(config, verbose):
|
||||
"""Validate current configuration.
|
||||
|
||||
Checks the current configuration for common issues, missing required fields,
|
||||
and validates paths and URLs. Provides suggestions for fixing any problems.
|
||||
|
||||
Examples:
|
||||
markitect config-validate
|
||||
markitect config-validate --verbose
|
||||
"""
|
||||
try:
|
||||
config_manager = ConfigurationManager()
|
||||
validation_results = config_manager.validate_configuration()
|
||||
|
||||
# Categorize results
|
||||
errors = [r for r in validation_results if r['status'] == 'error']
|
||||
warnings = [r for r in validation_results if r['status'] == 'warning']
|
||||
ok_results = [r for r in validation_results if r['status'] == 'ok']
|
||||
|
||||
# Display summary
|
||||
click.echo(f"📊 Configuration Validation Summary:")
|
||||
click.echo(f" ✅ OK: {len(ok_results)}")
|
||||
click.echo(f" ⚠️ Warnings: {len(warnings)}")
|
||||
click.echo(f" ❌ Errors: {len(errors)}")
|
||||
click.echo()
|
||||
|
||||
# Show errors
|
||||
if errors:
|
||||
click.echo("❌ Configuration Errors:")
|
||||
for error in errors:
|
||||
click.echo(f" • {error['key']}: {error['message']}")
|
||||
click.echo()
|
||||
|
||||
# Show warnings
|
||||
if warnings:
|
||||
click.echo("⚠️ Configuration Warnings:")
|
||||
for warning in warnings:
|
||||
click.echo(f" • {warning['key']}: {warning['message']}")
|
||||
click.echo()
|
||||
|
||||
# Show OK results in verbose mode
|
||||
if verbose and ok_results:
|
||||
click.echo("✅ Valid Configuration:")
|
||||
for ok_result in ok_results:
|
||||
click.echo(f" • {ok_result['key']}: {ok_result['message']}")
|
||||
click.echo()
|
||||
|
||||
# Overall status
|
||||
if errors:
|
||||
click.echo("❌ Configuration validation failed")
|
||||
sys.exit(1)
|
||||
elif warnings:
|
||||
click.echo("⚠️ Configuration validation passed with warnings")
|
||||
else:
|
||||
click.echo("✅ Configuration validation passed")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Configuration validation failed: {e}", err=True)
|
||||
if config.get('verbose'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@cli.command(name='config-help')
|
||||
@click.argument('key', required=False)
|
||||
@pass_config
|
||||
def config_help(config, key):
|
||||
"""Get help information for configuration keys.
|
||||
|
||||
Provides detailed information about available configuration options,
|
||||
their purposes, and example values.
|
||||
|
||||
Examples:
|
||||
markitect config-help
|
||||
markitect config-help gitea_url
|
||||
markitect config-help api_token
|
||||
"""
|
||||
try:
|
||||
config_manager = ConfigurationManager()
|
||||
help_text = config_manager.get_config_help(key)
|
||||
click.echo(help_text)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Failed to get configuration help: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
# Register issue management commands
|
||||
cli.add_command(issues_group)
|
||||
|
||||
|
||||
490
markitect/config_manager.py
Normal file
490
markitect/config_manager.py
Normal file
@@ -0,0 +1,490 @@
|
||||
"""
|
||||
Configuration and Environment Management - Issue #18
|
||||
|
||||
This module provides comprehensive configuration management capabilities for MarkiTect,
|
||||
allowing users to manage configuration and environment settings through CLI commands.
|
||||
|
||||
Features:
|
||||
- Display current configuration with optional sensitive data masking
|
||||
- Set configuration values with validation
|
||||
- Initialize configuration for new projects
|
||||
- Configuration validation and help
|
||||
- Integration with existing configuration system
|
||||
- Environment variable management
|
||||
- Project-specific configuration support
|
||||
|
||||
Commands implemented:
|
||||
- config-show: Display current configuration
|
||||
- config-set: Set configuration values
|
||||
- config-init: Initialize configuration for new project
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional, List, Union, Tuple
|
||||
from dataclasses import asdict, fields
|
||||
from copy import deepcopy
|
||||
|
||||
import click
|
||||
|
||||
|
||||
class ConfigurationManager:
|
||||
"""Core configuration management functionality."""
|
||||
|
||||
def __init__(self):
|
||||
self.config_file_names = [
|
||||
'.markitect.yml',
|
||||
'.markitect.yaml',
|
||||
'.markitect.json',
|
||||
'markitect.config.yml',
|
||||
'markitect.config.yaml',
|
||||
'markitect.config.json'
|
||||
]
|
||||
self.sensitive_keys = {
|
||||
'api_token', 'token', 'password', 'secret', 'key',
|
||||
'gitea_token', 'github_token', 'auth_token'
|
||||
}
|
||||
|
||||
def get_current_config(self) -> Dict[str, Any]:
|
||||
"""Get current configuration from all sources."""
|
||||
config = {}
|
||||
|
||||
# Load from existing configuration system
|
||||
try:
|
||||
from config.manager import MarkitectConfig
|
||||
markitect_config = MarkitectConfig()
|
||||
config = asdict(markitect_config)
|
||||
|
||||
# Convert Path objects to strings for JSON serialization
|
||||
for key, value in config.items():
|
||||
if isinstance(value, Path):
|
||||
config[key] = str(value)
|
||||
|
||||
except (ImportError, Exception):
|
||||
# Fallback to basic configuration
|
||||
config = {
|
||||
'gitea_url': os.getenv('MARKITECT_GITEA_URL', 'http://localhost:3000'),
|
||||
'repo_owner': os.getenv('MARKITECT_REPO_OWNER', ''),
|
||||
'repo_name': os.getenv('MARKITECT_REPO_NAME', ''),
|
||||
'api_token': os.getenv('MARKITECT_API_TOKEN', ''),
|
||||
'workspace_dir': os.getenv('MARKITECT_WORKSPACE_DIR', '.markitect_workspace'),
|
||||
'database_path': os.getenv('MARKITECT_DATABASE_PATH', str(Path.home() / '.markitect' / 'markitect.db')),
|
||||
'cache_dir': os.getenv('MARKITECT_CACHE_DIR', '.ast_cache'),
|
||||
'tests_dir': os.getenv('MARKITECT_TESTS_DIR', 'tests'),
|
||||
'test_file_pattern': os.getenv('MARKITECT_TEST_FILE_PATTERN', 'test_issue_{issue_num}_{scenario}.py'),
|
||||
'claude_code_command': os.getenv('MARKITECT_CLAUDE_CODE_COMMAND', 'claude')
|
||||
}
|
||||
|
||||
# Add metadata about configuration sources
|
||||
config['_meta'] = {
|
||||
'config_sources': self._get_config_sources(),
|
||||
'env_variables': self._get_relevant_env_vars(),
|
||||
'working_directory': str(Path.cwd()),
|
||||
'config_file_locations': self._get_config_file_locations()
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
def _get_config_sources(self) -> List[str]:
|
||||
"""Get list of configuration sources in order of precedence."""
|
||||
sources = []
|
||||
|
||||
# Check for config files
|
||||
for config_name in self.config_file_names:
|
||||
if Path(config_name).exists():
|
||||
sources.append(f"File: {config_name}")
|
||||
|
||||
# Check for environment variables
|
||||
env_vars = [key for key in os.environ.keys() if key.startswith('MARKITECT_')]
|
||||
if env_vars:
|
||||
sources.append(f"Environment variables: {len(env_vars)} found")
|
||||
|
||||
# Always include defaults
|
||||
sources.append("Built-in defaults")
|
||||
|
||||
return sources
|
||||
|
||||
def _get_relevant_env_vars(self) -> Dict[str, str]:
|
||||
"""Get MARKITECT-related environment variables."""
|
||||
env_vars = {}
|
||||
for key, value in os.environ.items():
|
||||
if key.startswith('MARKITECT_'):
|
||||
env_vars[key] = value
|
||||
return env_vars
|
||||
|
||||
def _get_config_file_locations(self) -> Dict[str, bool]:
|
||||
"""Get status of potential config file locations."""
|
||||
locations = {}
|
||||
|
||||
# Current directory
|
||||
for config_name in self.config_file_names:
|
||||
current_dir_path = Path.cwd() / config_name
|
||||
locations[str(current_dir_path)] = current_dir_path.exists()
|
||||
|
||||
# Home directory
|
||||
home_config = Path.home() / '.markitect' / 'config.yml'
|
||||
locations[str(home_config)] = home_config.exists()
|
||||
|
||||
return locations
|
||||
|
||||
def display_config(self, show_sensitive: bool = False, output_format: str = 'yaml') -> str:
|
||||
"""Display current configuration in specified format."""
|
||||
config = self.get_current_config()
|
||||
|
||||
if not show_sensitive:
|
||||
config = self._mask_sensitive_data(config)
|
||||
|
||||
if output_format == 'json':
|
||||
return json.dumps(config, indent=2, default=str)
|
||||
elif output_format == 'yaml':
|
||||
return yaml.dump(config, default_flow_style=False, sort_keys=True)
|
||||
else:
|
||||
# Simple format
|
||||
return self._format_simple(config)
|
||||
|
||||
def _mask_sensitive_data(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Mask sensitive configuration values."""
|
||||
masked_config = deepcopy(config)
|
||||
|
||||
def mask_recursive(obj):
|
||||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
if any(sensitive in key.lower() for sensitive in self.sensitive_keys):
|
||||
if value and str(value).strip():
|
||||
obj[key] = '***MASKED***'
|
||||
else:
|
||||
mask_recursive(value)
|
||||
elif isinstance(obj, list):
|
||||
for item in obj:
|
||||
mask_recursive(item)
|
||||
|
||||
mask_recursive(masked_config)
|
||||
return masked_config
|
||||
|
||||
def _format_simple(self, config: Dict[str, Any]) -> str:
|
||||
"""Format configuration in simple key=value style."""
|
||||
lines = []
|
||||
|
||||
def format_recursive(obj, prefix=''):
|
||||
if isinstance(obj, dict):
|
||||
for key, value in sorted(obj.items()):
|
||||
full_key = f"{prefix}{key}" if prefix else key
|
||||
if isinstance(value, (dict, list)):
|
||||
if key != '_meta': # Skip meta in simple format
|
||||
format_recursive(value, f"{full_key}.")
|
||||
else:
|
||||
lines.append(f"{full_key} = {value}")
|
||||
elif isinstance(obj, list):
|
||||
for i, item in enumerate(obj):
|
||||
format_recursive(item, f"{prefix}{i}.")
|
||||
|
||||
format_recursive(config)
|
||||
return '\n'.join(lines)
|
||||
|
||||
def set_config_value(self, key: str, value: str, config_file: Optional[str] = None) -> bool:
|
||||
"""Set a configuration value and persist it."""
|
||||
# Determine target config file
|
||||
target_file = self._get_target_config_file(config_file)
|
||||
|
||||
# Load existing config from file
|
||||
existing_config = self._load_config_file(target_file) if target_file.exists() else {}
|
||||
|
||||
# Set the value (support nested keys with dot notation)
|
||||
self._set_nested_value(existing_config, key, value)
|
||||
|
||||
# Validate the new configuration
|
||||
validation_errors = self._validate_config_value(key, value)
|
||||
if validation_errors:
|
||||
raise ValueError(f"Configuration validation failed: {'; '.join(validation_errors)}")
|
||||
|
||||
# Save back to file
|
||||
self._save_config_file(existing_config, target_file)
|
||||
|
||||
return True
|
||||
|
||||
def _get_target_config_file(self, config_file: Optional[str] = None) -> Path:
|
||||
"""Determine target configuration file."""
|
||||
if config_file:
|
||||
return Path(config_file)
|
||||
|
||||
# Look for existing config file
|
||||
for config_name in self.config_file_names:
|
||||
config_path = Path(config_name)
|
||||
if config_path.exists():
|
||||
return config_path
|
||||
|
||||
# Default to .markitect.yml in current directory
|
||||
return Path('.markitect.yml')
|
||||
|
||||
def _load_config_file(self, config_file: Path) -> Dict[str, Any]:
|
||||
"""Load configuration from file."""
|
||||
try:
|
||||
content = config_file.read_text()
|
||||
|
||||
if config_file.suffix in ['.yml', '.yaml']:
|
||||
return yaml.safe_load(content) or {}
|
||||
elif config_file.suffix == '.json':
|
||||
return json.loads(content)
|
||||
else:
|
||||
# Try YAML first, then JSON
|
||||
try:
|
||||
return yaml.safe_load(content) or {}
|
||||
except yaml.YAMLError:
|
||||
return json.loads(content)
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to load config file {config_file}: {e}")
|
||||
|
||||
def _save_config_file(self, config: Dict[str, Any], config_file: Path) -> None:
|
||||
"""Save configuration to file."""
|
||||
try:
|
||||
# Ensure directory exists
|
||||
config_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if config_file.suffix in ['.yml', '.yaml']:
|
||||
content = yaml.dump(config, default_flow_style=False, sort_keys=True)
|
||||
elif config_file.suffix == '.json':
|
||||
content = json.dumps(config, indent=2, sort_keys=True)
|
||||
else:
|
||||
# Default to YAML
|
||||
content = yaml.dump(config, default_flow_style=False, sort_keys=True)
|
||||
|
||||
config_file.write_text(content)
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to save config file {config_file}: {e}")
|
||||
|
||||
def _set_nested_value(self, config: Dict[str, Any], key: str, value: str) -> None:
|
||||
"""Set a nested configuration value using dot notation."""
|
||||
keys = key.split('.')
|
||||
current = config
|
||||
|
||||
# Navigate to the parent of the target key
|
||||
for k in keys[:-1]:
|
||||
if k not in current:
|
||||
current[k] = {}
|
||||
current = current[k]
|
||||
|
||||
# Set the final value
|
||||
final_key = keys[-1]
|
||||
|
||||
# Try to convert value to appropriate type
|
||||
converted_value = self._convert_value(value)
|
||||
current[final_key] = converted_value
|
||||
|
||||
def _convert_value(self, value: str) -> Any:
|
||||
"""Convert string value to appropriate type."""
|
||||
# Handle boolean values
|
||||
if value.lower() in ('true', 'yes', 'on', '1'):
|
||||
return True
|
||||
elif value.lower() in ('false', 'no', 'off', '0'):
|
||||
return False
|
||||
|
||||
# Try to convert to number
|
||||
try:
|
||||
if '.' in value:
|
||||
return float(value)
|
||||
else:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Return as string
|
||||
return value
|
||||
|
||||
def _validate_config_value(self, key: str, value: str) -> List[str]:
|
||||
"""Validate a configuration value."""
|
||||
errors = []
|
||||
|
||||
# Path validation
|
||||
if any(path_key in key.lower() for path_key in ['dir', 'path', 'file']):
|
||||
if value and not self._is_valid_path(value):
|
||||
errors.append(f"'{value}' is not a valid path format")
|
||||
|
||||
# URL validation
|
||||
if 'url' in key.lower():
|
||||
if value and not self._is_valid_url(value):
|
||||
errors.append(f"'{value}' is not a valid URL format")
|
||||
|
||||
# Required field validation for setting values (not for display)
|
||||
# We only enforce required fields when explicitly setting them
|
||||
if key == 'gitea_url' and not value.strip():
|
||||
errors.append(f"'{key}' is required and cannot be empty")
|
||||
elif key == 'database_path' and not value.strip():
|
||||
errors.append(f"'{key}' is required and cannot be empty")
|
||||
|
||||
return errors
|
||||
|
||||
def _is_valid_path(self, path_str: str) -> bool:
|
||||
"""Check if a string represents a valid path."""
|
||||
try:
|
||||
Path(path_str)
|
||||
return True
|
||||
except (ValueError, OSError):
|
||||
return False
|
||||
|
||||
def _is_valid_url(self, url_str: str) -> bool:
|
||||
"""Check if a string represents a valid URL."""
|
||||
import re
|
||||
url_pattern = re.compile(
|
||||
r'^https?://' # http:// or https://
|
||||
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
|
||||
r'localhost|' # localhost...
|
||||
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
|
||||
r'(?::\d+)?' # optional port
|
||||
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
|
||||
return url_pattern.match(url_str) is not None
|
||||
|
||||
def initialize_project_config(self, project_dir: Optional[Path] = None, interactive: bool = True) -> Dict[str, Any]:
|
||||
"""Initialize configuration for a new project."""
|
||||
target_dir = project_dir or Path.cwd()
|
||||
config_file = target_dir / '.markitect.yml'
|
||||
|
||||
# Default configuration template
|
||||
default_config = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_owner': '',
|
||||
'repo_name': target_dir.name,
|
||||
'workspace_dir': '.markitect_workspace',
|
||||
'cache_dir': '.ast_cache',
|
||||
'tests_dir': 'tests',
|
||||
'test_file_pattern': 'test_issue_{issue_num}_{scenario}.py',
|
||||
'claude_code_command': 'claude'
|
||||
}
|
||||
|
||||
if interactive:
|
||||
default_config = self._interactive_config_setup(default_config)
|
||||
|
||||
# Create necessary directories
|
||||
workspace_dir = target_dir / default_config['workspace_dir']
|
||||
cache_dir = target_dir / default_config['cache_dir']
|
||||
tests_dir = target_dir / default_config['tests_dir']
|
||||
|
||||
for directory in [workspace_dir, cache_dir, tests_dir]:
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save configuration
|
||||
self._save_config_file(default_config, config_file)
|
||||
|
||||
return {
|
||||
'config_file': str(config_file),
|
||||
'config': default_config,
|
||||
'created_directories': [str(workspace_dir), str(cache_dir), str(tests_dir)]
|
||||
}
|
||||
|
||||
def _interactive_config_setup(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Interactive configuration setup (would be used with CLI)."""
|
||||
# This method would be called by CLI with click.prompt()
|
||||
# For now, return the default config
|
||||
# The CLI integration will handle the interactive prompts
|
||||
return config
|
||||
|
||||
def list_config_keys(self) -> List[Tuple[str, str, Any]]:
|
||||
"""List all available configuration keys with descriptions."""
|
||||
config_schema = [
|
||||
('gitea_url', 'Gitea server URL', 'http://localhost:3000'),
|
||||
('repo_owner', 'Repository owner/organization', ''),
|
||||
('repo_name', 'Repository name', ''),
|
||||
('api_token', 'API token for authentication', ''),
|
||||
('workspace_dir', 'Workspace directory path', '.markitect_workspace'),
|
||||
('database_path', 'Database file path', '~/.markitect/markitect.db'),
|
||||
('cache_dir', 'AST cache directory', '.ast_cache'),
|
||||
('tests_dir', 'Tests directory', 'tests'),
|
||||
('test_file_pattern', 'Test file naming pattern', 'test_issue_{issue_num}_{scenario}.py'),
|
||||
('claude_code_command', 'Claude Code command', 'claude'),
|
||||
]
|
||||
|
||||
return config_schema
|
||||
|
||||
def get_config_help(self, key: Optional[str] = None) -> str:
|
||||
"""Get help information for configuration."""
|
||||
if key:
|
||||
# Get help for specific key
|
||||
for config_key, description, default in self.list_config_keys():
|
||||
if config_key == key:
|
||||
return f"{config_key}: {description} (default: {default})"
|
||||
return f"Unknown configuration key: {key}"
|
||||
else:
|
||||
# Get general help
|
||||
lines = ["Available configuration keys:"]
|
||||
for config_key, description, default in self.list_config_keys():
|
||||
lines.append(f" {config_key:<20} {description}")
|
||||
return '\n'.join(lines)
|
||||
|
||||
def validate_configuration(self, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
|
||||
"""Validate current or provided configuration."""
|
||||
if config is None:
|
||||
config = self.get_current_config()
|
||||
|
||||
validation_results = []
|
||||
|
||||
# Check required fields
|
||||
required_fields = ['gitea_url', 'database_path']
|
||||
for field in required_fields:
|
||||
if field not in config or not config[field]:
|
||||
validation_results.append({
|
||||
'key': field,
|
||||
'status': 'error',
|
||||
'message': f'Required field {field} is missing or empty'
|
||||
})
|
||||
else:
|
||||
validation_results.append({
|
||||
'key': field,
|
||||
'status': 'ok',
|
||||
'message': f'{field} is configured'
|
||||
})
|
||||
|
||||
# Check paths exist or are creatable
|
||||
path_fields = ['workspace_dir', 'cache_dir', 'tests_dir']
|
||||
for field in path_fields:
|
||||
if field in config:
|
||||
path = Path(config[field])
|
||||
if path.exists():
|
||||
validation_results.append({
|
||||
'key': field,
|
||||
'status': 'ok',
|
||||
'message': f'{field} exists at {path}'
|
||||
})
|
||||
else:
|
||||
try:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
validation_results.append({
|
||||
'key': field,
|
||||
'status': 'warning',
|
||||
'message': f'{field} created at {path}'
|
||||
})
|
||||
except OSError as e:
|
||||
validation_results.append({
|
||||
'key': field,
|
||||
'status': 'error',
|
||||
'message': f'{field} cannot be created: {e}'
|
||||
})
|
||||
|
||||
# Check database path parent directory
|
||||
if 'database_path' in config:
|
||||
db_path = Path(config['database_path'])
|
||||
db_parent = db_path.parent
|
||||
if not db_parent.exists():
|
||||
try:
|
||||
db_parent.mkdir(parents=True, exist_ok=True)
|
||||
validation_results.append({
|
||||
'key': 'database_path',
|
||||
'status': 'warning',
|
||||
'message': f'Database directory created at {db_parent}'
|
||||
})
|
||||
except OSError as e:
|
||||
validation_results.append({
|
||||
'key': 'database_path',
|
||||
'status': 'error',
|
||||
'message': f'Cannot create database directory: {e}'
|
||||
})
|
||||
else:
|
||||
validation_results.append({
|
||||
'key': 'database_path',
|
||||
'status': 'ok',
|
||||
'message': f'Database directory exists at {db_parent}'
|
||||
})
|
||||
|
||||
return validation_results
|
||||
@@ -3,11 +3,44 @@ Data-driven Draft Generator for Issue #56: Generate multiple drafts from data so
|
||||
|
||||
This module provides functionality to create multiple markdown documents from JSON schemas
|
||||
and data sources (JSON, CSV) with field mapping support.
|
||||
|
||||
Examples:
|
||||
Basic usage with JSON data:
|
||||
>>> generator = DraftGenerator()
|
||||
>>> schema = {...} # JSON schema with field mappings
|
||||
>>> data = [{"name": "John", "role": "Developer"}]
|
||||
>>> files = generator.generate_drafts_from_data_source(
|
||||
... schema, data, Path("./output")
|
||||
... )
|
||||
|
||||
Using with CSV file:
|
||||
>>> files = generator.generate_drafts_from_data_source(
|
||||
... schema, Path("data.csv"), Path("./output")
|
||||
... )
|
||||
|
||||
Field mapping is configured in the schema using x-markitect-field-mapping extension:
|
||||
{
|
||||
"properties": {
|
||||
"headings": {
|
||||
"properties": {
|
||||
"level_1": {
|
||||
"x-markitect-field-mapping": {"const": "name"}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Architecture:
|
||||
The DraftGenerator extends the existing StubGenerator to add data-driven
|
||||
capabilities. It processes data sources, validates compatibility with schemas,
|
||||
and generates multiple document drafts with populated content.
|
||||
"""
|
||||
|
||||
import json
|
||||
import csv
|
||||
import io
|
||||
import copy
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List, Optional, Union
|
||||
from .stub_generator import StubGenerator
|
||||
@@ -163,7 +196,6 @@ class DraftGenerator:
|
||||
def _apply_field_mapping(self, schema: Dict[str, Any], record: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Apply field mapping to populate schema content areas with data."""
|
||||
# Create a deep copy of the schema
|
||||
import copy
|
||||
populated_schema = copy.deepcopy(schema)
|
||||
|
||||
# Apply title mapping if exists
|
||||
@@ -175,15 +207,26 @@ class DraftGenerator:
|
||||
def _generate_filename(self, record: Dict[str, Any], index: int) -> str:
|
||||
"""Generate appropriate filename for the draft."""
|
||||
# Try to use common identifying fields
|
||||
for field in ['name', 'title', 'id']:
|
||||
identifier_fields = ['name', 'title', 'id']
|
||||
|
||||
for field in identifier_fields:
|
||||
if field in record and record[field]:
|
||||
# Sanitize filename
|
||||
name = str(record[field]).replace(' ', '_').replace('/', '_')
|
||||
name = self._sanitize_filename(str(record[field]))
|
||||
return f"{name}.md"
|
||||
|
||||
# Fall back to index-based naming
|
||||
return f"draft_{index + 1:03d}.md"
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""Sanitize a string to be safe for use as a filename."""
|
||||
# Replace problematic characters with underscores
|
||||
unsafe_chars = [' ', '/', '\\', ':', '*', '?', '"', '<', '>', '|']
|
||||
sanitized = filename
|
||||
for char in unsafe_chars:
|
||||
sanitized = sanitized.replace(char, '_')
|
||||
return sanitized
|
||||
|
||||
def _generate_draft_content(self, schema: Dict[str, Any], record: Dict[str, Any], schema_file_path: Optional[str] = None) -> str:
|
||||
"""Generate the actual draft content from populated schema."""
|
||||
# Use the existing stub generator as the base
|
||||
@@ -194,20 +237,43 @@ class DraftGenerator:
|
||||
)
|
||||
|
||||
# Add data-driven enhancements - replace placeholders with actual data
|
||||
content = self._apply_data_replacements(content, record)
|
||||
|
||||
return content
|
||||
|
||||
def _apply_data_replacements(self, content: str, record: Dict[str, Any]) -> str:
|
||||
"""Apply data replacements to content using various replacement strategies."""
|
||||
for field_name, field_value in record.items():
|
||||
# Simple replacement strategy for testing
|
||||
placeholder_pattern = f"TODO: Add content for {field_name}"
|
||||
if placeholder_pattern in content:
|
||||
content = content.replace(placeholder_pattern, str(field_value))
|
||||
content = self._apply_field_replacements(content, field_name, str(field_value))
|
||||
|
||||
# Replace template variables in content instructions (e.g., {role} -> Software Engineer)
|
||||
template_pattern = f"{{{field_name}}}"
|
||||
if template_pattern in content:
|
||||
content = content.replace(template_pattern, str(field_value))
|
||||
return content
|
||||
|
||||
# Also try to replace role-specific content
|
||||
if field_name == 'role':
|
||||
content = content.replace("TODO: Add content for introduction section.", f"Role: {field_value}")
|
||||
content = content.replace("TODO: Add content for section_level_2 section.", f"Department information and role details for {field_value}")
|
||||
def _apply_field_replacements(self, content: str, field_name: str, field_value: str) -> str:
|
||||
"""Apply all replacement patterns for a specific field."""
|
||||
# Simple placeholder replacement
|
||||
placeholder_pattern = f"TODO: Add content for {field_name}"
|
||||
if placeholder_pattern in content:
|
||||
content = content.replace(placeholder_pattern, field_value)
|
||||
|
||||
# Template variable replacement (e.g., {role} -> Software Engineer)
|
||||
template_pattern = f"{{{field_name}}}"
|
||||
if template_pattern in content:
|
||||
content = content.replace(template_pattern, field_value)
|
||||
|
||||
# Role-specific content replacement (can be extended for other field types)
|
||||
if field_name == 'role':
|
||||
content = self._apply_role_specific_replacements(content, field_value)
|
||||
|
||||
return content
|
||||
|
||||
def _apply_role_specific_replacements(self, content: str, role_value: str) -> str:
|
||||
"""Apply role-specific content replacements."""
|
||||
replacements = {
|
||||
"TODO: Add content for introduction section.": f"Role: {role_value}",
|
||||
"TODO: Add content for section_level_2 section.": f"Department information and role details for {role_value}"
|
||||
}
|
||||
|
||||
for old_text, new_text in replacements.items():
|
||||
content = content.replace(old_text, new_text)
|
||||
|
||||
return content
|
||||
653
tests/test_issue_17_batch_processing.py
Normal file
653
tests/test_issue_17_batch_processing.py
Normal file
@@ -0,0 +1,653 @@
|
||||
"""
|
||||
Tests for Issue #17: Batch Processing and Recursive Operations
|
||||
|
||||
This test suite verifies the batch processing functionality including:
|
||||
- Directory processing with recursive support
|
||||
- Glob pattern matching for file selection
|
||||
- Progress tracking and error handling
|
||||
- Depth control for recursive operations
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from click.testing import CliRunner
|
||||
|
||||
from markitect.batch_processor import (
|
||||
BatchProcessor, ProcessingMode, ErrorHandling,
|
||||
ProcessingResult, BatchResult, ProgressTracker,
|
||||
create_file_processor
|
||||
)
|
||||
from markitect.cli import cli
|
||||
|
||||
|
||||
class TestBatchProcessor:
|
||||
"""Test the core BatchProcessor functionality."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def create_test_files(self, structure):
|
||||
"""Create test file structure from dict."""
|
||||
for path, content in structure.items():
|
||||
file_path = self.test_dir / path
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_path.write_text(content)
|
||||
|
||||
def test_find_markdown_files_non_recursive(self):
|
||||
"""Test finding markdown files without recursion."""
|
||||
# Create test structure
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.md': '# Test 2',
|
||||
'file3.txt': 'Not markdown',
|
||||
'subdir/file4.md': '# Test 4'
|
||||
})
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=False)
|
||||
|
||||
# Should find only files in root directory
|
||||
assert len(files) == 2
|
||||
file_names = [f.name for f in files]
|
||||
assert 'file1.md' in file_names
|
||||
assert 'file2.md' in file_names
|
||||
assert 'file4.md' not in file_names
|
||||
|
||||
def test_find_markdown_files_recursive(self):
|
||||
"""Test finding markdown files with recursion."""
|
||||
# Create test structure
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'subdir/file2.md': '# Test 2',
|
||||
'subdir/nested/file3.md': '# Test 3',
|
||||
'subdir/file4.txt': 'Not markdown'
|
||||
})
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=True)
|
||||
|
||||
# Should find all markdown files
|
||||
assert len(files) == 3
|
||||
file_names = [f.name for f in files]
|
||||
assert 'file1.md' in file_names
|
||||
assert 'file2.md' in file_names
|
||||
assert 'file3.md' in file_names
|
||||
|
||||
def test_find_markdown_files_with_depth_limit(self):
|
||||
"""Test recursive search with depth limit."""
|
||||
# Create test structure
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'level1/file2.md': '# Test 2',
|
||||
'level1/level2/file3.md': '# Test 3',
|
||||
'level1/level2/level3/file4.md': '# Test 4'
|
||||
})
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=1)
|
||||
|
||||
# Should find files up to depth 1
|
||||
assert len(files) == 2
|
||||
file_names = [f.name for f in files]
|
||||
assert 'file1.md' in file_names
|
||||
assert 'file2.md' in file_names
|
||||
assert 'file3.md' not in file_names
|
||||
assert 'file4.md' not in file_names
|
||||
|
||||
def test_find_markdown_files_with_pattern(self):
|
||||
"""Test finding files with custom pattern."""
|
||||
# Create test structure
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.markdown': '# Test 2',
|
||||
'file3.txt': 'Not markdown'
|
||||
})
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, pattern='*.markdown')
|
||||
|
||||
# Should find only .markdown files
|
||||
assert len(files) == 1
|
||||
assert files[0].name == 'file2.markdown'
|
||||
|
||||
def test_find_files_by_glob(self):
|
||||
"""Test glob pattern file finding."""
|
||||
# Create test structure
|
||||
self.create_test_files({
|
||||
'docs/file1.md': '# Test 1',
|
||||
'docs/subdir/file2.md': '# Test 2',
|
||||
'src/file3.md': '# Test 3',
|
||||
'file4.txt': 'Not markdown'
|
||||
})
|
||||
|
||||
processor = BatchProcessor()
|
||||
|
||||
# Test recursive glob
|
||||
files = processor.find_files_by_glob(str(self.test_dir / "**/*.md"))
|
||||
assert len(files) == 3
|
||||
|
||||
# Test specific directory glob
|
||||
files = processor.find_files_by_glob(str(self.test_dir / "docs/*.md"))
|
||||
assert len(files) == 1
|
||||
assert files[0].name == 'file1.md'
|
||||
|
||||
def test_process_files_success(self):
|
||||
"""Test successful file processing."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.md': '# Test 2'
|
||||
})
|
||||
|
||||
processor = BatchProcessor(show_progress=False)
|
||||
files = list(self.test_dir.glob('*.md'))
|
||||
|
||||
def mock_processor(file_path):
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Processed successfully"
|
||||
)
|
||||
|
||||
result = processor.process_files(files, mock_processor, "Testing")
|
||||
|
||||
assert result.total_files == 2
|
||||
assert result.processed == 2
|
||||
assert result.succeeded == 2
|
||||
assert result.failed == 0
|
||||
assert result.skipped == 0
|
||||
|
||||
def test_process_files_with_errors(self):
|
||||
"""Test file processing with errors."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.md': '# Test 2',
|
||||
'file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
processor = BatchProcessor(show_progress=False, error_handling=ErrorHandling.CONTINUE)
|
||||
files = list(self.test_dir.glob('*.md'))
|
||||
|
||||
def mock_processor(file_path):
|
||||
# Fail on file2.md
|
||||
if file_path.name == 'file2.md':
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message="Processing failed",
|
||||
error="Mock error"
|
||||
)
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Processed successfully"
|
||||
)
|
||||
|
||||
result = processor.process_files(files, mock_processor, "Testing")
|
||||
|
||||
assert result.total_files == 3
|
||||
assert result.processed == 3
|
||||
assert result.succeeded == 2
|
||||
assert result.failed == 1
|
||||
assert len(result.errors) == 1
|
||||
|
||||
def test_process_files_stop_on_error(self):
|
||||
"""Test stop-on-error behavior."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.md': '# Test 2',
|
||||
'file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
processor = BatchProcessor(show_progress=False, error_handling=ErrorHandling.STOP)
|
||||
files = sorted(list(self.test_dir.glob('*.md')))
|
||||
|
||||
def mock_processor(file_path):
|
||||
# Fail on second file
|
||||
if file_path.name == 'file2.md':
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=False,
|
||||
message="Processing failed",
|
||||
error="Mock error"
|
||||
)
|
||||
return ProcessingResult(
|
||||
file_path=file_path,
|
||||
success=True,
|
||||
message="Processed successfully"
|
||||
)
|
||||
|
||||
result = processor.process_files(files, mock_processor, "Testing")
|
||||
|
||||
# Should stop after the error
|
||||
assert result.processed == 2 # file1 success, file2 error
|
||||
assert result.succeeded == 1
|
||||
assert result.failed == 1
|
||||
|
||||
|
||||
class TestProgressTracker:
|
||||
"""Test the ProgressTracker functionality."""
|
||||
|
||||
def test_progress_tracking(self):
|
||||
"""Test basic progress tracking."""
|
||||
tracker = ProgressTracker(total=3, show_progress=False)
|
||||
|
||||
# Test successful processing
|
||||
result1 = ProcessingResult(Path("file1.md"), True, "Success")
|
||||
tracker.update(result1)
|
||||
|
||||
assert tracker.processed == 1
|
||||
assert tracker.succeeded == 1
|
||||
assert tracker.failed == 0
|
||||
|
||||
# Test failed processing
|
||||
result2 = ProcessingResult(Path("file2.md"), False, "Failed", "Error message")
|
||||
tracker.update(result2)
|
||||
|
||||
assert tracker.processed == 2
|
||||
assert tracker.succeeded == 1
|
||||
assert tracker.failed == 1
|
||||
|
||||
# Test skipped file
|
||||
tracker.skip_file(Path("file3.md"), "Skipped reason")
|
||||
assert tracker.skipped == 1
|
||||
|
||||
|
||||
class TestFileProcessor:
|
||||
"""Test the file processor creation and execution."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_ingest_processor(self, mock_db_manager):
|
||||
"""Test file processor for ingestion."""
|
||||
# Create test file
|
||||
test_file = self.test_dir / "test.md"
|
||||
test_file.write_text("# Test content")
|
||||
|
||||
# Mock database manager
|
||||
mock_db = Mock()
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
config = {'database': 'test.db'}
|
||||
processor = create_file_processor(config, ProcessingMode.INGEST)
|
||||
|
||||
result = processor(test_file)
|
||||
|
||||
assert result.success
|
||||
assert result.file_path == test_file
|
||||
assert "Ingested successfully" in result.message
|
||||
mock_db.store_document.assert_called_once()
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_status_processor(self, mock_db_manager):
|
||||
"""Test file processor for status checking."""
|
||||
# Create test file
|
||||
test_file = self.test_dir / "test.md"
|
||||
test_file.write_text("# Test content")
|
||||
|
||||
# Mock database manager
|
||||
mock_db = Mock()
|
||||
mock_db.get_metadata.return_value = {'id': 'test123'}
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
config = {'database': 'test.db'}
|
||||
processor = create_file_processor(config, ProcessingMode.STATUS)
|
||||
|
||||
result = processor(test_file)
|
||||
|
||||
assert result.success
|
||||
assert result.file_path == test_file
|
||||
assert "Found in database" in result.message
|
||||
|
||||
def test_validate_processor(self):
|
||||
"""Test file processor for validation."""
|
||||
# Create test file
|
||||
test_file = self.test_dir / "test.md"
|
||||
test_file.write_text("# Test content")
|
||||
|
||||
config = {}
|
||||
processor = create_file_processor(config, ProcessingMode.VALIDATE)
|
||||
|
||||
result = processor(test_file)
|
||||
|
||||
assert result.success
|
||||
assert result.file_path == test_file
|
||||
assert "Valid markdown" in result.message
|
||||
|
||||
def test_validate_processor_empty_file(self):
|
||||
"""Test validation processor with empty file."""
|
||||
# Create empty file
|
||||
test_file = self.test_dir / "empty.md"
|
||||
test_file.write_text("")
|
||||
|
||||
config = {}
|
||||
processor = create_file_processor(config, ProcessingMode.VALIDATE)
|
||||
|
||||
result = processor(test_file)
|
||||
|
||||
assert not result.success
|
||||
assert "File is empty" in result.error
|
||||
|
||||
|
||||
class TestCLIIntegration:
|
||||
"""Test CLI command integration."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.runner = CliRunner()
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def create_test_files(self, structure):
|
||||
"""Create test file structure from dict."""
|
||||
for path, content in structure.items():
|
||||
file_path = self.test_dir / path
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
file_path.write_text(content)
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_ingest_dir_command(self, mock_db_manager):
|
||||
"""Test ingest-dir CLI command."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.md': '# Test 2',
|
||||
'subdir/file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
# Mock database
|
||||
mock_db = Mock()
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'ingest-dir', str(self.test_dir),
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Should process 2 files (non-recursive by default)
|
||||
assert mock_db.store_document.call_count == 2
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_ingest_dir_recursive(self, mock_db_manager):
|
||||
"""Test ingest-dir with recursive option."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'subdir/file2.md': '# Test 2',
|
||||
'subdir/nested/file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
# Mock database
|
||||
mock_db = Mock()
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'ingest-dir', str(self.test_dir),
|
||||
'--recursive',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Should process all 3 files
|
||||
assert mock_db.store_document.call_count == 3
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_batch_process_command(self, mock_db_manager):
|
||||
"""Test batch-process CLI command."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'docs/file1.md': '# Test 1',
|
||||
'docs/file2.md': '# Test 2',
|
||||
'src/file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
# Mock database
|
||||
mock_db = Mock()
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
# Test glob pattern
|
||||
pattern = str(self.test_dir / "docs/*.md")
|
||||
result = self.runner.invoke(cli, [
|
||||
'batch-process', pattern,
|
||||
'--operation', 'ingest',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Should process 2 files from docs directory
|
||||
assert mock_db.store_document.call_count == 2
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_recursive_command(self, mock_db_manager):
|
||||
"""Test recursive CLI command."""
|
||||
# Create test files
|
||||
self.create_test_files({
|
||||
'level1/file1.md': '# Test 1',
|
||||
'level1/level2/file2.md': '# Test 2',
|
||||
'level1/level2/level3/file3.md': '# Test 3'
|
||||
})
|
||||
|
||||
# Mock database
|
||||
mock_db = Mock()
|
||||
mock_db.get_metadata.side_effect = Exception("Not found")
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'recursive', str(self.test_dir),
|
||||
'--depth', '2',
|
||||
'--operation', 'status',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Should check status for files up to depth 2
|
||||
assert mock_db.get_metadata.call_count == 2
|
||||
|
||||
def test_error_handling_stop(self):
|
||||
"""Test error handling with stop strategy."""
|
||||
# Create test directory with no files
|
||||
result = self.runner.invoke(cli, [
|
||||
'ingest-dir', str(self.test_dir),
|
||||
'--error-handling', 'stop',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
# Should exit cleanly when no files found
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_invalid_directory(self):
|
||||
"""Test handling of invalid directory."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'ingest-dir', '/nonexistent/directory',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
# Should exit with error
|
||||
assert result.exit_code == 2 # Click argument validation error
|
||||
|
||||
@patch('markitect.database.DatabaseManager')
|
||||
def test_custom_pattern(self, mock_db_manager):
|
||||
"""Test custom file pattern matching."""
|
||||
# Create test files with different extensions
|
||||
self.create_test_files({
|
||||
'file1.md': '# Test 1',
|
||||
'file2.markdown': '# Test 2',
|
||||
'file3.txt': 'Not markdown'
|
||||
})
|
||||
|
||||
# Mock database
|
||||
mock_db = Mock()
|
||||
mock_db_manager.return_value = mock_db
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'ingest-dir', str(self.test_dir),
|
||||
'--pattern', '*.markdown',
|
||||
'--quiet'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Should process only .markdown files
|
||||
assert mock_db.store_document.call_count == 1
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test error handling scenarios."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_permission_error_handling(self):
|
||||
"""Test handling of permission errors."""
|
||||
processor = BatchProcessor(show_progress=False)
|
||||
|
||||
# Mock os.listdir to raise PermissionError
|
||||
with patch('pathlib.Path.iterdir') as mock_iterdir:
|
||||
mock_iterdir.side_effect = PermissionError("Permission denied")
|
||||
|
||||
files = processor.find_markdown_files(self.test_dir)
|
||||
# Should return empty list without crashing
|
||||
assert files == []
|
||||
|
||||
def test_nonexistent_directory(self):
|
||||
"""Test handling of nonexistent directories."""
|
||||
processor = BatchProcessor()
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
processor.find_markdown_files(Path("/nonexistent/directory"))
|
||||
|
||||
def test_file_as_directory(self):
|
||||
"""Test handling when a file is passed as directory."""
|
||||
# Create a file
|
||||
test_file = self.test_dir / "test.md"
|
||||
test_file.write_text("# Test")
|
||||
|
||||
processor = BatchProcessor()
|
||||
|
||||
with pytest.raises(NotADirectoryError):
|
||||
processor.find_markdown_files(test_file)
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and boundary conditions."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_empty_directory(self):
|
||||
"""Test processing empty directory."""
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir)
|
||||
assert files == []
|
||||
|
||||
def test_hidden_directories(self):
|
||||
"""Test that hidden directories are skipped."""
|
||||
# Create hidden directory
|
||||
hidden_dir = self.test_dir / ".hidden"
|
||||
hidden_dir.mkdir()
|
||||
(hidden_dir / "test.md").write_text("# Hidden")
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=True)
|
||||
|
||||
# Should not find files in hidden directories
|
||||
assert len(files) == 0
|
||||
|
||||
def test_depth_zero(self):
|
||||
"""Test depth=0 behavior."""
|
||||
# Create nested structure
|
||||
(self.test_dir / "file1.md").write_text("# Test 1")
|
||||
subdir = self.test_dir / "subdir"
|
||||
subdir.mkdir()
|
||||
(subdir / "file2.md").write_text("# Test 2")
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=0)
|
||||
|
||||
# Depth 0 should only include files in the starting directory
|
||||
# With our corrected logic, this should only find file1.md
|
||||
assert len(files) == 1
|
||||
assert files[0].name == "file1.md"
|
||||
|
||||
def test_very_deep_structure(self):
|
||||
"""Test with very deep directory structure."""
|
||||
# Create 10-level deep structure
|
||||
# Start with a file at the root level
|
||||
(self.test_dir / "file_root.md").write_text("# Root Test")
|
||||
|
||||
current_dir = self.test_dir
|
||||
for i in range(10):
|
||||
current_dir = current_dir / f"level{i}"
|
||||
current_dir.mkdir()
|
||||
(current_dir / f"file{i}.md").write_text(f"# Test {i}")
|
||||
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=5)
|
||||
|
||||
# Should find files up to depth 5
|
||||
# Root (depth 0) + levels 0-4 (depths 1-5) = 6 files
|
||||
assert len(files) == 6
|
||||
|
||||
def test_glob_with_no_matches(self):
|
||||
"""Test glob pattern with no matches."""
|
||||
processor = BatchProcessor()
|
||||
files = processor.find_files_by_glob(str(self.test_dir / "*.nonexistent"))
|
||||
assert files == []
|
||||
|
||||
def test_file_deleted_during_processing(self):
|
||||
"""Test handling file deletion during processing."""
|
||||
# Create test file
|
||||
test_file = self.test_dir / "test.md"
|
||||
test_file.write_text("# Test")
|
||||
|
||||
def mock_processor(file_path):
|
||||
# This test is actually checking the file existence in the process_files loop
|
||||
# not the processor function itself
|
||||
return ProcessingResult(file_path, True, "Processed")
|
||||
|
||||
processor = BatchProcessor(show_progress=False)
|
||||
files = [test_file]
|
||||
|
||||
# Delete the file after creating the file list but before processing
|
||||
test_file.unlink()
|
||||
|
||||
result = processor.process_files(files, mock_processor, "Testing")
|
||||
|
||||
# Should handle gracefully - file should be skipped
|
||||
assert result.skipped == 1
|
||||
assert result.processed == 0
|
||||
897
tests/test_issue_18_config_management.py
Normal file
897
tests/test_issue_18_config_management.py
Normal file
@@ -0,0 +1,897 @@
|
||||
"""
|
||||
Tests for Issue #18: Configuration and Environment Management CLI
|
||||
|
||||
This test suite verifies the configuration management functionality including:
|
||||
- Configuration display and management
|
||||
- Project initialization functionality
|
||||
- Configuration validation
|
||||
- Integration with existing config system
|
||||
- Environment variable handling
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import shutil
|
||||
import os
|
||||
import json
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from click.testing import CliRunner
|
||||
|
||||
from markitect.config_manager import ConfigurationManager
|
||||
from markitect.cli import cli
|
||||
|
||||
|
||||
class TestConfigurationManager:
|
||||
"""Test the core ConfigurationManager functionality."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.original_cwd = os.getcwd()
|
||||
os.chdir(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
os.chdir(self.original_cwd)
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_get_current_config(self):
|
||||
"""Test getting current configuration."""
|
||||
config_manager = ConfigurationManager()
|
||||
config = config_manager.get_current_config()
|
||||
|
||||
# Should have basic configuration keys
|
||||
expected_keys = [
|
||||
'gitea_url', 'repo_owner', 'repo_name', 'api_token',
|
||||
'workspace_dir', 'database_path', 'cache_dir', 'tests_dir'
|
||||
]
|
||||
|
||||
for key in expected_keys:
|
||||
assert key in config
|
||||
|
||||
# Should have metadata
|
||||
assert '_meta' in config
|
||||
assert 'config_sources' in config['_meta']
|
||||
assert 'env_variables' in config['_meta']
|
||||
assert 'working_directory' in config['_meta']
|
||||
|
||||
def test_display_config_yaml_format(self):
|
||||
"""Test configuration display in YAML format."""
|
||||
config_manager = ConfigurationManager()
|
||||
output = config_manager.display_config(output_format='yaml')
|
||||
|
||||
# Should be valid YAML
|
||||
parsed = yaml.safe_load(output)
|
||||
assert isinstance(parsed, dict)
|
||||
assert 'gitea_url' in parsed
|
||||
|
||||
def test_display_config_json_format(self):
|
||||
"""Test configuration display in JSON format."""
|
||||
config_manager = ConfigurationManager()
|
||||
output = config_manager.display_config(output_format='json')
|
||||
|
||||
# Should be valid JSON
|
||||
parsed = json.loads(output)
|
||||
assert isinstance(parsed, dict)
|
||||
assert 'gitea_url' in parsed
|
||||
|
||||
def test_display_config_simple_format(self):
|
||||
"""Test configuration display in simple format."""
|
||||
config_manager = ConfigurationManager()
|
||||
output = config_manager.display_config(output_format='simple')
|
||||
|
||||
# Should contain key=value pairs
|
||||
lines = output.split('\n')
|
||||
assert any('gitea_url =' in line for line in lines)
|
||||
assert any('repo_owner =' in line for line in lines)
|
||||
|
||||
def test_mask_sensitive_data(self):
|
||||
"""Test masking of sensitive configuration data."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Create config with sensitive data
|
||||
config = {
|
||||
'api_token': 'secret123',
|
||||
'password': 'mypassword',
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_name': 'test-repo'
|
||||
}
|
||||
|
||||
masked = config_manager._mask_sensitive_data(config)
|
||||
|
||||
# Sensitive fields should be masked
|
||||
assert masked['api_token'] == '***MASKED***'
|
||||
assert masked['password'] == '***MASKED***'
|
||||
|
||||
# Non-sensitive fields should remain
|
||||
assert masked['gitea_url'] == 'http://localhost:3000'
|
||||
assert masked['repo_name'] == 'test-repo'
|
||||
|
||||
def test_mask_sensitive_data_preserves_empty_values(self):
|
||||
"""Test that empty sensitive values are not masked."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
config = {
|
||||
'api_token': '',
|
||||
'secret': None,
|
||||
'repo_name': 'test-repo'
|
||||
}
|
||||
|
||||
masked = config_manager._mask_sensitive_data(config)
|
||||
|
||||
# Empty/None values should not be masked
|
||||
assert masked['api_token'] == ''
|
||||
assert masked['secret'] is None
|
||||
assert masked['repo_name'] == 'test-repo'
|
||||
|
||||
def test_set_config_value_simple(self):
|
||||
"""Test setting a simple configuration value."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
success = config_manager.set_config_value('repo_name', 'test-repository')
|
||||
assert success
|
||||
|
||||
# Verify file was created
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
assert config_file.exists()
|
||||
|
||||
# Verify content
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
assert content['repo_name'] == 'test-repository'
|
||||
|
||||
def test_set_config_value_nested(self):
|
||||
"""Test setting nested configuration value with dot notation."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
success = config_manager.set_config_value('gitea.url', 'http://example.com')
|
||||
assert success
|
||||
|
||||
# Verify nested structure
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
assert content['gitea']['url'] == 'http://example.com'
|
||||
|
||||
def test_set_config_value_type_conversion(self):
|
||||
"""Test automatic type conversion for configuration values."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Test boolean conversion
|
||||
config_manager.set_config_value('debug', 'true')
|
||||
config_manager.set_config_value('verbose', 'false')
|
||||
|
||||
# Test number conversion
|
||||
config_manager.set_config_value('port', '3000')
|
||||
config_manager.set_config_value('timeout', '30.5')
|
||||
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
|
||||
assert content['debug'] is True
|
||||
assert content['verbose'] is False
|
||||
assert content['port'] == 3000
|
||||
assert content['timeout'] == 30.5
|
||||
|
||||
def test_set_config_value_validation_error(self):
|
||||
"""Test configuration validation during value setting."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Test invalid URL
|
||||
with pytest.raises(ValueError, match="not a valid URL format"):
|
||||
config_manager.set_config_value('gitea_url', 'invalid-url')
|
||||
|
||||
def test_set_config_value_existing_file(self):
|
||||
"""Test setting values in existing configuration file."""
|
||||
# Create existing config file
|
||||
existing_config = {
|
||||
'repo_name': 'old-name',
|
||||
'gitea_url': 'http://localhost:3000'
|
||||
}
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
config_file.write_text(yaml.dump(existing_config))
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
success = config_manager.set_config_value('repo_name', 'new-name')
|
||||
assert success
|
||||
|
||||
# Verify update
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
assert content['repo_name'] == 'new-name'
|
||||
assert content['gitea_url'] == 'http://localhost:3000' # Should be preserved
|
||||
|
||||
def test_set_config_value_custom_file(self):
|
||||
"""Test setting values in custom configuration file."""
|
||||
custom_file = self.test_dir / 'custom.yml'
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
success = config_manager.set_config_value('repo_name', 'test', str(custom_file))
|
||||
assert success
|
||||
|
||||
assert custom_file.exists()
|
||||
content = yaml.safe_load(custom_file.read_text())
|
||||
assert content['repo_name'] == 'test'
|
||||
|
||||
def test_initialize_project_config(self):
|
||||
"""Test project configuration initialization."""
|
||||
config_manager = ConfigurationManager()
|
||||
result = config_manager.initialize_project_config(self.test_dir, interactive=False)
|
||||
|
||||
# Verify config file created
|
||||
config_file = Path(result['config_file'])
|
||||
assert config_file.exists()
|
||||
assert config_file.name == '.markitect.yml'
|
||||
|
||||
# Verify directories created
|
||||
assert len(result['created_directories']) > 0
|
||||
for directory in result['created_directories']:
|
||||
assert Path(directory).exists()
|
||||
|
||||
# Verify config structure
|
||||
config = result['config']
|
||||
assert 'gitea_url' in config
|
||||
assert 'repo_name' in config
|
||||
assert config['repo_name'] == self.test_dir.name
|
||||
|
||||
def test_initialize_project_config_custom_dir(self):
|
||||
"""Test project initialization in custom directory."""
|
||||
project_dir = self.test_dir / 'my-project'
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
result = config_manager.initialize_project_config(project_dir, interactive=False)
|
||||
|
||||
# Verify correct location
|
||||
config_file = Path(result['config_file'])
|
||||
assert config_file.parent == project_dir
|
||||
assert project_dir.exists()
|
||||
|
||||
def test_validate_configuration_success(self):
|
||||
"""Test configuration validation with valid config."""
|
||||
config = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'database_path': str(self.test_dir / 'db.sqlite'),
|
||||
'workspace_dir': str(self.test_dir / 'workspace'),
|
||||
'cache_dir': str(self.test_dir / 'cache'),
|
||||
'tests_dir': str(self.test_dir / 'tests')
|
||||
}
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
results = config_manager.validate_configuration(config)
|
||||
|
||||
# Should have validation results
|
||||
assert len(results) > 0
|
||||
|
||||
# Check for required field validations
|
||||
required_checks = [r for r in results if r['key'] in ['gitea_url', 'database_path']]
|
||||
assert len(required_checks) > 0
|
||||
|
||||
def test_validate_configuration_missing_required(self):
|
||||
"""Test configuration validation with missing required fields."""
|
||||
config = {
|
||||
'repo_name': 'test'
|
||||
# Missing gitea_url and database_path
|
||||
}
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
results = config_manager.validate_configuration(config)
|
||||
|
||||
# Should have errors for missing required fields
|
||||
errors = [r for r in results if r['status'] == 'error']
|
||||
assert len(errors) > 0
|
||||
|
||||
error_keys = [r['key'] for r in errors]
|
||||
assert 'gitea_url' in error_keys or 'database_path' in error_keys
|
||||
|
||||
def test_validate_configuration_path_creation(self):
|
||||
"""Test configuration validation creates missing directories."""
|
||||
config = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'database_path': str(self.test_dir / 'data' / 'db.sqlite'),
|
||||
'workspace_dir': str(self.test_dir / 'new_workspace'),
|
||||
'cache_dir': str(self.test_dir / 'new_cache'),
|
||||
'tests_dir': str(self.test_dir / 'new_tests')
|
||||
}
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
results = config_manager.validate_configuration(config)
|
||||
|
||||
# Directories should be created
|
||||
assert (self.test_dir / 'new_workspace').exists()
|
||||
assert (self.test_dir / 'new_cache').exists()
|
||||
assert (self.test_dir / 'new_tests').exists()
|
||||
assert (self.test_dir / 'data').exists() # Database parent directory
|
||||
|
||||
# Should have warnings for created directories
|
||||
warnings = [r for r in results if r['status'] == 'warning']
|
||||
assert len(warnings) > 0
|
||||
|
||||
def test_list_config_keys(self):
|
||||
"""Test listing available configuration keys."""
|
||||
config_manager = ConfigurationManager()
|
||||
keys = config_manager.list_config_keys()
|
||||
|
||||
# Should return list of tuples
|
||||
assert isinstance(keys, list)
|
||||
assert len(keys) > 0
|
||||
|
||||
# Each item should be (key, description, default)
|
||||
for item in keys:
|
||||
assert len(item) == 3
|
||||
assert isinstance(item[0], str) # key
|
||||
assert isinstance(item[1], str) # description
|
||||
|
||||
# Should include expected keys
|
||||
key_names = [item[0] for item in keys]
|
||||
assert 'gitea_url' in key_names
|
||||
assert 'repo_name' in key_names
|
||||
assert 'api_token' in key_names
|
||||
|
||||
def test_get_config_help_specific_key(self):
|
||||
"""Test getting help for specific configuration key."""
|
||||
config_manager = ConfigurationManager()
|
||||
help_text = config_manager.get_config_help('gitea_url')
|
||||
|
||||
assert 'gitea_url' in help_text
|
||||
assert 'description' in help_text.lower() or ':' in help_text
|
||||
|
||||
def test_get_config_help_unknown_key(self):
|
||||
"""Test getting help for unknown configuration key."""
|
||||
config_manager = ConfigurationManager()
|
||||
help_text = config_manager.get_config_help('unknown_key')
|
||||
|
||||
assert 'unknown' in help_text.lower()
|
||||
|
||||
def test_get_config_help_general(self):
|
||||
"""Test getting general configuration help."""
|
||||
config_manager = ConfigurationManager()
|
||||
help_text = config_manager.get_config_help()
|
||||
|
||||
assert 'available' in help_text.lower() or 'configuration' in help_text.lower()
|
||||
assert 'gitea_url' in help_text
|
||||
assert 'repo_name' in help_text
|
||||
|
||||
def test_convert_value_booleans(self):
|
||||
"""Test value conversion for boolean types."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# True values
|
||||
for true_val in ['true', 'True', 'TRUE', 'yes', 'YES', 'on', 'ON', '1']:
|
||||
assert config_manager._convert_value(true_val) is True
|
||||
|
||||
# False values
|
||||
for false_val in ['false', 'False', 'FALSE', 'no', 'NO', 'off', 'OFF', '0']:
|
||||
assert config_manager._convert_value(false_val) is False
|
||||
|
||||
def test_convert_value_numbers(self):
|
||||
"""Test value conversion for numeric types."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Integers
|
||||
assert config_manager._convert_value('42') == 42
|
||||
assert config_manager._convert_value('-10') == -10
|
||||
|
||||
# Floats
|
||||
assert config_manager._convert_value('3.14') == 3.14
|
||||
assert config_manager._convert_value('-2.5') == -2.5
|
||||
|
||||
# Strings that look like numbers but aren't
|
||||
assert config_manager._convert_value('not-a-number') == 'not-a-number'
|
||||
|
||||
def test_is_valid_url(self):
|
||||
"""Test URL validation."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Valid URLs
|
||||
valid_urls = [
|
||||
'http://localhost:3000',
|
||||
'https://example.com',
|
||||
'http://192.168.1.1:8080',
|
||||
'https://api.github.com/repos'
|
||||
]
|
||||
for url in valid_urls:
|
||||
assert config_manager._is_valid_url(url), f"Should be valid: {url}"
|
||||
|
||||
# Invalid URLs
|
||||
invalid_urls = [
|
||||
'not-a-url',
|
||||
'ftp://example.com',
|
||||
'localhost:3000',
|
||||
'http://',
|
||||
''
|
||||
]
|
||||
for url in invalid_urls:
|
||||
assert not config_manager._is_valid_url(url), f"Should be invalid: {url}"
|
||||
|
||||
def test_is_valid_path(self):
|
||||
"""Test path validation."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Valid paths
|
||||
valid_paths = [
|
||||
'/absolute/path',
|
||||
'./relative/path',
|
||||
'~/home/path',
|
||||
'simple-path',
|
||||
str(self.test_dir / 'subdir')
|
||||
]
|
||||
for path in valid_paths:
|
||||
assert config_manager._is_valid_path(path), f"Should be valid: {path}"
|
||||
|
||||
# Edge case: empty string (should be considered valid as it can represent current directory)
|
||||
assert config_manager._is_valid_path('')
|
||||
|
||||
|
||||
class TestConfigFileParsing:
|
||||
"""Test configuration file parsing and saving."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.original_cwd = os.getcwd()
|
||||
os.chdir(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
os.chdir(self.original_cwd)
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_load_yaml_config_file(self):
|
||||
"""Test loading YAML configuration file."""
|
||||
config_content = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_name': 'test-repo',
|
||||
'nested': {
|
||||
'key': 'value'
|
||||
}
|
||||
}
|
||||
|
||||
config_file = self.test_dir / 'test.yml'
|
||||
config_file.write_text(yaml.dump(config_content))
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
loaded = config_manager._load_config_file(config_file)
|
||||
|
||||
assert loaded == config_content
|
||||
|
||||
def test_load_json_config_file(self):
|
||||
"""Test loading JSON configuration file."""
|
||||
config_content = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_name': 'test-repo',
|
||||
'debug': True
|
||||
}
|
||||
|
||||
config_file = self.test_dir / 'test.json'
|
||||
config_file.write_text(json.dumps(config_content))
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
loaded = config_manager._load_config_file(config_file)
|
||||
|
||||
assert loaded == config_content
|
||||
|
||||
def test_save_yaml_config_file(self):
|
||||
"""Test saving YAML configuration file."""
|
||||
config_content = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_name': 'test-repo'
|
||||
}
|
||||
|
||||
config_file = self.test_dir / 'output.yml'
|
||||
config_manager = ConfigurationManager()
|
||||
config_manager._save_config_file(config_content, config_file)
|
||||
|
||||
assert config_file.exists()
|
||||
loaded = yaml.safe_load(config_file.read_text())
|
||||
assert loaded == config_content
|
||||
|
||||
def test_save_json_config_file(self):
|
||||
"""Test saving JSON configuration file."""
|
||||
config_content = {
|
||||
'gitea_url': 'http://localhost:3000',
|
||||
'repo_name': 'test-repo'
|
||||
}
|
||||
|
||||
config_file = self.test_dir / 'output.json'
|
||||
config_manager = ConfigurationManager()
|
||||
config_manager._save_config_file(config_content, config_file)
|
||||
|
||||
assert config_file.exists()
|
||||
loaded = json.loads(config_file.read_text())
|
||||
assert loaded == config_content
|
||||
|
||||
def test_load_invalid_config_file(self):
|
||||
"""Test loading invalid configuration file."""
|
||||
config_file = self.test_dir / 'invalid.yml'
|
||||
config_file.write_text('invalid: yaml: content: [')
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
with pytest.raises(ValueError, match="Failed to load config file"):
|
||||
config_manager._load_config_file(config_file)
|
||||
|
||||
def test_get_target_config_file_existing(self):
|
||||
"""Test getting target config file when one exists."""
|
||||
# Create an existing config file
|
||||
existing_file = self.test_dir / '.markitect.yml'
|
||||
existing_file.write_text('test: value')
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
target = config_manager._get_target_config_file()
|
||||
|
||||
# Should be relative path that matches the existing file
|
||||
assert target.name == existing_file.name
|
||||
assert target.exists()
|
||||
|
||||
def test_get_target_config_file_default(self):
|
||||
"""Test getting target config file when none exists."""
|
||||
config_manager = ConfigurationManager()
|
||||
target = config_manager._get_target_config_file()
|
||||
|
||||
# Should be the default config file name
|
||||
assert target.name == '.markitect.yml'
|
||||
|
||||
def test_get_target_config_file_custom(self):
|
||||
"""Test getting custom target config file."""
|
||||
custom_file = 'custom-config.yml'
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
target = config_manager._get_target_config_file(custom_file)
|
||||
|
||||
assert target == Path(custom_file)
|
||||
|
||||
|
||||
class TestCLIIntegration:
|
||||
"""Test CLI command integration."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.original_cwd = os.getcwd()
|
||||
os.chdir(self.temp_dir)
|
||||
self.runner = CliRunner()
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
os.chdir(self.original_cwd)
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_config_show_command(self):
|
||||
"""Test config-show CLI command."""
|
||||
result = self.runner.invoke(cli, ['config-show'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'gitea_url' in result.output
|
||||
|
||||
def test_config_show_command_json_format(self):
|
||||
"""Test config-show with JSON format."""
|
||||
result = self.runner.invoke(cli, ['config-show', '--format', 'json'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
# Output should be valid JSON
|
||||
try:
|
||||
json.loads(result.output)
|
||||
except json.JSONDecodeError:
|
||||
pytest.fail("Output is not valid JSON")
|
||||
|
||||
def test_config_show_command_simple_format(self):
|
||||
"""Test config-show with simple format."""
|
||||
result = self.runner.invoke(cli, ['config-show', '--format', 'simple'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert '=' in result.output # Should contain key=value pairs
|
||||
|
||||
def test_config_set_command(self):
|
||||
"""Test config-set CLI command."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-set', 'repo_name', 'test-repository'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'Configuration updated' in result.output
|
||||
|
||||
# Verify file was created
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
assert config_file.exists()
|
||||
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
assert content['repo_name'] == 'test-repository'
|
||||
|
||||
def test_config_set_command_nested_key(self):
|
||||
"""Test config-set with nested key."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-set', 'gitea.url', 'http://example.com'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
content = yaml.safe_load(config_file.read_text())
|
||||
assert content['gitea']['url'] == 'http://example.com'
|
||||
|
||||
def test_config_set_command_custom_file(self):
|
||||
"""Test config-set with custom config file."""
|
||||
custom_file = 'custom.yml'
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-set', 'repo_name', 'test',
|
||||
'--config-file', custom_file
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert Path(custom_file).exists()
|
||||
|
||||
def test_config_set_command_no_validate(self):
|
||||
"""Test config-set with validation disabled."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-set', 'repo_name', 'test',
|
||||
'--no-validate'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_config_set_command_validation_error(self):
|
||||
"""Test config-set with validation error."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-set', 'gitea_url', 'invalid-url'
|
||||
])
|
||||
|
||||
assert result.exit_code == 1
|
||||
assert 'Configuration error' in result.output
|
||||
|
||||
def test_config_init_command_non_interactive(self):
|
||||
"""Test config-init CLI command in non-interactive mode."""
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-init', '--no-interactive'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'initialized successfully' in result.output
|
||||
|
||||
# Verify config file created
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
assert config_file.exists()
|
||||
|
||||
# Verify directories created
|
||||
assert (self.test_dir / '.markitect_workspace').exists()
|
||||
assert (self.test_dir / '.ast_cache').exists()
|
||||
assert (self.test_dir / 'tests').exists()
|
||||
|
||||
def test_config_init_command_custom_directory(self):
|
||||
"""Test config-init with custom project directory."""
|
||||
project_dir = self.test_dir / 'my-project'
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-init',
|
||||
'--project-dir', str(project_dir),
|
||||
'--no-interactive'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
|
||||
# Verify in correct location
|
||||
config_file = project_dir / '.markitect.yml'
|
||||
assert config_file.exists()
|
||||
|
||||
def test_config_init_command_existing_file_no_force(self):
|
||||
"""Test config-init with existing file without force."""
|
||||
# Create existing config file
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
config_file.write_text('existing: config')
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-init', '--no-interactive'
|
||||
])
|
||||
|
||||
assert result.exit_code == 1
|
||||
assert 'already exists' in result.output
|
||||
|
||||
def test_config_init_command_existing_file_with_force(self):
|
||||
"""Test config-init with existing file with force."""
|
||||
# Create existing config file
|
||||
config_file = self.test_dir / '.markitect.yml'
|
||||
config_file.write_text('existing: config')
|
||||
|
||||
result = self.runner.invoke(cli, [
|
||||
'config-init', '--no-interactive', '--force'
|
||||
])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'initialized successfully' in result.output
|
||||
|
||||
def test_config_validate_command(self):
|
||||
"""Test config-validate CLI command."""
|
||||
result = self.runner.invoke(cli, ['config-validate'])
|
||||
|
||||
assert result.exit_code in [0, 1] # May have warnings/errors
|
||||
assert 'Configuration Validation Summary' in result.output
|
||||
|
||||
def test_config_validate_command_verbose(self):
|
||||
"""Test config-validate with verbose output."""
|
||||
result = self.runner.invoke(cli, ['config-validate', '--verbose'])
|
||||
|
||||
assert result.exit_code in [0, 1]
|
||||
assert 'Configuration Validation Summary' in result.output
|
||||
|
||||
def test_config_help_command_general(self):
|
||||
"""Test config-help CLI command."""
|
||||
result = self.runner.invoke(cli, ['config-help'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'available' in result.output.lower() or 'configuration' in result.output.lower()
|
||||
assert 'gitea_url' in result.output
|
||||
|
||||
def test_config_help_command_specific_key(self):
|
||||
"""Test config-help for specific key."""
|
||||
result = self.runner.invoke(cli, ['config-help', 'gitea_url'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'gitea_url' in result.output
|
||||
|
||||
def test_config_help_command_unknown_key(self):
|
||||
"""Test config-help for unknown key."""
|
||||
result = self.runner.invoke(cli, ['config-help', 'unknown_key'])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert 'unknown' in result.output.lower()
|
||||
|
||||
|
||||
class TestEnvironmentVariables:
|
||||
"""Test environment variable handling."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.original_cwd = os.getcwd()
|
||||
os.chdir(self.temp_dir)
|
||||
|
||||
# Store original environment
|
||||
self.original_env = dict(os.environ)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
os.chdir(self.original_cwd)
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
# Restore original environment
|
||||
os.environ.clear()
|
||||
os.environ.update(self.original_env)
|
||||
|
||||
def test_get_relevant_env_vars(self):
|
||||
"""Test getting MARKITECT-related environment variables."""
|
||||
# Set some test environment variables
|
||||
os.environ['MARKITECT_GITEA_URL'] = 'http://test.com'
|
||||
os.environ['MARKITECT_REPO_NAME'] = 'test-repo'
|
||||
os.environ['OTHER_VAR'] = 'should-be-ignored'
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
env_vars = config_manager._get_relevant_env_vars()
|
||||
|
||||
assert 'MARKITECT_GITEA_URL' in env_vars
|
||||
assert 'MARKITECT_REPO_NAME' in env_vars
|
||||
assert 'OTHER_VAR' not in env_vars
|
||||
assert env_vars['MARKITECT_GITEA_URL'] == 'http://test.com'
|
||||
|
||||
def test_config_with_env_vars(self):
|
||||
"""Test configuration loading with environment variables."""
|
||||
# Set environment variables
|
||||
os.environ['MARKITECT_GITEA_URL'] = 'http://env-test.com'
|
||||
os.environ['MARKITECT_REPO_NAME'] = 'env-repo'
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
config = config_manager.get_current_config()
|
||||
|
||||
# Environment variables should be reflected in config
|
||||
assert config['gitea_url'] == 'http://env-test.com'
|
||||
assert config['repo_name'] == 'env-repo'
|
||||
|
||||
# Should have env vars in metadata
|
||||
assert 'MARKITECT_GITEA_URL' in config['_meta']['env_variables']
|
||||
assert 'MARKITECT_REPO_NAME' in config['_meta']['env_variables']
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
"""Test edge cases and error conditions."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Set up test environment."""
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.test_dir = Path(self.temp_dir)
|
||||
self.original_cwd = os.getcwd()
|
||||
os.chdir(self.temp_dir)
|
||||
|
||||
def teardown_method(self):
|
||||
"""Clean up test environment."""
|
||||
os.chdir(self.original_cwd)
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def test_config_manager_with_permission_error(self):
|
||||
"""Test handling permission errors."""
|
||||
# Create a directory we can't write to
|
||||
restricted_dir = self.test_dir / 'restricted'
|
||||
restricted_dir.mkdir(mode=0o444) # Read-only
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# This should not crash, but may warn about permission issues
|
||||
try:
|
||||
config = config_manager.get_current_config()
|
||||
assert isinstance(config, dict)
|
||||
except PermissionError:
|
||||
# Acceptable to fail with permission error
|
||||
pass
|
||||
|
||||
def test_set_config_value_invalid_file_path(self):
|
||||
"""Test setting config value with invalid file path."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Try to write to a path that doesn't exist and can't be created
|
||||
# Use a more reliable invalid path that doesn't depend on system permissions
|
||||
invalid_path = '/nonexistent_directory_12345/config.yml'
|
||||
with pytest.raises(ValueError):
|
||||
config_manager.set_config_value('test', 'value', invalid_path)
|
||||
|
||||
def test_validate_configuration_with_none(self):
|
||||
"""Test configuration validation with None input."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
# Should not crash with None input
|
||||
results = config_manager.validate_configuration(None)
|
||||
assert isinstance(results, list)
|
||||
|
||||
def test_mask_sensitive_data_with_complex_structure(self):
|
||||
"""Test masking sensitive data in complex nested structure."""
|
||||
config_manager = ConfigurationManager()
|
||||
|
||||
complex_config = {
|
||||
'database': {
|
||||
'password': 'secret123',
|
||||
'host': 'localhost'
|
||||
},
|
||||
'apis': [
|
||||
{'name': 'github', 'token': 'ghp_secret'},
|
||||
{'name': 'gitea', 'url': 'http://localhost:3000'}
|
||||
],
|
||||
'secrets': {
|
||||
'nested': {
|
||||
'api_key': 'very_secret'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
masked = config_manager._mask_sensitive_data(complex_config)
|
||||
|
||||
# Sensitive fields should be masked at any level
|
||||
assert masked['database']['password'] == '***MASKED***'
|
||||
assert masked['apis'][0]['token'] == '***MASKED***'
|
||||
|
||||
# The 'secrets' key itself gets masked because it's a sensitive keyword
|
||||
# This is actually correct behavior
|
||||
assert masked['secrets'] == '***MASKED***'
|
||||
|
||||
# Non-sensitive fields should remain
|
||||
assert masked['database']['host'] == 'localhost'
|
||||
assert masked['apis'][1]['url'] == 'http://localhost:3000'
|
||||
|
||||
def test_get_config_sources_empty_directory(self):
|
||||
"""Test getting config sources in empty directory."""
|
||||
config_manager = ConfigurationManager()
|
||||
sources = config_manager._get_config_sources()
|
||||
|
||||
# Should always include defaults
|
||||
assert len(sources) > 0
|
||||
assert any('defaults' in source.lower() for source in sources)
|
||||
|
||||
def test_initialize_project_config_existing_directories(self):
|
||||
"""Test project initialization with existing directories."""
|
||||
# Pre-create some directories
|
||||
(self.test_dir / '.markitect_workspace').mkdir()
|
||||
(self.test_dir / 'tests').mkdir()
|
||||
|
||||
config_manager = ConfigurationManager()
|
||||
result = config_manager.initialize_project_config(self.test_dir, interactive=False)
|
||||
|
||||
# Should succeed even with existing directories
|
||||
assert 'config_file' in result
|
||||
assert Path(result['config_file']).exists()
|
||||
Reference in New Issue
Block a user