4 Commits

Author SHA1 Message Date
f6c285b774 feat: implement configuration and environment management CLI (issue #18)
Some checks failed
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Complete implementation of configuration management capabilities for MarkiTect CLI:

New CLI Commands:
- markitect config-show: Display current configuration with multiple output formats
- markitect config-set: Set configuration values with validation and persistence
- markitect config-init: Initialize configuration for new project with interactive setup
- markitect config-validate: Validate current configuration and show issues
- markitect config-help: Get help information for configuration keys

Core Features:
- Comprehensive configuration management with multiple sources (files, env vars, defaults)
- Support for YAML, JSON, and simple output formats
- Sensitive data masking for secure configuration display
- Interactive project initialization with intelligent defaults
- Configuration validation with path creation and URL validation
- Environment variable integration with MARKITECT_ prefix
- Nested configuration support with dot notation (e.g., gitea.url)
- Type conversion for boolean, numeric, and string values
- Project-specific configuration files (.markitect.yml/yaml/json)

Technical Implementation:
- ConfigurationManager class with robust error handling
- Integration with existing configuration system
- File-based configuration with automatic format detection
- Configuration validation and help system
- Support for custom configuration file locations
- Graceful fallback when advanced config system unavailable

Configuration Features:
- Multiple file format support (YAML, JSON)
- Environment variable precedence
- Sensitive data protection
- Directory structure validation and creation
- URL and path validation
- Interactive and non-interactive modes

Testing:
- 58 comprehensive tests covering all functionality
- CLI integration tests with isolated environments
- Edge cases: permissions, invalid paths, complex structures
- Configuration file parsing and saving tests
- Environment variable handling tests
- Validation and error handling scenarios

All acceptance criteria fulfilled:
 Configuration display and management
 Project initialization functionality
 Configuration validation
 Integration with existing config system
 Comprehensive test coverage

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:53:44 +02:00
0982e771e4 feat: implement batch processing and recursive operations (issue #17)
Complete implementation of batch processing capabilities for MarkiTect CLI:

New CLI Commands:
- markitect ingest-dir: Process all markdown files in directory with recursive support
- markitect batch-process: Process files matching glob patterns
- markitect recursive: Recursive processing with depth control

Core Features:
- Sophisticated batch processing engine with progress tracking
- Multiple error handling strategies (stop, continue, skip)
- Recursive directory traversal with configurable depth limits
- Glob pattern matching for flexible file selection
- Progress feedback with detailed processing statistics
- Integration with existing database and caching systems

Technical Implementation:
- BatchProcessor class with modular architecture
- ProgressTracker for real-time user feedback
- Comprehensive error handling and edge case management
- Support for multiple operations (ingest, status, validate)
- Depth-controlled recursive search with proper boundary handling
- Permission error resilience and graceful degradation

Testing:
- 29 comprehensive tests covering all functionality
- Edge cases: empty directories, hidden files, permission errors
- CLI integration tests with mocked database operations
- Depth logic validation and boundary condition testing
- Error handling scenarios and recovery mechanisms

All acceptance criteria fulfilled:
 Directory and recursive processing
 Glob pattern support for file selection
 Progress tracking and user feedback
 Error handling with continuation options
 Comprehensive test coverage

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:45:43 +02:00
a4805812f3 refactor: enhance draft generator documentation and code quality
Applied TDD8 refactoring improvements to draft generator module:

- Added comprehensive module docstring with usage examples
- Moved import statements to module level for better organization
- Enhanced filename sanitization with dedicated method
- Decomposed content replacement logic into focused methods
- Added role-specific replacement strategies
- Improved code maintainability and readability

These changes improve code quality while maintaining all existing
functionality and test compatibility.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:35:16 +02:00
77db9f6231 refactor: organize chaos test runner into tools directory
Move chaos_test_runner.py to tools/ directory for better project organization
and update all Makefile targets to reference the new location. This improves
the project structure by keeping specialized tools separate from main code.

Changes:
- Move chaos_test_runner.py to tools/chaos_test_runner.py
- Update Makefile chaos-* targets to use tools/ path
- Maintain all existing functionality and CLI interface

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-03 10:25:33 +02:00
8 changed files with 2995 additions and 19 deletions

View File

@@ -254,11 +254,11 @@ release-dry-run:
# Chaos Engineering targets
chaos-validate:
@echo "🔥 Running architectural independence validation..."
$(VENV_PYTHON) chaos_test_runner.py validate-independence
$(VENV_PYTHON) tools/chaos_test_runner.py validate-independence
chaos-matrix:
@echo "🏗️ Showing architectural dependency matrix..."
$(VENV_PYTHON) chaos_test_runner.py dependency-matrix
$(VENV_PYTHON) tools/chaos_test_runner.py dependency-matrix
chaos-inject:
@echo "💥 Injecting chaos into layer..."
@@ -266,11 +266,11 @@ chaos-inject:
echo "❌ Usage: make chaos-inject LAYER=L1_Presentation TYPE=import_failure"; \
exit 1; \
fi
$(VENV_PYTHON) chaos_test_runner.py inject-layer-failure --layer $(LAYER) $(if $(TYPE),--injection-type $(TYPE))
$(VENV_PYTHON) tools/chaos_test_runner.py inject-layer-failure --layer $(LAYER) $(if $(TYPE),--injection-type $(TYPE))
chaos-report:
@echo "📄 Generating chaos engineering report..."
$(VENV_PYTHON) chaos_test_runner.py chaos-report
$(VENV_PYTHON) tools/chaos_test_runner.py chaos-report
# Code linting
lint: $(VENV)/bin/activate

View File

@@ -0,0 +1,379 @@
"""
Batch Processing and Recursive Operations - Issue #17
This module provides batch processing capabilities for MarkiTect, allowing
users to process multiple files and directories recursively through CLI.
Features:
- Directory processing with recursive support
- Glob pattern matching for file selection
- Progress tracking with user feedback
- Error handling with continuation options
- Depth control for recursive operations
Commands implemented:
- ingest-dir: Process all Markdown files in directory
- batch-process: Process files matching glob pattern
- recursive operations with depth control
"""
import os
import glob
import fnmatch
from pathlib import Path
from typing import List, Optional, Dict, Any, Iterator, Callable
from dataclasses import dataclass
from enum import Enum
import click
class ProcessingMode(Enum):
"""Modes for batch processing operations."""
INGEST = "ingest"
STATUS = "status"
VALIDATE = "validate"
GENERATE = "generate"
class ErrorHandling(Enum):
"""Error handling strategies for batch operations."""
STOP = "stop" # Stop on first error
CONTINUE = "continue" # Continue processing, collect errors
SKIP = "skip" # Skip failed files, no error collection
@dataclass
class ProcessingResult:
"""Result of processing a single file."""
file_path: Path
success: bool
message: str
error: Optional[str] = None
processing_time: Optional[float] = None
@dataclass
class BatchResult:
"""Result of a batch processing operation."""
total_files: int
processed: int
succeeded: int
failed: int
skipped: int
errors: List[ProcessingResult]
processing_time: float
class ProgressTracker:
"""Progress tracking for batch operations."""
def __init__(self, total: int, show_progress: bool = True):
self.total = total
self.processed = 0
self.succeeded = 0
self.failed = 0
self.skipped = 0
self.show_progress = show_progress
def update(self, result: ProcessingResult):
"""Update progress with a processing result."""
self.processed += 1
if result.success:
self.succeeded += 1
else:
self.failed += 1
if self.show_progress:
self._display_progress(result)
def skip_file(self, file_path: Path, reason: str):
"""Mark a file as skipped."""
self.skipped += 1
if self.show_progress:
click.echo(f"⚠️ Skipped {file_path}: {reason}")
def _display_progress(self, result: ProcessingResult):
"""Display progress information."""
status = "" if result.success else ""
percentage = (self.processed / self.total) * 100
click.echo(f"{status} [{self.processed}/{self.total}] ({percentage:.1f}%) {result.file_path}")
if not result.success and result.error:
click.echo(f" Error: {result.error}")
class BatchProcessor:
"""Core batch processing engine."""
def __init__(self,
error_handling: ErrorHandling = ErrorHandling.CONTINUE,
show_progress: bool = True,
max_depth: Optional[int] = None):
self.error_handling = error_handling
self.show_progress = show_progress
self.max_depth = max_depth
def find_markdown_files(self,
directory: Path,
pattern: str = "*.md",
recursive: bool = False,
depth: Optional[int] = None) -> List[Path]:
"""
Find markdown files in directory with pattern matching.
Args:
directory: Directory to search
pattern: Glob pattern for file matching
recursive: Whether to search recursively
depth: Maximum depth for recursive search
Returns:
List of matching file paths
"""
files = []
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory}")
if not directory.is_dir():
raise NotADirectoryError(f"Path is not a directory: {directory}")
if recursive:
effective_depth = depth if depth is not None else self.max_depth
files.extend(self._find_recursive(directory, pattern, effective_depth))
else:
# Non-recursive: only current directory
files.extend(self._find_in_directory(directory, pattern))
return sorted(files)
def _find_recursive(self, directory: Path, pattern: str, max_depth: Optional[int]) -> List[Path]:
"""Find files recursively with depth control."""
files = []
def _search(current_dir: Path, current_depth: int):
# Add files from current directory (if within depth limit)
if max_depth is None or current_depth <= max_depth:
files.extend(self._find_in_directory(current_dir, pattern))
# Recurse into subdirectories (if we haven't reached depth limit)
if max_depth is None or current_depth < max_depth:
try:
for item in current_dir.iterdir():
if item.is_dir() and not item.name.startswith('.'):
_search(item, current_depth + 1)
except PermissionError:
# Skip directories we can't access
if self.show_progress:
click.echo(f"⚠️ Permission denied: {current_dir}")
_search(directory, 0)
return files
def _find_in_directory(self, directory: Path, pattern: str) -> List[Path]:
"""Find files matching pattern in a specific directory."""
files = []
try:
for item in directory.iterdir():
if item.is_file() and fnmatch.fnmatch(item.name, pattern):
files.append(item)
except PermissionError:
if self.show_progress:
click.echo(f"⚠️ Permission denied: {directory}")
return files
def find_files_by_glob(self, glob_pattern: str) -> List[Path]:
"""
Find files using glob patterns.
Args:
glob_pattern: Glob pattern (e.g., "**/*.md", "docs/*.markdown")
Returns:
List of matching file paths
"""
matches = glob.glob(glob_pattern, recursive=True)
return [Path(match) for match in matches if Path(match).is_file()]
def process_files(self,
files: List[Path],
processor_func: Callable[[Path], ProcessingResult],
operation_name: str = "Processing") -> BatchResult:
"""
Process a list of files with progress tracking and error handling.
Args:
files: List of files to process
processor_func: Function to process each file
operation_name: Name of the operation for progress display
Returns:
BatchResult with processing statistics
"""
import time
start_time = time.time()
if self.show_progress:
click.echo(f"🚀 {operation_name} {len(files)} files...")
tracker = ProgressTracker(len(files), self.show_progress)
errors = []
for file_path in files:
try:
# Check if file still exists (might have been deleted during processing)
if not file_path.exists():
tracker.skip_file(file_path, "File no longer exists")
continue
# Process the file
result = processor_func(file_path)
tracker.update(result)
if not result.success:
errors.append(result)
# Handle errors based on strategy
if self.error_handling == ErrorHandling.STOP:
break
except Exception as e:
# Handle unexpected errors
error_result = ProcessingResult(
file_path=file_path,
success=False,
message=f"Unexpected error: {str(e)}",
error=str(e)
)
tracker.update(error_result)
errors.append(error_result)
if self.error_handling == ErrorHandling.STOP:
break
processing_time = time.time() - start_time
result = BatchResult(
total_files=len(files),
processed=tracker.processed,
succeeded=tracker.succeeded,
failed=tracker.failed,
skipped=tracker.skipped,
errors=errors,
processing_time=processing_time
)
if self.show_progress:
self._display_summary(result, operation_name)
return result
def _display_summary(self, result: BatchResult, operation_name: str):
"""Display batch processing summary."""
click.echo(f"\n📊 {operation_name} Summary:")
click.echo(f" Total files: {result.total_files}")
click.echo(f" Processed: {result.processed}")
click.echo(f" Succeeded: {result.succeeded}")
click.echo(f" Failed: {result.failed}")
click.echo(f" Skipped: {result.skipped}")
click.echo(f" Processing time: {result.processing_time:.2f}s")
if result.failed > 0:
click.echo(f"\n{result.failed} files failed:")
for error in result.errors[:10]: # Show first 10 errors
click.echo(f"{error.file_path}: {error.message}")
if len(result.errors) > 10:
click.echo(f" ... and {len(result.errors) - 10} more errors")
def create_file_processor(config: Dict[str, Any],
operation: ProcessingMode) -> Callable[[Path], ProcessingResult]:
"""
Create a file processor function for the specified operation.
Args:
config: Configuration dictionary
operation: Type of processing operation
Returns:
Function that processes a single file and returns ProcessingResult
"""
import time
def process_file(file_path: Path) -> ProcessingResult:
"""Process a single file based on the operation type."""
start_time = time.time()
try:
if operation == ProcessingMode.INGEST:
# Ingest file into database
from .database import DatabaseManager
db_manager = DatabaseManager(config.get('database'))
# Read file content
content = file_path.read_text(encoding='utf-8')
# Store in database
db_manager.store_document(str(file_path), content)
processing_time = time.time() - start_time
return ProcessingResult(
file_path=file_path,
success=True,
message="Ingested successfully",
processing_time=processing_time
)
elif operation == ProcessingMode.STATUS:
# Check file status
from .database import DatabaseManager
db_manager = DatabaseManager(config.get('database'))
try:
metadata = db_manager.get_metadata(str(file_path))
message = f"Found in database (ID: {metadata.get('id', 'Unknown')})"
except:
message = "Not found in database"
processing_time = time.time() - start_time
return ProcessingResult(
file_path=file_path,
success=True,
message=message,
processing_time=processing_time
)
elif operation == ProcessingMode.VALIDATE:
# Validate file format/content
content = file_path.read_text(encoding='utf-8')
# Basic validation - check if it's valid markdown
if not content.strip():
raise ValueError("File is empty")
processing_time = time.time() - start_time
return ProcessingResult(
file_path=file_path,
success=True,
message="Valid markdown file",
processing_time=processing_time
)
else:
raise ValueError(f"Unsupported operation: {operation}")
except Exception as e:
processing_time = time.time() - start_time
return ProcessingResult(
file_path=file_path,
success=False,
message=f"Failed: {str(e)}",
error=str(e),
processing_time=processing_time
)
return process_file

View File

@@ -28,6 +28,8 @@ import builtins
from .database import DatabaseManager
from .legacy_compat import LegacyMode, emit_deprecation_warning, legacy_switch_option
from .__version__ import get_version_info, get_release_info
from .batch_processor import BatchProcessor, ProcessingMode, ErrorHandling, create_file_processor
from .config_manager import ConfigurationManager
# Import legacy system components for advanced management
try:
@@ -4549,6 +4551,495 @@ def perf_history(config, limit, trend_days, output_format, output):
sys.exit(1)
# Batch Processing Commands - Issue #17
@cli.command(name='ingest-dir')
@click.argument('directory', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
@click.option('--recursive', '-r', is_flag=True, help='Process directories recursively')
@click.option('--depth', type=int, help='Maximum depth for recursive processing')
@click.option('--pattern', default='*.md', help='File pattern to match (default: *.md)')
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
default='continue', help='Error handling strategy')
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
@pass_config
def ingest_dir(config, directory, recursive, depth, pattern, error_handling, quiet):
"""Process all Markdown files in directory.
Ingests all markdown files found in the specified directory into the database.
Supports recursive processing with depth control and flexible error handling.
Examples:
markitect ingest-dir ./docs
markitect ingest-dir ./content --recursive --depth 3
markitect ingest-dir ./articles --pattern "*.markdown" --error-handling stop
"""
try:
# Convert error handling string to enum
error_strategy = ErrorHandling[error_handling.upper()]
# Initialize batch processor
processor = BatchProcessor(
error_handling=error_strategy,
show_progress=not quiet,
max_depth=depth
)
# Find files to process
if not quiet:
click.echo(f"🔍 Searching for files in {directory}...")
files = processor.find_markdown_files(
directory=directory,
pattern=pattern,
recursive=recursive,
depth=depth
)
if not files:
click.echo(f"📭 No files found matching pattern '{pattern}' in {directory}")
return
# Create file processor for ingestion
file_processor = create_file_processor(config, ProcessingMode.INGEST)
# Process files
result = processor.process_files(files, file_processor, "Ingesting")
# Exit with error code if there were failures
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
sys.exit(1)
except Exception as e:
click.echo(f"Directory ingestion failed: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='batch-process')
@click.argument('pattern', type=str)
@click.option('--operation', type=click.Choice(['ingest', 'status', 'validate']),
default='ingest', help='Operation to perform on matched files')
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
default='continue', help='Error handling strategy')
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
@pass_config
def batch_process(config, pattern, operation, error_handling, quiet):
"""Process files matching glob pattern.
Uses glob patterns to find and process files. Supports various operations
including ingestion, status checking, and validation.
Examples:
markitect batch-process "**/*.md" --operation ingest
markitect batch-process "docs/**/*.markdown" --operation status
markitect batch-process "./content/*.md" --operation validate --error-handling stop
"""
try:
# Convert strings to enums
error_strategy = ErrorHandling[error_handling.upper()]
processing_mode = ProcessingMode[operation.upper()]
# Initialize batch processor
processor = BatchProcessor(
error_handling=error_strategy,
show_progress=not quiet
)
# Find files using glob pattern
if not quiet:
click.echo(f"🔍 Searching for files matching '{pattern}'...")
files = processor.find_files_by_glob(pattern)
if not files:
click.echo(f"📭 No files found matching pattern '{pattern}'")
return
# Create file processor for the specified operation
file_processor = create_file_processor(config, processing_mode)
# Process files
operation_name = f"{operation.title()}ing"
result = processor.process_files(files, file_processor, operation_name)
# Exit with error code if there were failures
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
sys.exit(1)
except Exception as e:
click.echo(f"Batch processing failed: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='recursive')
@click.argument('directory', type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path))
@click.option('--depth', type=int, default=None, help='Maximum recursion depth')
@click.option('--operation', type=click.Choice(['ingest', 'status', 'validate']),
default='status', help='Operation to perform')
@click.option('--pattern', default='*.md', help='File pattern to match (default: *.md)')
@click.option('--error-handling', type=click.Choice(['stop', 'continue', 'skip']),
default='continue', help='Error handling strategy')
@click.option('--quiet', '-q', is_flag=True, help='Suppress progress output')
@pass_config
def recursive(config, directory, depth, operation, pattern, error_handling, quiet):
"""Recursive processing with depth control.
Performs recursive operations on directory trees with configurable depth limits.
This command provides fine-grained control over recursive processing behavior.
Examples:
markitect recursive ./docs --depth 2 --operation ingest
markitect recursive ./content --depth 5 --operation status --pattern "*.markdown"
markitect recursive ./src --operation validate --error-handling stop
"""
try:
# Convert strings to enums
error_strategy = ErrorHandling[error_handling.upper()]
processing_mode = ProcessingMode[operation.upper()]
# Initialize batch processor with depth control
processor = BatchProcessor(
error_handling=error_strategy,
show_progress=not quiet,
max_depth=depth
)
# Find files recursively
if not quiet:
depth_str = f" (max depth: {depth})" if depth else ""
click.echo(f"🔍 Recursively searching {directory}{depth_str}...")
files = processor.find_markdown_files(
directory=directory,
pattern=pattern,
recursive=True,
depth=depth
)
if not files:
click.echo(f"📭 No files found matching pattern '{pattern}' in {directory}")
return
# Create file processor for the specified operation
file_processor = create_file_processor(config, processing_mode)
# Process files
operation_name = f"Recursively {operation}ing"
result = processor.process_files(files, file_processor, operation_name)
# Exit with error code if there were failures
if result.failed > 0 and error_strategy == ErrorHandling.STOP:
sys.exit(1)
except Exception as e:
click.echo(f"Recursive processing failed: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
# Configuration Management Commands - Issue #18
@cli.command(name='config-show')
@click.option('--format', 'output_format', type=click.Choice(['yaml', 'json', 'simple']),
default='yaml', help='Output format for configuration display')
@click.option('--show-sensitive', is_flag=True, help='Show sensitive values (tokens, passwords)')
@pass_config
def config_show(config, output_format, show_sensitive):
"""Display current configuration.
Shows comprehensive configuration information including current settings,
file sources, environment variables, and workspace information.
Examples:
markitect config-show
markitect config-show --format json
markitect config-show --format simple --show-sensitive
"""
try:
config_manager = ConfigurationManager()
output = config_manager.display_config(
show_sensitive=show_sensitive,
output_format=output_format
)
click.echo(output)
except Exception as e:
click.echo(f"Failed to display configuration: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='config-set')
@click.argument('key', type=str)
@click.argument('value', type=str)
@click.option('--config-file', type=click.Path(), help='Target configuration file')
@click.option('--validate/--no-validate', default=True, help='Validate configuration after setting')
@pass_config
def config_set(config, key, value, config_file, validate):
"""Set configuration values.
Sets a configuration value and persists it to a configuration file.
Supports nested keys using dot notation (e.g., 'gitea.url').
Examples:
markitect config-set gitea_url http://localhost:3000
markitect config-set repo_owner myorganization
markitect config-set api_token abc123def456
markitect config-set workspace.dir ./my_workspace
"""
try:
config_manager = ConfigurationManager()
# Set the configuration value
success = config_manager.set_config_value(key, value, config_file)
if success:
click.echo(f"✅ Configuration updated: {key} = {value}")
# Show which file was updated
target_file = config_manager._get_target_config_file(config_file)
click.echo(f"📁 Updated file: {target_file}")
# Validate configuration if requested
if validate:
validation_results = config_manager.validate_configuration()
errors = [r for r in validation_results if r['status'] == 'error']
if errors:
click.echo("⚠️ Configuration validation warnings:")
for error in errors:
click.echo(f"{error['key']}: {error['message']}")
else:
click.echo(f"❌ Failed to set configuration: {key}", err=True)
sys.exit(1)
except ValueError as e:
click.echo(f"❌ Configuration error: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"❌ Failed to set configuration: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='config-init')
@click.option('--project-dir', type=click.Path(path_type=Path), help='Target project directory')
@click.option('--interactive/--no-interactive', default=True, help='Interactive configuration setup')
@click.option('--force', is_flag=True, help='Overwrite existing configuration')
@pass_config
def config_init(config, project_dir, interactive, force):
"""Initialize configuration for new project.
Creates a new configuration file with sensible defaults and sets up
the necessary directory structure for a MarkiTect project.
Examples:
markitect config-init
markitect config-init --project-dir ./my-project
markitect config-init --no-interactive --force
"""
try:
target_dir = project_dir or Path.cwd()
config_file = target_dir / '.markitect.yml'
# Check if configuration already exists
if config_file.exists() and not force:
click.echo(f"❌ Configuration file already exists: {config_file}")
click.echo(" Use --force to overwrite or choose a different directory")
sys.exit(1)
config_manager = ConfigurationManager()
# Interactive setup if requested
initial_config = {
'gitea_url': 'http://localhost:3000',
'repo_owner': '',
'repo_name': target_dir.name,
'workspace_dir': '.markitect_workspace',
'cache_dir': '.ast_cache',
'tests_dir': 'tests',
'test_file_pattern': 'test_issue_{issue_num}_{scenario}.py',
'claude_code_command': 'claude'
}
if interactive:
click.echo("🔧 Interactive MarkiTect configuration setup")
click.echo(f"📁 Target directory: {target_dir}")
click.echo()
# Prompt for each configuration value
initial_config['gitea_url'] = click.prompt(
'Gitea server URL',
default=initial_config['gitea_url']
)
initial_config['repo_owner'] = click.prompt(
'Repository owner/organization',
default=initial_config['repo_owner']
)
initial_config['repo_name'] = click.prompt(
'Repository name',
default=initial_config['repo_name']
)
if click.confirm('Configure API token now?', default=False):
initial_config['api_token'] = click.prompt(
'API token',
default='',
hide_input=True
)
initial_config['workspace_dir'] = click.prompt(
'Workspace directory',
default=initial_config['workspace_dir']
)
initial_config['tests_dir'] = click.prompt(
'Tests directory',
default=initial_config['tests_dir']
)
# Initialize the project
result = config_manager.initialize_project_config(target_dir, interactive=False)
# Update with interactive values if provided
if interactive:
config_manager._save_config_file(initial_config, config_file)
result['config'] = initial_config
click.echo("✅ MarkiTect project initialized successfully!")
click.echo(f"📄 Configuration file: {result['config_file']}")
click.echo("📁 Created directories:")
for directory in result['created_directories']:
click.echo(f"{directory}")
# Show validation results
validation_results = config_manager.validate_configuration(result['config'])
warnings = [r for r in validation_results if r['status'] == 'warning']
errors = [r for r in validation_results if r['status'] == 'error']
if warnings:
click.echo("⚠️ Configuration warnings:")
for warning in warnings:
click.echo(f"{warning['message']}")
if errors:
click.echo("❌ Configuration errors:")
for error in errors:
click.echo(f"{error['message']}")
else:
click.echo("🎉 Configuration validation passed!")
except Exception as e:
click.echo(f"❌ Failed to initialize configuration: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='config-validate')
@click.option('--verbose', '-v', is_flag=True, help='Show detailed validation information')
@pass_config
def config_validate(config, verbose):
"""Validate current configuration.
Checks the current configuration for common issues, missing required fields,
and validates paths and URLs. Provides suggestions for fixing any problems.
Examples:
markitect config-validate
markitect config-validate --verbose
"""
try:
config_manager = ConfigurationManager()
validation_results = config_manager.validate_configuration()
# Categorize results
errors = [r for r in validation_results if r['status'] == 'error']
warnings = [r for r in validation_results if r['status'] == 'warning']
ok_results = [r for r in validation_results if r['status'] == 'ok']
# Display summary
click.echo(f"📊 Configuration Validation Summary:")
click.echo(f" ✅ OK: {len(ok_results)}")
click.echo(f" ⚠️ Warnings: {len(warnings)}")
click.echo(f" ❌ Errors: {len(errors)}")
click.echo()
# Show errors
if errors:
click.echo("❌ Configuration Errors:")
for error in errors:
click.echo(f"{error['key']}: {error['message']}")
click.echo()
# Show warnings
if warnings:
click.echo("⚠️ Configuration Warnings:")
for warning in warnings:
click.echo(f"{warning['key']}: {warning['message']}")
click.echo()
# Show OK results in verbose mode
if verbose and ok_results:
click.echo("✅ Valid Configuration:")
for ok_result in ok_results:
click.echo(f"{ok_result['key']}: {ok_result['message']}")
click.echo()
# Overall status
if errors:
click.echo("❌ Configuration validation failed")
sys.exit(1)
elif warnings:
click.echo("⚠️ Configuration validation passed with warnings")
else:
click.echo("✅ Configuration validation passed")
except Exception as e:
click.echo(f"❌ Configuration validation failed: {e}", err=True)
if config.get('verbose'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@cli.command(name='config-help')
@click.argument('key', required=False)
@pass_config
def config_help(config, key):
"""Get help information for configuration keys.
Provides detailed information about available configuration options,
their purposes, and example values.
Examples:
markitect config-help
markitect config-help gitea_url
markitect config-help api_token
"""
try:
config_manager = ConfigurationManager()
help_text = config_manager.get_config_help(key)
click.echo(help_text)
except Exception as e:
click.echo(f"❌ Failed to get configuration help: {e}", err=True)
sys.exit(1)
# Register issue management commands
cli.add_command(issues_group)

490
markitect/config_manager.py Normal file
View File

@@ -0,0 +1,490 @@
"""
Configuration and Environment Management - Issue #18
This module provides comprehensive configuration management capabilities for MarkiTect,
allowing users to manage configuration and environment settings through CLI commands.
Features:
- Display current configuration with optional sensitive data masking
- Set configuration values with validation
- Initialize configuration for new projects
- Configuration validation and help
- Integration with existing configuration system
- Environment variable management
- Project-specific configuration support
Commands implemented:
- config-show: Display current configuration
- config-set: Set configuration values
- config-init: Initialize configuration for new project
"""
import os
import json
import yaml
from pathlib import Path
from typing import Dict, Any, Optional, List, Union, Tuple
from dataclasses import asdict, fields
from copy import deepcopy
import click
class ConfigurationManager:
"""Core configuration management functionality."""
def __init__(self):
self.config_file_names = [
'.markitect.yml',
'.markitect.yaml',
'.markitect.json',
'markitect.config.yml',
'markitect.config.yaml',
'markitect.config.json'
]
self.sensitive_keys = {
'api_token', 'token', 'password', 'secret', 'key',
'gitea_token', 'github_token', 'auth_token'
}
def get_current_config(self) -> Dict[str, Any]:
"""Get current configuration from all sources."""
config = {}
# Load from existing configuration system
try:
from config.manager import MarkitectConfig
markitect_config = MarkitectConfig()
config = asdict(markitect_config)
# Convert Path objects to strings for JSON serialization
for key, value in config.items():
if isinstance(value, Path):
config[key] = str(value)
except (ImportError, Exception):
# Fallback to basic configuration
config = {
'gitea_url': os.getenv('MARKITECT_GITEA_URL', 'http://localhost:3000'),
'repo_owner': os.getenv('MARKITECT_REPO_OWNER', ''),
'repo_name': os.getenv('MARKITECT_REPO_NAME', ''),
'api_token': os.getenv('MARKITECT_API_TOKEN', ''),
'workspace_dir': os.getenv('MARKITECT_WORKSPACE_DIR', '.markitect_workspace'),
'database_path': os.getenv('MARKITECT_DATABASE_PATH', str(Path.home() / '.markitect' / 'markitect.db')),
'cache_dir': os.getenv('MARKITECT_CACHE_DIR', '.ast_cache'),
'tests_dir': os.getenv('MARKITECT_TESTS_DIR', 'tests'),
'test_file_pattern': os.getenv('MARKITECT_TEST_FILE_PATTERN', 'test_issue_{issue_num}_{scenario}.py'),
'claude_code_command': os.getenv('MARKITECT_CLAUDE_CODE_COMMAND', 'claude')
}
# Add metadata about configuration sources
config['_meta'] = {
'config_sources': self._get_config_sources(),
'env_variables': self._get_relevant_env_vars(),
'working_directory': str(Path.cwd()),
'config_file_locations': self._get_config_file_locations()
}
return config
def _get_config_sources(self) -> List[str]:
"""Get list of configuration sources in order of precedence."""
sources = []
# Check for config files
for config_name in self.config_file_names:
if Path(config_name).exists():
sources.append(f"File: {config_name}")
# Check for environment variables
env_vars = [key for key in os.environ.keys() if key.startswith('MARKITECT_')]
if env_vars:
sources.append(f"Environment variables: {len(env_vars)} found")
# Always include defaults
sources.append("Built-in defaults")
return sources
def _get_relevant_env_vars(self) -> Dict[str, str]:
"""Get MARKITECT-related environment variables."""
env_vars = {}
for key, value in os.environ.items():
if key.startswith('MARKITECT_'):
env_vars[key] = value
return env_vars
def _get_config_file_locations(self) -> Dict[str, bool]:
"""Get status of potential config file locations."""
locations = {}
# Current directory
for config_name in self.config_file_names:
current_dir_path = Path.cwd() / config_name
locations[str(current_dir_path)] = current_dir_path.exists()
# Home directory
home_config = Path.home() / '.markitect' / 'config.yml'
locations[str(home_config)] = home_config.exists()
return locations
def display_config(self, show_sensitive: bool = False, output_format: str = 'yaml') -> str:
"""Display current configuration in specified format."""
config = self.get_current_config()
if not show_sensitive:
config = self._mask_sensitive_data(config)
if output_format == 'json':
return json.dumps(config, indent=2, default=str)
elif output_format == 'yaml':
return yaml.dump(config, default_flow_style=False, sort_keys=True)
else:
# Simple format
return self._format_simple(config)
def _mask_sensitive_data(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Mask sensitive configuration values."""
masked_config = deepcopy(config)
def mask_recursive(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if any(sensitive in key.lower() for sensitive in self.sensitive_keys):
if value and str(value).strip():
obj[key] = '***MASKED***'
else:
mask_recursive(value)
elif isinstance(obj, list):
for item in obj:
mask_recursive(item)
mask_recursive(masked_config)
return masked_config
def _format_simple(self, config: Dict[str, Any]) -> str:
"""Format configuration in simple key=value style."""
lines = []
def format_recursive(obj, prefix=''):
if isinstance(obj, dict):
for key, value in sorted(obj.items()):
full_key = f"{prefix}{key}" if prefix else key
if isinstance(value, (dict, list)):
if key != '_meta': # Skip meta in simple format
format_recursive(value, f"{full_key}.")
else:
lines.append(f"{full_key} = {value}")
elif isinstance(obj, list):
for i, item in enumerate(obj):
format_recursive(item, f"{prefix}{i}.")
format_recursive(config)
return '\n'.join(lines)
def set_config_value(self, key: str, value: str, config_file: Optional[str] = None) -> bool:
"""Set a configuration value and persist it."""
# Determine target config file
target_file = self._get_target_config_file(config_file)
# Load existing config from file
existing_config = self._load_config_file(target_file) if target_file.exists() else {}
# Set the value (support nested keys with dot notation)
self._set_nested_value(existing_config, key, value)
# Validate the new configuration
validation_errors = self._validate_config_value(key, value)
if validation_errors:
raise ValueError(f"Configuration validation failed: {'; '.join(validation_errors)}")
# Save back to file
self._save_config_file(existing_config, target_file)
return True
def _get_target_config_file(self, config_file: Optional[str] = None) -> Path:
"""Determine target configuration file."""
if config_file:
return Path(config_file)
# Look for existing config file
for config_name in self.config_file_names:
config_path = Path(config_name)
if config_path.exists():
return config_path
# Default to .markitect.yml in current directory
return Path('.markitect.yml')
def _load_config_file(self, config_file: Path) -> Dict[str, Any]:
"""Load configuration from file."""
try:
content = config_file.read_text()
if config_file.suffix in ['.yml', '.yaml']:
return yaml.safe_load(content) or {}
elif config_file.suffix == '.json':
return json.loads(content)
else:
# Try YAML first, then JSON
try:
return yaml.safe_load(content) or {}
except yaml.YAMLError:
return json.loads(content)
except Exception as e:
raise ValueError(f"Failed to load config file {config_file}: {e}")
def _save_config_file(self, config: Dict[str, Any], config_file: Path) -> None:
"""Save configuration to file."""
try:
# Ensure directory exists
config_file.parent.mkdir(parents=True, exist_ok=True)
if config_file.suffix in ['.yml', '.yaml']:
content = yaml.dump(config, default_flow_style=False, sort_keys=True)
elif config_file.suffix == '.json':
content = json.dumps(config, indent=2, sort_keys=True)
else:
# Default to YAML
content = yaml.dump(config, default_flow_style=False, sort_keys=True)
config_file.write_text(content)
except Exception as e:
raise ValueError(f"Failed to save config file {config_file}: {e}")
def _set_nested_value(self, config: Dict[str, Any], key: str, value: str) -> None:
"""Set a nested configuration value using dot notation."""
keys = key.split('.')
current = config
# Navigate to the parent of the target key
for k in keys[:-1]:
if k not in current:
current[k] = {}
current = current[k]
# Set the final value
final_key = keys[-1]
# Try to convert value to appropriate type
converted_value = self._convert_value(value)
current[final_key] = converted_value
def _convert_value(self, value: str) -> Any:
"""Convert string value to appropriate type."""
# Handle boolean values
if value.lower() in ('true', 'yes', 'on', '1'):
return True
elif value.lower() in ('false', 'no', 'off', '0'):
return False
# Try to convert to number
try:
if '.' in value:
return float(value)
else:
return int(value)
except ValueError:
pass
# Return as string
return value
def _validate_config_value(self, key: str, value: str) -> List[str]:
"""Validate a configuration value."""
errors = []
# Path validation
if any(path_key in key.lower() for path_key in ['dir', 'path', 'file']):
if value and not self._is_valid_path(value):
errors.append(f"'{value}' is not a valid path format")
# URL validation
if 'url' in key.lower():
if value and not self._is_valid_url(value):
errors.append(f"'{value}' is not a valid URL format")
# Required field validation for setting values (not for display)
# We only enforce required fields when explicitly setting them
if key == 'gitea_url' and not value.strip():
errors.append(f"'{key}' is required and cannot be empty")
elif key == 'database_path' and not value.strip():
errors.append(f"'{key}' is required and cannot be empty")
return errors
def _is_valid_path(self, path_str: str) -> bool:
"""Check if a string represents a valid path."""
try:
Path(path_str)
return True
except (ValueError, OSError):
return False
def _is_valid_url(self, url_str: str) -> bool:
"""Check if a string represents a valid URL."""
import re
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
return url_pattern.match(url_str) is not None
def initialize_project_config(self, project_dir: Optional[Path] = None, interactive: bool = True) -> Dict[str, Any]:
"""Initialize configuration for a new project."""
target_dir = project_dir or Path.cwd()
config_file = target_dir / '.markitect.yml'
# Default configuration template
default_config = {
'gitea_url': 'http://localhost:3000',
'repo_owner': '',
'repo_name': target_dir.name,
'workspace_dir': '.markitect_workspace',
'cache_dir': '.ast_cache',
'tests_dir': 'tests',
'test_file_pattern': 'test_issue_{issue_num}_{scenario}.py',
'claude_code_command': 'claude'
}
if interactive:
default_config = self._interactive_config_setup(default_config)
# Create necessary directories
workspace_dir = target_dir / default_config['workspace_dir']
cache_dir = target_dir / default_config['cache_dir']
tests_dir = target_dir / default_config['tests_dir']
for directory in [workspace_dir, cache_dir, tests_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Save configuration
self._save_config_file(default_config, config_file)
return {
'config_file': str(config_file),
'config': default_config,
'created_directories': [str(workspace_dir), str(cache_dir), str(tests_dir)]
}
def _interactive_config_setup(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Interactive configuration setup (would be used with CLI)."""
# This method would be called by CLI with click.prompt()
# For now, return the default config
# The CLI integration will handle the interactive prompts
return config
def list_config_keys(self) -> List[Tuple[str, str, Any]]:
"""List all available configuration keys with descriptions."""
config_schema = [
('gitea_url', 'Gitea server URL', 'http://localhost:3000'),
('repo_owner', 'Repository owner/organization', ''),
('repo_name', 'Repository name', ''),
('api_token', 'API token for authentication', ''),
('workspace_dir', 'Workspace directory path', '.markitect_workspace'),
('database_path', 'Database file path', '~/.markitect/markitect.db'),
('cache_dir', 'AST cache directory', '.ast_cache'),
('tests_dir', 'Tests directory', 'tests'),
('test_file_pattern', 'Test file naming pattern', 'test_issue_{issue_num}_{scenario}.py'),
('claude_code_command', 'Claude Code command', 'claude'),
]
return config_schema
def get_config_help(self, key: Optional[str] = None) -> str:
"""Get help information for configuration."""
if key:
# Get help for specific key
for config_key, description, default in self.list_config_keys():
if config_key == key:
return f"{config_key}: {description} (default: {default})"
return f"Unknown configuration key: {key}"
else:
# Get general help
lines = ["Available configuration keys:"]
for config_key, description, default in self.list_config_keys():
lines.append(f" {config_key:<20} {description}")
return '\n'.join(lines)
def validate_configuration(self, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
"""Validate current or provided configuration."""
if config is None:
config = self.get_current_config()
validation_results = []
# Check required fields
required_fields = ['gitea_url', 'database_path']
for field in required_fields:
if field not in config or not config[field]:
validation_results.append({
'key': field,
'status': 'error',
'message': f'Required field {field} is missing or empty'
})
else:
validation_results.append({
'key': field,
'status': 'ok',
'message': f'{field} is configured'
})
# Check paths exist or are creatable
path_fields = ['workspace_dir', 'cache_dir', 'tests_dir']
for field in path_fields:
if field in config:
path = Path(config[field])
if path.exists():
validation_results.append({
'key': field,
'status': 'ok',
'message': f'{field} exists at {path}'
})
else:
try:
path.mkdir(parents=True, exist_ok=True)
validation_results.append({
'key': field,
'status': 'warning',
'message': f'{field} created at {path}'
})
except OSError as e:
validation_results.append({
'key': field,
'status': 'error',
'message': f'{field} cannot be created: {e}'
})
# Check database path parent directory
if 'database_path' in config:
db_path = Path(config['database_path'])
db_parent = db_path.parent
if not db_parent.exists():
try:
db_parent.mkdir(parents=True, exist_ok=True)
validation_results.append({
'key': 'database_path',
'status': 'warning',
'message': f'Database directory created at {db_parent}'
})
except OSError as e:
validation_results.append({
'key': 'database_path',
'status': 'error',
'message': f'Cannot create database directory: {e}'
})
else:
validation_results.append({
'key': 'database_path',
'status': 'ok',
'message': f'Database directory exists at {db_parent}'
})
return validation_results

View File

@@ -3,11 +3,44 @@ Data-driven Draft Generator for Issue #56: Generate multiple drafts from data so
This module provides functionality to create multiple markdown documents from JSON schemas
and data sources (JSON, CSV) with field mapping support.
Examples:
Basic usage with JSON data:
>>> generator = DraftGenerator()
>>> schema = {...} # JSON schema with field mappings
>>> data = [{"name": "John", "role": "Developer"}]
>>> files = generator.generate_drafts_from_data_source(
... schema, data, Path("./output")
... )
Using with CSV file:
>>> files = generator.generate_drafts_from_data_source(
... schema, Path("data.csv"), Path("./output")
... )
Field mapping is configured in the schema using x-markitect-field-mapping extension:
{
"properties": {
"headings": {
"properties": {
"level_1": {
"x-markitect-field-mapping": {"const": "name"}
}
}
}
}
}
Architecture:
The DraftGenerator extends the existing StubGenerator to add data-driven
capabilities. It processes data sources, validates compatibility with schemas,
and generates multiple document drafts with populated content.
"""
import json
import csv
import io
import copy
from pathlib import Path
from typing import Dict, Any, List, Optional, Union
from .stub_generator import StubGenerator
@@ -163,7 +196,6 @@ class DraftGenerator:
def _apply_field_mapping(self, schema: Dict[str, Any], record: Dict[str, Any]) -> Dict[str, Any]:
"""Apply field mapping to populate schema content areas with data."""
# Create a deep copy of the schema
import copy
populated_schema = copy.deepcopy(schema)
# Apply title mapping if exists
@@ -175,15 +207,26 @@ class DraftGenerator:
def _generate_filename(self, record: Dict[str, Any], index: int) -> str:
"""Generate appropriate filename for the draft."""
# Try to use common identifying fields
for field in ['name', 'title', 'id']:
identifier_fields = ['name', 'title', 'id']
for field in identifier_fields:
if field in record and record[field]:
# Sanitize filename
name = str(record[field]).replace(' ', '_').replace('/', '_')
name = self._sanitize_filename(str(record[field]))
return f"{name}.md"
# Fall back to index-based naming
return f"draft_{index + 1:03d}.md"
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize a string to be safe for use as a filename."""
# Replace problematic characters with underscores
unsafe_chars = [' ', '/', '\\', ':', '*', '?', '"', '<', '>', '|']
sanitized = filename
for char in unsafe_chars:
sanitized = sanitized.replace(char, '_')
return sanitized
def _generate_draft_content(self, schema: Dict[str, Any], record: Dict[str, Any], schema_file_path: Optional[str] = None) -> str:
"""Generate the actual draft content from populated schema."""
# Use the existing stub generator as the base
@@ -194,20 +237,43 @@ class DraftGenerator:
)
# Add data-driven enhancements - replace placeholders with actual data
content = self._apply_data_replacements(content, record)
return content
def _apply_data_replacements(self, content: str, record: Dict[str, Any]) -> str:
"""Apply data replacements to content using various replacement strategies."""
for field_name, field_value in record.items():
# Simple replacement strategy for testing
placeholder_pattern = f"TODO: Add content for {field_name}"
if placeholder_pattern in content:
content = content.replace(placeholder_pattern, str(field_value))
content = self._apply_field_replacements(content, field_name, str(field_value))
# Replace template variables in content instructions (e.g., {role} -> Software Engineer)
template_pattern = f"{{{field_name}}}"
if template_pattern in content:
content = content.replace(template_pattern, str(field_value))
return content
# Also try to replace role-specific content
if field_name == 'role':
content = content.replace("TODO: Add content for introduction section.", f"Role: {field_value}")
content = content.replace("TODO: Add content for section_level_2 section.", f"Department information and role details for {field_value}")
def _apply_field_replacements(self, content: str, field_name: str, field_value: str) -> str:
"""Apply all replacement patterns for a specific field."""
# Simple placeholder replacement
placeholder_pattern = f"TODO: Add content for {field_name}"
if placeholder_pattern in content:
content = content.replace(placeholder_pattern, field_value)
# Template variable replacement (e.g., {role} -> Software Engineer)
template_pattern = f"{{{field_name}}}"
if template_pattern in content:
content = content.replace(template_pattern, field_value)
# Role-specific content replacement (can be extended for other field types)
if field_name == 'role':
content = self._apply_role_specific_replacements(content, field_value)
return content
def _apply_role_specific_replacements(self, content: str, role_value: str) -> str:
"""Apply role-specific content replacements."""
replacements = {
"TODO: Add content for introduction section.": f"Role: {role_value}",
"TODO: Add content for section_level_2 section.": f"Department information and role details for {role_value}"
}
for old_text, new_text in replacements.items():
content = content.replace(old_text, new_text)
return content

View File

@@ -0,0 +1,653 @@
"""
Tests for Issue #17: Batch Processing and Recursive Operations
This test suite verifies the batch processing functionality including:
- Directory processing with recursive support
- Glob pattern matching for file selection
- Progress tracking and error handling
- Depth control for recursive operations
"""
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from click.testing import CliRunner
from markitect.batch_processor import (
BatchProcessor, ProcessingMode, ErrorHandling,
ProcessingResult, BatchResult, ProgressTracker,
create_file_processor
)
from markitect.cli import cli
class TestBatchProcessor:
"""Test the core BatchProcessor functionality."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
def create_test_files(self, structure):
"""Create test file structure from dict."""
for path, content in structure.items():
file_path = self.test_dir / path
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content)
def test_find_markdown_files_non_recursive(self):
"""Test finding markdown files without recursion."""
# Create test structure
self.create_test_files({
'file1.md': '# Test 1',
'file2.md': '# Test 2',
'file3.txt': 'Not markdown',
'subdir/file4.md': '# Test 4'
})
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=False)
# Should find only files in root directory
assert len(files) == 2
file_names = [f.name for f in files]
assert 'file1.md' in file_names
assert 'file2.md' in file_names
assert 'file4.md' not in file_names
def test_find_markdown_files_recursive(self):
"""Test finding markdown files with recursion."""
# Create test structure
self.create_test_files({
'file1.md': '# Test 1',
'subdir/file2.md': '# Test 2',
'subdir/nested/file3.md': '# Test 3',
'subdir/file4.txt': 'Not markdown'
})
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=True)
# Should find all markdown files
assert len(files) == 3
file_names = [f.name for f in files]
assert 'file1.md' in file_names
assert 'file2.md' in file_names
assert 'file3.md' in file_names
def test_find_markdown_files_with_depth_limit(self):
"""Test recursive search with depth limit."""
# Create test structure
self.create_test_files({
'file1.md': '# Test 1',
'level1/file2.md': '# Test 2',
'level1/level2/file3.md': '# Test 3',
'level1/level2/level3/file4.md': '# Test 4'
})
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=1)
# Should find files up to depth 1
assert len(files) == 2
file_names = [f.name for f in files]
assert 'file1.md' in file_names
assert 'file2.md' in file_names
assert 'file3.md' not in file_names
assert 'file4.md' not in file_names
def test_find_markdown_files_with_pattern(self):
"""Test finding files with custom pattern."""
# Create test structure
self.create_test_files({
'file1.md': '# Test 1',
'file2.markdown': '# Test 2',
'file3.txt': 'Not markdown'
})
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, pattern='*.markdown')
# Should find only .markdown files
assert len(files) == 1
assert files[0].name == 'file2.markdown'
def test_find_files_by_glob(self):
"""Test glob pattern file finding."""
# Create test structure
self.create_test_files({
'docs/file1.md': '# Test 1',
'docs/subdir/file2.md': '# Test 2',
'src/file3.md': '# Test 3',
'file4.txt': 'Not markdown'
})
processor = BatchProcessor()
# Test recursive glob
files = processor.find_files_by_glob(str(self.test_dir / "**/*.md"))
assert len(files) == 3
# Test specific directory glob
files = processor.find_files_by_glob(str(self.test_dir / "docs/*.md"))
assert len(files) == 1
assert files[0].name == 'file1.md'
def test_process_files_success(self):
"""Test successful file processing."""
# Create test files
self.create_test_files({
'file1.md': '# Test 1',
'file2.md': '# Test 2'
})
processor = BatchProcessor(show_progress=False)
files = list(self.test_dir.glob('*.md'))
def mock_processor(file_path):
return ProcessingResult(
file_path=file_path,
success=True,
message="Processed successfully"
)
result = processor.process_files(files, mock_processor, "Testing")
assert result.total_files == 2
assert result.processed == 2
assert result.succeeded == 2
assert result.failed == 0
assert result.skipped == 0
def test_process_files_with_errors(self):
"""Test file processing with errors."""
# Create test files
self.create_test_files({
'file1.md': '# Test 1',
'file2.md': '# Test 2',
'file3.md': '# Test 3'
})
processor = BatchProcessor(show_progress=False, error_handling=ErrorHandling.CONTINUE)
files = list(self.test_dir.glob('*.md'))
def mock_processor(file_path):
# Fail on file2.md
if file_path.name == 'file2.md':
return ProcessingResult(
file_path=file_path,
success=False,
message="Processing failed",
error="Mock error"
)
return ProcessingResult(
file_path=file_path,
success=True,
message="Processed successfully"
)
result = processor.process_files(files, mock_processor, "Testing")
assert result.total_files == 3
assert result.processed == 3
assert result.succeeded == 2
assert result.failed == 1
assert len(result.errors) == 1
def test_process_files_stop_on_error(self):
"""Test stop-on-error behavior."""
# Create test files
self.create_test_files({
'file1.md': '# Test 1',
'file2.md': '# Test 2',
'file3.md': '# Test 3'
})
processor = BatchProcessor(show_progress=False, error_handling=ErrorHandling.STOP)
files = sorted(list(self.test_dir.glob('*.md')))
def mock_processor(file_path):
# Fail on second file
if file_path.name == 'file2.md':
return ProcessingResult(
file_path=file_path,
success=False,
message="Processing failed",
error="Mock error"
)
return ProcessingResult(
file_path=file_path,
success=True,
message="Processed successfully"
)
result = processor.process_files(files, mock_processor, "Testing")
# Should stop after the error
assert result.processed == 2 # file1 success, file2 error
assert result.succeeded == 1
assert result.failed == 1
class TestProgressTracker:
"""Test the ProgressTracker functionality."""
def test_progress_tracking(self):
"""Test basic progress tracking."""
tracker = ProgressTracker(total=3, show_progress=False)
# Test successful processing
result1 = ProcessingResult(Path("file1.md"), True, "Success")
tracker.update(result1)
assert tracker.processed == 1
assert tracker.succeeded == 1
assert tracker.failed == 0
# Test failed processing
result2 = ProcessingResult(Path("file2.md"), False, "Failed", "Error message")
tracker.update(result2)
assert tracker.processed == 2
assert tracker.succeeded == 1
assert tracker.failed == 1
# Test skipped file
tracker.skip_file(Path("file3.md"), "Skipped reason")
assert tracker.skipped == 1
class TestFileProcessor:
"""Test the file processor creation and execution."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
@patch('markitect.database.DatabaseManager')
def test_ingest_processor(self, mock_db_manager):
"""Test file processor for ingestion."""
# Create test file
test_file = self.test_dir / "test.md"
test_file.write_text("# Test content")
# Mock database manager
mock_db = Mock()
mock_db_manager.return_value = mock_db
config = {'database': 'test.db'}
processor = create_file_processor(config, ProcessingMode.INGEST)
result = processor(test_file)
assert result.success
assert result.file_path == test_file
assert "Ingested successfully" in result.message
mock_db.store_document.assert_called_once()
@patch('markitect.database.DatabaseManager')
def test_status_processor(self, mock_db_manager):
"""Test file processor for status checking."""
# Create test file
test_file = self.test_dir / "test.md"
test_file.write_text("# Test content")
# Mock database manager
mock_db = Mock()
mock_db.get_metadata.return_value = {'id': 'test123'}
mock_db_manager.return_value = mock_db
config = {'database': 'test.db'}
processor = create_file_processor(config, ProcessingMode.STATUS)
result = processor(test_file)
assert result.success
assert result.file_path == test_file
assert "Found in database" in result.message
def test_validate_processor(self):
"""Test file processor for validation."""
# Create test file
test_file = self.test_dir / "test.md"
test_file.write_text("# Test content")
config = {}
processor = create_file_processor(config, ProcessingMode.VALIDATE)
result = processor(test_file)
assert result.success
assert result.file_path == test_file
assert "Valid markdown" in result.message
def test_validate_processor_empty_file(self):
"""Test validation processor with empty file."""
# Create empty file
test_file = self.test_dir / "empty.md"
test_file.write_text("")
config = {}
processor = create_file_processor(config, ProcessingMode.VALIDATE)
result = processor(test_file)
assert not result.success
assert "File is empty" in result.error
class TestCLIIntegration:
"""Test CLI command integration."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.runner = CliRunner()
def teardown_method(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
def create_test_files(self, structure):
"""Create test file structure from dict."""
for path, content in structure.items():
file_path = self.test_dir / path
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content)
@patch('markitect.database.DatabaseManager')
def test_ingest_dir_command(self, mock_db_manager):
"""Test ingest-dir CLI command."""
# Create test files
self.create_test_files({
'file1.md': '# Test 1',
'file2.md': '# Test 2',
'subdir/file3.md': '# Test 3'
})
# Mock database
mock_db = Mock()
mock_db_manager.return_value = mock_db
result = self.runner.invoke(cli, [
'ingest-dir', str(self.test_dir),
'--quiet'
])
assert result.exit_code == 0
# Should process 2 files (non-recursive by default)
assert mock_db.store_document.call_count == 2
@patch('markitect.database.DatabaseManager')
def test_ingest_dir_recursive(self, mock_db_manager):
"""Test ingest-dir with recursive option."""
# Create test files
self.create_test_files({
'file1.md': '# Test 1',
'subdir/file2.md': '# Test 2',
'subdir/nested/file3.md': '# Test 3'
})
# Mock database
mock_db = Mock()
mock_db_manager.return_value = mock_db
result = self.runner.invoke(cli, [
'ingest-dir', str(self.test_dir),
'--recursive',
'--quiet'
])
assert result.exit_code == 0
# Should process all 3 files
assert mock_db.store_document.call_count == 3
@patch('markitect.database.DatabaseManager')
def test_batch_process_command(self, mock_db_manager):
"""Test batch-process CLI command."""
# Create test files
self.create_test_files({
'docs/file1.md': '# Test 1',
'docs/file2.md': '# Test 2',
'src/file3.md': '# Test 3'
})
# Mock database
mock_db = Mock()
mock_db_manager.return_value = mock_db
# Test glob pattern
pattern = str(self.test_dir / "docs/*.md")
result = self.runner.invoke(cli, [
'batch-process', pattern,
'--operation', 'ingest',
'--quiet'
])
assert result.exit_code == 0
# Should process 2 files from docs directory
assert mock_db.store_document.call_count == 2
@patch('markitect.database.DatabaseManager')
def test_recursive_command(self, mock_db_manager):
"""Test recursive CLI command."""
# Create test files
self.create_test_files({
'level1/file1.md': '# Test 1',
'level1/level2/file2.md': '# Test 2',
'level1/level2/level3/file3.md': '# Test 3'
})
# Mock database
mock_db = Mock()
mock_db.get_metadata.side_effect = Exception("Not found")
mock_db_manager.return_value = mock_db
result = self.runner.invoke(cli, [
'recursive', str(self.test_dir),
'--depth', '2',
'--operation', 'status',
'--quiet'
])
assert result.exit_code == 0
# Should check status for files up to depth 2
assert mock_db.get_metadata.call_count == 2
def test_error_handling_stop(self):
"""Test error handling with stop strategy."""
# Create test directory with no files
result = self.runner.invoke(cli, [
'ingest-dir', str(self.test_dir),
'--error-handling', 'stop',
'--quiet'
])
# Should exit cleanly when no files found
assert result.exit_code == 0
def test_invalid_directory(self):
"""Test handling of invalid directory."""
result = self.runner.invoke(cli, [
'ingest-dir', '/nonexistent/directory',
'--quiet'
])
# Should exit with error
assert result.exit_code == 2 # Click argument validation error
@patch('markitect.database.DatabaseManager')
def test_custom_pattern(self, mock_db_manager):
"""Test custom file pattern matching."""
# Create test files with different extensions
self.create_test_files({
'file1.md': '# Test 1',
'file2.markdown': '# Test 2',
'file3.txt': 'Not markdown'
})
# Mock database
mock_db = Mock()
mock_db_manager.return_value = mock_db
result = self.runner.invoke(cli, [
'ingest-dir', str(self.test_dir),
'--pattern', '*.markdown',
'--quiet'
])
assert result.exit_code == 0
# Should process only .markdown files
assert mock_db.store_document.call_count == 1
class TestErrorHandling:
"""Test error handling scenarios."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
def test_permission_error_handling(self):
"""Test handling of permission errors."""
processor = BatchProcessor(show_progress=False)
# Mock os.listdir to raise PermissionError
with patch('pathlib.Path.iterdir') as mock_iterdir:
mock_iterdir.side_effect = PermissionError("Permission denied")
files = processor.find_markdown_files(self.test_dir)
# Should return empty list without crashing
assert files == []
def test_nonexistent_directory(self):
"""Test handling of nonexistent directories."""
processor = BatchProcessor()
with pytest.raises(FileNotFoundError):
processor.find_markdown_files(Path("/nonexistent/directory"))
def test_file_as_directory(self):
"""Test handling when a file is passed as directory."""
# Create a file
test_file = self.test_dir / "test.md"
test_file.write_text("# Test")
processor = BatchProcessor()
with pytest.raises(NotADirectoryError):
processor.find_markdown_files(test_file)
class TestEdgeCases:
"""Test edge cases and boundary conditions."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
def test_empty_directory(self):
"""Test processing empty directory."""
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir)
assert files == []
def test_hidden_directories(self):
"""Test that hidden directories are skipped."""
# Create hidden directory
hidden_dir = self.test_dir / ".hidden"
hidden_dir.mkdir()
(hidden_dir / "test.md").write_text("# Hidden")
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=True)
# Should not find files in hidden directories
assert len(files) == 0
def test_depth_zero(self):
"""Test depth=0 behavior."""
# Create nested structure
(self.test_dir / "file1.md").write_text("# Test 1")
subdir = self.test_dir / "subdir"
subdir.mkdir()
(subdir / "file2.md").write_text("# Test 2")
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=0)
# Depth 0 should only include files in the starting directory
# With our corrected logic, this should only find file1.md
assert len(files) == 1
assert files[0].name == "file1.md"
def test_very_deep_structure(self):
"""Test with very deep directory structure."""
# Create 10-level deep structure
# Start with a file at the root level
(self.test_dir / "file_root.md").write_text("# Root Test")
current_dir = self.test_dir
for i in range(10):
current_dir = current_dir / f"level{i}"
current_dir.mkdir()
(current_dir / f"file{i}.md").write_text(f"# Test {i}")
processor = BatchProcessor()
files = processor.find_markdown_files(self.test_dir, recursive=True, depth=5)
# Should find files up to depth 5
# Root (depth 0) + levels 0-4 (depths 1-5) = 6 files
assert len(files) == 6
def test_glob_with_no_matches(self):
"""Test glob pattern with no matches."""
processor = BatchProcessor()
files = processor.find_files_by_glob(str(self.test_dir / "*.nonexistent"))
assert files == []
def test_file_deleted_during_processing(self):
"""Test handling file deletion during processing."""
# Create test file
test_file = self.test_dir / "test.md"
test_file.write_text("# Test")
def mock_processor(file_path):
# This test is actually checking the file existence in the process_files loop
# not the processor function itself
return ProcessingResult(file_path, True, "Processed")
processor = BatchProcessor(show_progress=False)
files = [test_file]
# Delete the file after creating the file list but before processing
test_file.unlink()
result = processor.process_files(files, mock_processor, "Testing")
# Should handle gracefully - file should be skipped
assert result.skipped == 1
assert result.processed == 0

View File

@@ -0,0 +1,897 @@
"""
Tests for Issue #18: Configuration and Environment Management CLI
This test suite verifies the configuration management functionality including:
- Configuration display and management
- Project initialization functionality
- Configuration validation
- Integration with existing config system
- Environment variable handling
"""
import pytest
import tempfile
import shutil
import os
import json
import yaml
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from click.testing import CliRunner
from markitect.config_manager import ConfigurationManager
from markitect.cli import cli
class TestConfigurationManager:
"""Test the core ConfigurationManager functionality."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.original_cwd = os.getcwd()
os.chdir(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
os.chdir(self.original_cwd)
shutil.rmtree(self.temp_dir)
def test_get_current_config(self):
"""Test getting current configuration."""
config_manager = ConfigurationManager()
config = config_manager.get_current_config()
# Should have basic configuration keys
expected_keys = [
'gitea_url', 'repo_owner', 'repo_name', 'api_token',
'workspace_dir', 'database_path', 'cache_dir', 'tests_dir'
]
for key in expected_keys:
assert key in config
# Should have metadata
assert '_meta' in config
assert 'config_sources' in config['_meta']
assert 'env_variables' in config['_meta']
assert 'working_directory' in config['_meta']
def test_display_config_yaml_format(self):
"""Test configuration display in YAML format."""
config_manager = ConfigurationManager()
output = config_manager.display_config(output_format='yaml')
# Should be valid YAML
parsed = yaml.safe_load(output)
assert isinstance(parsed, dict)
assert 'gitea_url' in parsed
def test_display_config_json_format(self):
"""Test configuration display in JSON format."""
config_manager = ConfigurationManager()
output = config_manager.display_config(output_format='json')
# Should be valid JSON
parsed = json.loads(output)
assert isinstance(parsed, dict)
assert 'gitea_url' in parsed
def test_display_config_simple_format(self):
"""Test configuration display in simple format."""
config_manager = ConfigurationManager()
output = config_manager.display_config(output_format='simple')
# Should contain key=value pairs
lines = output.split('\n')
assert any('gitea_url =' in line for line in lines)
assert any('repo_owner =' in line for line in lines)
def test_mask_sensitive_data(self):
"""Test masking of sensitive configuration data."""
config_manager = ConfigurationManager()
# Create config with sensitive data
config = {
'api_token': 'secret123',
'password': 'mypassword',
'gitea_url': 'http://localhost:3000',
'repo_name': 'test-repo'
}
masked = config_manager._mask_sensitive_data(config)
# Sensitive fields should be masked
assert masked['api_token'] == '***MASKED***'
assert masked['password'] == '***MASKED***'
# Non-sensitive fields should remain
assert masked['gitea_url'] == 'http://localhost:3000'
assert masked['repo_name'] == 'test-repo'
def test_mask_sensitive_data_preserves_empty_values(self):
"""Test that empty sensitive values are not masked."""
config_manager = ConfigurationManager()
config = {
'api_token': '',
'secret': None,
'repo_name': 'test-repo'
}
masked = config_manager._mask_sensitive_data(config)
# Empty/None values should not be masked
assert masked['api_token'] == ''
assert masked['secret'] is None
assert masked['repo_name'] == 'test-repo'
def test_set_config_value_simple(self):
"""Test setting a simple configuration value."""
config_manager = ConfigurationManager()
success = config_manager.set_config_value('repo_name', 'test-repository')
assert success
# Verify file was created
config_file = self.test_dir / '.markitect.yml'
assert config_file.exists()
# Verify content
content = yaml.safe_load(config_file.read_text())
assert content['repo_name'] == 'test-repository'
def test_set_config_value_nested(self):
"""Test setting nested configuration value with dot notation."""
config_manager = ConfigurationManager()
success = config_manager.set_config_value('gitea.url', 'http://example.com')
assert success
# Verify nested structure
config_file = self.test_dir / '.markitect.yml'
content = yaml.safe_load(config_file.read_text())
assert content['gitea']['url'] == 'http://example.com'
def test_set_config_value_type_conversion(self):
"""Test automatic type conversion for configuration values."""
config_manager = ConfigurationManager()
# Test boolean conversion
config_manager.set_config_value('debug', 'true')
config_manager.set_config_value('verbose', 'false')
# Test number conversion
config_manager.set_config_value('port', '3000')
config_manager.set_config_value('timeout', '30.5')
config_file = self.test_dir / '.markitect.yml'
content = yaml.safe_load(config_file.read_text())
assert content['debug'] is True
assert content['verbose'] is False
assert content['port'] == 3000
assert content['timeout'] == 30.5
def test_set_config_value_validation_error(self):
"""Test configuration validation during value setting."""
config_manager = ConfigurationManager()
# Test invalid URL
with pytest.raises(ValueError, match="not a valid URL format"):
config_manager.set_config_value('gitea_url', 'invalid-url')
def test_set_config_value_existing_file(self):
"""Test setting values in existing configuration file."""
# Create existing config file
existing_config = {
'repo_name': 'old-name',
'gitea_url': 'http://localhost:3000'
}
config_file = self.test_dir / '.markitect.yml'
config_file.write_text(yaml.dump(existing_config))
config_manager = ConfigurationManager()
success = config_manager.set_config_value('repo_name', 'new-name')
assert success
# Verify update
content = yaml.safe_load(config_file.read_text())
assert content['repo_name'] == 'new-name'
assert content['gitea_url'] == 'http://localhost:3000' # Should be preserved
def test_set_config_value_custom_file(self):
"""Test setting values in custom configuration file."""
custom_file = self.test_dir / 'custom.yml'
config_manager = ConfigurationManager()
success = config_manager.set_config_value('repo_name', 'test', str(custom_file))
assert success
assert custom_file.exists()
content = yaml.safe_load(custom_file.read_text())
assert content['repo_name'] == 'test'
def test_initialize_project_config(self):
"""Test project configuration initialization."""
config_manager = ConfigurationManager()
result = config_manager.initialize_project_config(self.test_dir, interactive=False)
# Verify config file created
config_file = Path(result['config_file'])
assert config_file.exists()
assert config_file.name == '.markitect.yml'
# Verify directories created
assert len(result['created_directories']) > 0
for directory in result['created_directories']:
assert Path(directory).exists()
# Verify config structure
config = result['config']
assert 'gitea_url' in config
assert 'repo_name' in config
assert config['repo_name'] == self.test_dir.name
def test_initialize_project_config_custom_dir(self):
"""Test project initialization in custom directory."""
project_dir = self.test_dir / 'my-project'
config_manager = ConfigurationManager()
result = config_manager.initialize_project_config(project_dir, interactive=False)
# Verify correct location
config_file = Path(result['config_file'])
assert config_file.parent == project_dir
assert project_dir.exists()
def test_validate_configuration_success(self):
"""Test configuration validation with valid config."""
config = {
'gitea_url': 'http://localhost:3000',
'database_path': str(self.test_dir / 'db.sqlite'),
'workspace_dir': str(self.test_dir / 'workspace'),
'cache_dir': str(self.test_dir / 'cache'),
'tests_dir': str(self.test_dir / 'tests')
}
config_manager = ConfigurationManager()
results = config_manager.validate_configuration(config)
# Should have validation results
assert len(results) > 0
# Check for required field validations
required_checks = [r for r in results if r['key'] in ['gitea_url', 'database_path']]
assert len(required_checks) > 0
def test_validate_configuration_missing_required(self):
"""Test configuration validation with missing required fields."""
config = {
'repo_name': 'test'
# Missing gitea_url and database_path
}
config_manager = ConfigurationManager()
results = config_manager.validate_configuration(config)
# Should have errors for missing required fields
errors = [r for r in results if r['status'] == 'error']
assert len(errors) > 0
error_keys = [r['key'] for r in errors]
assert 'gitea_url' in error_keys or 'database_path' in error_keys
def test_validate_configuration_path_creation(self):
"""Test configuration validation creates missing directories."""
config = {
'gitea_url': 'http://localhost:3000',
'database_path': str(self.test_dir / 'data' / 'db.sqlite'),
'workspace_dir': str(self.test_dir / 'new_workspace'),
'cache_dir': str(self.test_dir / 'new_cache'),
'tests_dir': str(self.test_dir / 'new_tests')
}
config_manager = ConfigurationManager()
results = config_manager.validate_configuration(config)
# Directories should be created
assert (self.test_dir / 'new_workspace').exists()
assert (self.test_dir / 'new_cache').exists()
assert (self.test_dir / 'new_tests').exists()
assert (self.test_dir / 'data').exists() # Database parent directory
# Should have warnings for created directories
warnings = [r for r in results if r['status'] == 'warning']
assert len(warnings) > 0
def test_list_config_keys(self):
"""Test listing available configuration keys."""
config_manager = ConfigurationManager()
keys = config_manager.list_config_keys()
# Should return list of tuples
assert isinstance(keys, list)
assert len(keys) > 0
# Each item should be (key, description, default)
for item in keys:
assert len(item) == 3
assert isinstance(item[0], str) # key
assert isinstance(item[1], str) # description
# Should include expected keys
key_names = [item[0] for item in keys]
assert 'gitea_url' in key_names
assert 'repo_name' in key_names
assert 'api_token' in key_names
def test_get_config_help_specific_key(self):
"""Test getting help for specific configuration key."""
config_manager = ConfigurationManager()
help_text = config_manager.get_config_help('gitea_url')
assert 'gitea_url' in help_text
assert 'description' in help_text.lower() or ':' in help_text
def test_get_config_help_unknown_key(self):
"""Test getting help for unknown configuration key."""
config_manager = ConfigurationManager()
help_text = config_manager.get_config_help('unknown_key')
assert 'unknown' in help_text.lower()
def test_get_config_help_general(self):
"""Test getting general configuration help."""
config_manager = ConfigurationManager()
help_text = config_manager.get_config_help()
assert 'available' in help_text.lower() or 'configuration' in help_text.lower()
assert 'gitea_url' in help_text
assert 'repo_name' in help_text
def test_convert_value_booleans(self):
"""Test value conversion for boolean types."""
config_manager = ConfigurationManager()
# True values
for true_val in ['true', 'True', 'TRUE', 'yes', 'YES', 'on', 'ON', '1']:
assert config_manager._convert_value(true_val) is True
# False values
for false_val in ['false', 'False', 'FALSE', 'no', 'NO', 'off', 'OFF', '0']:
assert config_manager._convert_value(false_val) is False
def test_convert_value_numbers(self):
"""Test value conversion for numeric types."""
config_manager = ConfigurationManager()
# Integers
assert config_manager._convert_value('42') == 42
assert config_manager._convert_value('-10') == -10
# Floats
assert config_manager._convert_value('3.14') == 3.14
assert config_manager._convert_value('-2.5') == -2.5
# Strings that look like numbers but aren't
assert config_manager._convert_value('not-a-number') == 'not-a-number'
def test_is_valid_url(self):
"""Test URL validation."""
config_manager = ConfigurationManager()
# Valid URLs
valid_urls = [
'http://localhost:3000',
'https://example.com',
'http://192.168.1.1:8080',
'https://api.github.com/repos'
]
for url in valid_urls:
assert config_manager._is_valid_url(url), f"Should be valid: {url}"
# Invalid URLs
invalid_urls = [
'not-a-url',
'ftp://example.com',
'localhost:3000',
'http://',
''
]
for url in invalid_urls:
assert not config_manager._is_valid_url(url), f"Should be invalid: {url}"
def test_is_valid_path(self):
"""Test path validation."""
config_manager = ConfigurationManager()
# Valid paths
valid_paths = [
'/absolute/path',
'./relative/path',
'~/home/path',
'simple-path',
str(self.test_dir / 'subdir')
]
for path in valid_paths:
assert config_manager._is_valid_path(path), f"Should be valid: {path}"
# Edge case: empty string (should be considered valid as it can represent current directory)
assert config_manager._is_valid_path('')
class TestConfigFileParsing:
"""Test configuration file parsing and saving."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.original_cwd = os.getcwd()
os.chdir(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
os.chdir(self.original_cwd)
shutil.rmtree(self.temp_dir)
def test_load_yaml_config_file(self):
"""Test loading YAML configuration file."""
config_content = {
'gitea_url': 'http://localhost:3000',
'repo_name': 'test-repo',
'nested': {
'key': 'value'
}
}
config_file = self.test_dir / 'test.yml'
config_file.write_text(yaml.dump(config_content))
config_manager = ConfigurationManager()
loaded = config_manager._load_config_file(config_file)
assert loaded == config_content
def test_load_json_config_file(self):
"""Test loading JSON configuration file."""
config_content = {
'gitea_url': 'http://localhost:3000',
'repo_name': 'test-repo',
'debug': True
}
config_file = self.test_dir / 'test.json'
config_file.write_text(json.dumps(config_content))
config_manager = ConfigurationManager()
loaded = config_manager._load_config_file(config_file)
assert loaded == config_content
def test_save_yaml_config_file(self):
"""Test saving YAML configuration file."""
config_content = {
'gitea_url': 'http://localhost:3000',
'repo_name': 'test-repo'
}
config_file = self.test_dir / 'output.yml'
config_manager = ConfigurationManager()
config_manager._save_config_file(config_content, config_file)
assert config_file.exists()
loaded = yaml.safe_load(config_file.read_text())
assert loaded == config_content
def test_save_json_config_file(self):
"""Test saving JSON configuration file."""
config_content = {
'gitea_url': 'http://localhost:3000',
'repo_name': 'test-repo'
}
config_file = self.test_dir / 'output.json'
config_manager = ConfigurationManager()
config_manager._save_config_file(config_content, config_file)
assert config_file.exists()
loaded = json.loads(config_file.read_text())
assert loaded == config_content
def test_load_invalid_config_file(self):
"""Test loading invalid configuration file."""
config_file = self.test_dir / 'invalid.yml'
config_file.write_text('invalid: yaml: content: [')
config_manager = ConfigurationManager()
with pytest.raises(ValueError, match="Failed to load config file"):
config_manager._load_config_file(config_file)
def test_get_target_config_file_existing(self):
"""Test getting target config file when one exists."""
# Create an existing config file
existing_file = self.test_dir / '.markitect.yml'
existing_file.write_text('test: value')
config_manager = ConfigurationManager()
target = config_manager._get_target_config_file()
# Should be relative path that matches the existing file
assert target.name == existing_file.name
assert target.exists()
def test_get_target_config_file_default(self):
"""Test getting target config file when none exists."""
config_manager = ConfigurationManager()
target = config_manager._get_target_config_file()
# Should be the default config file name
assert target.name == '.markitect.yml'
def test_get_target_config_file_custom(self):
"""Test getting custom target config file."""
custom_file = 'custom-config.yml'
config_manager = ConfigurationManager()
target = config_manager._get_target_config_file(custom_file)
assert target == Path(custom_file)
class TestCLIIntegration:
"""Test CLI command integration."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.original_cwd = os.getcwd()
os.chdir(self.temp_dir)
self.runner = CliRunner()
def teardown_method(self):
"""Clean up test environment."""
os.chdir(self.original_cwd)
shutil.rmtree(self.temp_dir)
def test_config_show_command(self):
"""Test config-show CLI command."""
result = self.runner.invoke(cli, ['config-show'])
assert result.exit_code == 0
assert 'gitea_url' in result.output
def test_config_show_command_json_format(self):
"""Test config-show with JSON format."""
result = self.runner.invoke(cli, ['config-show', '--format', 'json'])
assert result.exit_code == 0
# Output should be valid JSON
try:
json.loads(result.output)
except json.JSONDecodeError:
pytest.fail("Output is not valid JSON")
def test_config_show_command_simple_format(self):
"""Test config-show with simple format."""
result = self.runner.invoke(cli, ['config-show', '--format', 'simple'])
assert result.exit_code == 0
assert '=' in result.output # Should contain key=value pairs
def test_config_set_command(self):
"""Test config-set CLI command."""
result = self.runner.invoke(cli, [
'config-set', 'repo_name', 'test-repository'
])
assert result.exit_code == 0
assert 'Configuration updated' in result.output
# Verify file was created
config_file = self.test_dir / '.markitect.yml'
assert config_file.exists()
content = yaml.safe_load(config_file.read_text())
assert content['repo_name'] == 'test-repository'
def test_config_set_command_nested_key(self):
"""Test config-set with nested key."""
result = self.runner.invoke(cli, [
'config-set', 'gitea.url', 'http://example.com'
])
assert result.exit_code == 0
config_file = self.test_dir / '.markitect.yml'
content = yaml.safe_load(config_file.read_text())
assert content['gitea']['url'] == 'http://example.com'
def test_config_set_command_custom_file(self):
"""Test config-set with custom config file."""
custom_file = 'custom.yml'
result = self.runner.invoke(cli, [
'config-set', 'repo_name', 'test',
'--config-file', custom_file
])
assert result.exit_code == 0
assert Path(custom_file).exists()
def test_config_set_command_no_validate(self):
"""Test config-set with validation disabled."""
result = self.runner.invoke(cli, [
'config-set', 'repo_name', 'test',
'--no-validate'
])
assert result.exit_code == 0
def test_config_set_command_validation_error(self):
"""Test config-set with validation error."""
result = self.runner.invoke(cli, [
'config-set', 'gitea_url', 'invalid-url'
])
assert result.exit_code == 1
assert 'Configuration error' in result.output
def test_config_init_command_non_interactive(self):
"""Test config-init CLI command in non-interactive mode."""
result = self.runner.invoke(cli, [
'config-init', '--no-interactive'
])
assert result.exit_code == 0
assert 'initialized successfully' in result.output
# Verify config file created
config_file = self.test_dir / '.markitect.yml'
assert config_file.exists()
# Verify directories created
assert (self.test_dir / '.markitect_workspace').exists()
assert (self.test_dir / '.ast_cache').exists()
assert (self.test_dir / 'tests').exists()
def test_config_init_command_custom_directory(self):
"""Test config-init with custom project directory."""
project_dir = self.test_dir / 'my-project'
result = self.runner.invoke(cli, [
'config-init',
'--project-dir', str(project_dir),
'--no-interactive'
])
assert result.exit_code == 0
# Verify in correct location
config_file = project_dir / '.markitect.yml'
assert config_file.exists()
def test_config_init_command_existing_file_no_force(self):
"""Test config-init with existing file without force."""
# Create existing config file
config_file = self.test_dir / '.markitect.yml'
config_file.write_text('existing: config')
result = self.runner.invoke(cli, [
'config-init', '--no-interactive'
])
assert result.exit_code == 1
assert 'already exists' in result.output
def test_config_init_command_existing_file_with_force(self):
"""Test config-init with existing file with force."""
# Create existing config file
config_file = self.test_dir / '.markitect.yml'
config_file.write_text('existing: config')
result = self.runner.invoke(cli, [
'config-init', '--no-interactive', '--force'
])
assert result.exit_code == 0
assert 'initialized successfully' in result.output
def test_config_validate_command(self):
"""Test config-validate CLI command."""
result = self.runner.invoke(cli, ['config-validate'])
assert result.exit_code in [0, 1] # May have warnings/errors
assert 'Configuration Validation Summary' in result.output
def test_config_validate_command_verbose(self):
"""Test config-validate with verbose output."""
result = self.runner.invoke(cli, ['config-validate', '--verbose'])
assert result.exit_code in [0, 1]
assert 'Configuration Validation Summary' in result.output
def test_config_help_command_general(self):
"""Test config-help CLI command."""
result = self.runner.invoke(cli, ['config-help'])
assert result.exit_code == 0
assert 'available' in result.output.lower() or 'configuration' in result.output.lower()
assert 'gitea_url' in result.output
def test_config_help_command_specific_key(self):
"""Test config-help for specific key."""
result = self.runner.invoke(cli, ['config-help', 'gitea_url'])
assert result.exit_code == 0
assert 'gitea_url' in result.output
def test_config_help_command_unknown_key(self):
"""Test config-help for unknown key."""
result = self.runner.invoke(cli, ['config-help', 'unknown_key'])
assert result.exit_code == 0
assert 'unknown' in result.output.lower()
class TestEnvironmentVariables:
"""Test environment variable handling."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.original_cwd = os.getcwd()
os.chdir(self.temp_dir)
# Store original environment
self.original_env = dict(os.environ)
def teardown_method(self):
"""Clean up test environment."""
os.chdir(self.original_cwd)
shutil.rmtree(self.temp_dir)
# Restore original environment
os.environ.clear()
os.environ.update(self.original_env)
def test_get_relevant_env_vars(self):
"""Test getting MARKITECT-related environment variables."""
# Set some test environment variables
os.environ['MARKITECT_GITEA_URL'] = 'http://test.com'
os.environ['MARKITECT_REPO_NAME'] = 'test-repo'
os.environ['OTHER_VAR'] = 'should-be-ignored'
config_manager = ConfigurationManager()
env_vars = config_manager._get_relevant_env_vars()
assert 'MARKITECT_GITEA_URL' in env_vars
assert 'MARKITECT_REPO_NAME' in env_vars
assert 'OTHER_VAR' not in env_vars
assert env_vars['MARKITECT_GITEA_URL'] == 'http://test.com'
def test_config_with_env_vars(self):
"""Test configuration loading with environment variables."""
# Set environment variables
os.environ['MARKITECT_GITEA_URL'] = 'http://env-test.com'
os.environ['MARKITECT_REPO_NAME'] = 'env-repo'
config_manager = ConfigurationManager()
config = config_manager.get_current_config()
# Environment variables should be reflected in config
assert config['gitea_url'] == 'http://env-test.com'
assert config['repo_name'] == 'env-repo'
# Should have env vars in metadata
assert 'MARKITECT_GITEA_URL' in config['_meta']['env_variables']
assert 'MARKITECT_REPO_NAME' in config['_meta']['env_variables']
class TestEdgeCases:
"""Test edge cases and error conditions."""
def setup_method(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.test_dir = Path(self.temp_dir)
self.original_cwd = os.getcwd()
os.chdir(self.temp_dir)
def teardown_method(self):
"""Clean up test environment."""
os.chdir(self.original_cwd)
shutil.rmtree(self.temp_dir)
def test_config_manager_with_permission_error(self):
"""Test handling permission errors."""
# Create a directory we can't write to
restricted_dir = self.test_dir / 'restricted'
restricted_dir.mkdir(mode=0o444) # Read-only
config_manager = ConfigurationManager()
# This should not crash, but may warn about permission issues
try:
config = config_manager.get_current_config()
assert isinstance(config, dict)
except PermissionError:
# Acceptable to fail with permission error
pass
def test_set_config_value_invalid_file_path(self):
"""Test setting config value with invalid file path."""
config_manager = ConfigurationManager()
# Try to write to a path that doesn't exist and can't be created
# Use a more reliable invalid path that doesn't depend on system permissions
invalid_path = '/nonexistent_directory_12345/config.yml'
with pytest.raises(ValueError):
config_manager.set_config_value('test', 'value', invalid_path)
def test_validate_configuration_with_none(self):
"""Test configuration validation with None input."""
config_manager = ConfigurationManager()
# Should not crash with None input
results = config_manager.validate_configuration(None)
assert isinstance(results, list)
def test_mask_sensitive_data_with_complex_structure(self):
"""Test masking sensitive data in complex nested structure."""
config_manager = ConfigurationManager()
complex_config = {
'database': {
'password': 'secret123',
'host': 'localhost'
},
'apis': [
{'name': 'github', 'token': 'ghp_secret'},
{'name': 'gitea', 'url': 'http://localhost:3000'}
],
'secrets': {
'nested': {
'api_key': 'very_secret'
}
}
}
masked = config_manager._mask_sensitive_data(complex_config)
# Sensitive fields should be masked at any level
assert masked['database']['password'] == '***MASKED***'
assert masked['apis'][0]['token'] == '***MASKED***'
# The 'secrets' key itself gets masked because it's a sensitive keyword
# This is actually correct behavior
assert masked['secrets'] == '***MASKED***'
# Non-sensitive fields should remain
assert masked['database']['host'] == 'localhost'
assert masked['apis'][1]['url'] == 'http://localhost:3000'
def test_get_config_sources_empty_directory(self):
"""Test getting config sources in empty directory."""
config_manager = ConfigurationManager()
sources = config_manager._get_config_sources()
# Should always include defaults
assert len(sources) > 0
assert any('defaults' in source.lower() for source in sources)
def test_initialize_project_config_existing_directories(self):
"""Test project initialization with existing directories."""
# Pre-create some directories
(self.test_dir / '.markitect_workspace').mkdir()
(self.test_dir / 'tests').mkdir()
config_manager = ConfigurationManager()
result = config_manager.initialize_project_config(self.test_dir, interactive=False)
# Should succeed even with existing directories
assert 'config_file' in result
assert Path(result['config_file']).exists()