""" Configuration and Environment Management - Issue #18 This module provides comprehensive configuration management capabilities for MarkiTect, allowing users to manage configuration and environment settings through CLI commands. Features: - Display current configuration with optional sensitive data masking - Set configuration values with validation - Initialize configuration for new projects - Configuration validation and help - Integration with existing configuration system - Environment variable management - Project-specific configuration support Commands implemented: - config-show: Display current configuration - config-set: Set configuration values - config-init: Initialize configuration for new project """ import os import json import yaml from pathlib import Path from typing import Dict, Any, Optional, List, Union, Tuple from dataclasses import asdict, fields from copy import deepcopy import click class ConfigurationManager: """Core configuration management functionality.""" def __init__(self): self.config_file_names = [ '.markitect.yml', '.markitect.yaml', '.markitect.json', 'markitect.config.yml', 'markitect.config.yaml', 'markitect.config.json' ] self.sensitive_keys = { 'api_token', 'token', 'password', 'secret', 'key', 'gitea_token', 'github_token', 'auth_token' } def get_current_config(self) -> Dict[str, Any]: """Get current configuration from all sources.""" config = {} # Load from existing configuration system try: from config.manager import MarkitectConfig markitect_config = MarkitectConfig() config = asdict(markitect_config) # Convert Path objects to strings for JSON serialization for key, value in config.items(): if isinstance(value, Path): config[key] = str(value) except (ImportError, Exception): # Fallback to basic configuration config = { 'gitea_url': os.getenv('MARKITECT_GITEA_URL', 'http://localhost:3000'), 'repo_owner': os.getenv('MARKITECT_REPO_OWNER', ''), 'repo_name': os.getenv('MARKITECT_REPO_NAME', ''), 'api_token': os.getenv('MARKITECT_API_TOKEN', ''), 'workspace_dir': os.getenv('MARKITECT_WORKSPACE_DIR', '.markitect_workspace'), 'database_path': os.getenv('MARKITECT_DATABASE_PATH', str(Path.home() / '.markitect' / 'markitect.db')), 'cache_dir': os.getenv('MARKITECT_CACHE_DIR', '.ast_cache'), 'tests_dir': os.getenv('MARKITECT_TESTS_DIR', 'tests'), 'test_file_pattern': os.getenv('MARKITECT_TEST_FILE_PATTERN', 'test_issue_{issue_num}_{scenario}.py'), 'claude_code_command': os.getenv('MARKITECT_CLAUDE_CODE_COMMAND', 'claude') } # Add metadata about configuration sources config['_meta'] = { 'config_sources': self._get_config_sources(), 'env_variables': self._get_relevant_env_vars(), 'working_directory': str(Path.cwd()), 'config_file_locations': self._get_config_file_locations() } return config def _get_config_sources(self) -> List[str]: """Get list of configuration sources in order of precedence.""" sources = [] # Check for config files for config_name in self.config_file_names: if Path(config_name).exists(): sources.append(f"File: {config_name}") # Check for environment variables env_vars = [key for key in os.environ.keys() if key.startswith('MARKITECT_')] if env_vars: sources.append(f"Environment variables: {len(env_vars)} found") # Always include defaults sources.append("Built-in defaults") return sources def _get_relevant_env_vars(self) -> Dict[str, str]: """Get MARKITECT-related environment variables.""" env_vars = {} for key, value in os.environ.items(): if key.startswith('MARKITECT_'): env_vars[key] = value return env_vars def _get_config_file_locations(self) -> Dict[str, bool]: """Get status of potential config file locations.""" locations = {} # Current directory for config_name in self.config_file_names: current_dir_path = Path.cwd() / config_name locations[str(current_dir_path)] = current_dir_path.exists() # Home directory home_config = Path.home() / '.markitect' / 'config.yml' locations[str(home_config)] = home_config.exists() return locations def display_config(self, show_sensitive: bool = False, output_format: str = 'yaml') -> str: """Display current configuration in specified format.""" config = self.get_current_config() if not show_sensitive: config = self._mask_sensitive_data(config) if output_format == 'json': return json.dumps(config, indent=2, default=str) elif output_format == 'yaml': return yaml.dump(config, default_flow_style=False, sort_keys=True) else: # Simple format return self._format_simple(config) def _mask_sensitive_data(self, config: Dict[str, Any]) -> Dict[str, Any]: """Mask sensitive configuration values.""" masked_config = deepcopy(config) def mask_recursive(obj): if isinstance(obj, dict): for key, value in obj.items(): if any(sensitive in key.lower() for sensitive in self.sensitive_keys): if value and str(value).strip(): obj[key] = '***MASKED***' else: mask_recursive(value) elif isinstance(obj, list): for item in obj: mask_recursive(item) mask_recursive(masked_config) return masked_config def _format_simple(self, config: Dict[str, Any]) -> str: """Format configuration in simple key=value style.""" lines = [] def format_recursive(obj, prefix=''): if isinstance(obj, dict): for key, value in sorted(obj.items()): full_key = f"{prefix}{key}" if prefix else key if isinstance(value, (dict, list)): if key != '_meta': # Skip meta in simple format format_recursive(value, f"{full_key}.") else: lines.append(f"{full_key} = {value}") elif isinstance(obj, list): for i, item in enumerate(obj): format_recursive(item, f"{prefix}{i}.") format_recursive(config) return '\n'.join(lines) def set_config_value(self, key: str, value: str, config_file: Optional[str] = None) -> bool: """Set a configuration value and persist it.""" # Determine target config file target_file = self._get_target_config_file(config_file) # Load existing config from file existing_config = self._load_config_file(target_file) if target_file.exists() else {} # Set the value (support nested keys with dot notation) self._set_nested_value(existing_config, key, value) # Validate the new configuration validation_errors = self._validate_config_value(key, value) if validation_errors: raise ValueError(f"Configuration validation failed: {'; '.join(validation_errors)}") # Save back to file self._save_config_file(existing_config, target_file) return True def _get_target_config_file(self, config_file: Optional[str] = None) -> Path: """Determine target configuration file.""" if config_file: return Path(config_file) # Look for existing config file for config_name in self.config_file_names: config_path = Path(config_name) if config_path.exists(): return config_path # Default to .markitect.yml in current directory return Path('.markitect.yml') def _load_config_file(self, config_file: Path) -> Dict[str, Any]: """Load configuration from file.""" try: content = config_file.read_text() if config_file.suffix in ['.yml', '.yaml']: return yaml.safe_load(content) or {} elif config_file.suffix == '.json': return json.loads(content) else: # Try YAML first, then JSON try: return yaml.safe_load(content) or {} except yaml.YAMLError: return json.loads(content) except Exception as e: raise ValueError(f"Failed to load config file {config_file}: {e}") def _save_config_file(self, config: Dict[str, Any], config_file: Path) -> None: """Save configuration to file.""" try: # Ensure directory exists config_file.parent.mkdir(parents=True, exist_ok=True) if config_file.suffix in ['.yml', '.yaml']: content = yaml.dump(config, default_flow_style=False, sort_keys=True) elif config_file.suffix == '.json': content = json.dumps(config, indent=2, sort_keys=True) else: # Default to YAML content = yaml.dump(config, default_flow_style=False, sort_keys=True) config_file.write_text(content) except Exception as e: raise ValueError(f"Failed to save config file {config_file}: {e}") def _set_nested_value(self, config: Dict[str, Any], key: str, value: str) -> None: """Set a nested configuration value using dot notation.""" keys = key.split('.') current = config # Navigate to the parent of the target key for k in keys[:-1]: if k not in current: current[k] = {} current = current[k] # Set the final value final_key = keys[-1] # Try to convert value to appropriate type converted_value = self._convert_value(value) current[final_key] = converted_value def _convert_value(self, value: str) -> Any: """Convert string value to appropriate type.""" # Handle boolean values if value.lower() in ('true', 'yes', 'on', '1'): return True elif value.lower() in ('false', 'no', 'off', '0'): return False # Try to convert to number try: if '.' in value: return float(value) else: return int(value) except ValueError: pass # Return as string return value def _validate_config_value(self, key: str, value: str) -> List[str]: """Validate a configuration value.""" errors = [] # Path validation if any(path_key in key.lower() for path_key in ['dir', 'path', 'file']): if value and not self._is_valid_path(value): errors.append(f"'{value}' is not a valid path format") # URL validation if 'url' in key.lower(): if value and not self._is_valid_url(value): errors.append(f"'{value}' is not a valid URL format") # Required field validation for setting values (not for display) # We only enforce required fields when explicitly setting them if key == 'gitea_url' and not value.strip(): errors.append(f"'{key}' is required and cannot be empty") elif key == 'database_path' and not value.strip(): errors.append(f"'{key}' is required and cannot be empty") return errors def _is_valid_path(self, path_str: str) -> bool: """Check if a string represents a valid path.""" try: Path(path_str) return True except (ValueError, OSError): return False def _is_valid_url(self, url_str: str) -> bool: """Check if a string represents a valid URL.""" import re url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) return url_pattern.match(url_str) is not None def initialize_project_config(self, project_dir: Optional[Path] = None, interactive: bool = True) -> Dict[str, Any]: """Initialize configuration for a new project.""" target_dir = project_dir or Path.cwd() config_file = target_dir / '.markitect.yml' # Default configuration template default_config = { 'gitea_url': 'http://localhost:3000', 'repo_owner': '', 'repo_name': target_dir.name, 'workspace_dir': '.markitect_workspace', 'cache_dir': '.ast_cache', 'tests_dir': 'tests', 'test_file_pattern': 'test_issue_{issue_num}_{scenario}.py', 'claude_code_command': 'claude' } if interactive: default_config = self._interactive_config_setup(default_config) # Create necessary directories workspace_dir = target_dir / default_config['workspace_dir'] cache_dir = target_dir / default_config['cache_dir'] tests_dir = target_dir / default_config['tests_dir'] for directory in [workspace_dir, cache_dir, tests_dir]: directory.mkdir(parents=True, exist_ok=True) # Save configuration self._save_config_file(default_config, config_file) return { 'config_file': str(config_file), 'config': default_config, 'created_directories': [str(workspace_dir), str(cache_dir), str(tests_dir)] } def _interactive_config_setup(self, config: Dict[str, Any]) -> Dict[str, Any]: """Interactive configuration setup (would be used with CLI).""" # This method would be called by CLI with click.prompt() # For now, return the default config # The CLI integration will handle the interactive prompts return config def list_config_keys(self) -> List[Tuple[str, str, Any]]: """List all available configuration keys with descriptions.""" config_schema = [ ('gitea_url', 'Gitea server URL', 'http://localhost:3000'), ('repo_owner', 'Repository owner/organization', ''), ('repo_name', 'Repository name', ''), ('api_token', 'API token for authentication', ''), ('workspace_dir', 'Workspace directory path', '.markitect_workspace'), ('database_path', 'Database file path', '~/.markitect/markitect.db'), ('cache_dir', 'AST cache directory', '.ast_cache'), ('tests_dir', 'Tests directory', 'tests'), ('test_file_pattern', 'Test file naming pattern', 'test_issue_{issue_num}_{scenario}.py'), ('claude_code_command', 'Claude Code command', 'claude'), ] return config_schema def get_config_help(self, key: Optional[str] = None) -> str: """Get help information for configuration.""" if key: # Get help for specific key for config_key, description, default in self.list_config_keys(): if config_key == key: return f"{config_key}: {description} (default: {default})" return f"Unknown configuration key: {key}" else: # Get general help lines = ["Available configuration keys:"] for config_key, description, default in self.list_config_keys(): lines.append(f" {config_key:<20} {description}") return '\n'.join(lines) def validate_configuration(self, config: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """Validate current or provided configuration.""" if config is None: config = self.get_current_config() validation_results = [] # Check required fields required_fields = ['gitea_url', 'database_path'] for field in required_fields: if field not in config or not config[field]: validation_results.append({ 'key': field, 'status': 'error', 'message': f'Required field {field} is missing or empty' }) else: validation_results.append({ 'key': field, 'status': 'ok', 'message': f'{field} is configured' }) # Check paths exist or are creatable path_fields = ['workspace_dir', 'cache_dir', 'tests_dir'] for field in path_fields: if field in config: path = Path(config[field]) if path.exists(): validation_results.append({ 'key': field, 'status': 'ok', 'message': f'{field} exists at {path}' }) else: try: path.mkdir(parents=True, exist_ok=True) validation_results.append({ 'key': field, 'status': 'warning', 'message': f'{field} created at {path}' }) except OSError as e: validation_results.append({ 'key': field, 'status': 'error', 'message': f'{field} cannot be created: {e}' }) # Check database path parent directory if 'database_path' in config: db_path = Path(config['database_path']) db_parent = db_path.parent if not db_parent.exists(): try: db_parent.mkdir(parents=True, exist_ok=True) validation_results.append({ 'key': 'database_path', 'status': 'warning', 'message': f'Database directory created at {db_parent}' }) except OSError as e: validation_results.append({ 'key': 'database_path', 'status': 'error', 'message': f'Cannot create database directory: {e}' }) else: validation_results.append({ 'key': 'database_path', 'status': 'ok', 'message': f'Database directory exists at {db_parent}' }) return validation_results