Files
markitect-main/infrastructure/config.py
tegwick f782ac1f69 fix: Add missing infrastructure files from data access improvements
Add infrastructure components that were created during issue #24
but not properly committed:

- Data access repositories and interfaces
- Connection management infrastructure
- Exception handling framework
- Configuration management
- Documentation from data access pattern improvements

These files are essential infrastructure components that enable
the repository pattern and improved data access strategies.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-27 08:35:34 +02:00

440 lines
15 KiB
Python

"""
Configuration management for infrastructure components.
Provides centralized configuration for data sources, connection settings,
and operational parameters with environment variable support.
"""
import os
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class DatabaseConfig:
    """Configuration for database connections.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    # NOTE(review): journal_mode / synchronous / cache_size / temp_store look
    # like SQLite PRAGMA settings -- confirm against the connection code.
    path: str = "markitect.db"  # MARKITECT_DB_PATH
    pool_size: int = 10  # MARKITECT_DB_POOL_SIZE
    timeout: int = 30  # MARKITECT_DB_TIMEOUT
    journal_mode: str = "WAL"  # MARKITECT_DB_JOURNAL_MODE
    synchronous: str = "NORMAL"  # MARKITECT_DB_SYNCHRONOUS
    cache_size: int = 10000  # MARKITECT_DB_CACHE_SIZE
    temp_store: str = "MEMORY"  # MARKITECT_DB_TEMP_STORE

    @classmethod
    def from_env(cls) -> "DatabaseConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults.

        Returns:
            A new ``DatabaseConfig`` instance.

        Raises:
            ValueError: If an integer-valued variable does not hold an
                integer. The message names the offending variable.
        """

        def env_int(name: str, default: int) -> int:
            # Parse an integer variable, naming it in the error so that a
            # misconfiguration can be traced to a specific setting.
            raw = os.getenv(name)
            if raw is None:
                return default
            try:
                return int(raw)
            except ValueError as exc:
                raise ValueError(
                    f"{name} must be an integer, got {raw!r}"
                ) from exc

        return cls(
            path=os.getenv("MARKITECT_DB_PATH", cls.path),
            pool_size=env_int("MARKITECT_DB_POOL_SIZE", cls.pool_size),
            timeout=env_int("MARKITECT_DB_TIMEOUT", cls.timeout),
            journal_mode=os.getenv("MARKITECT_DB_JOURNAL_MODE", cls.journal_mode),
            synchronous=os.getenv("MARKITECT_DB_SYNCHRONOUS", cls.synchronous),
            cache_size=env_int("MARKITECT_DB_CACHE_SIZE", cls.cache_size),
            temp_store=os.getenv("MARKITECT_DB_TEMP_STORE", cls.temp_store),
        )
@dataclass
class GiteaConfig:
    """Configuration for Gitea API connections.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    base_url: str = "http://localhost:3000"  # MARKITECT_GITEA_URL
    token: str = ""  # MARKITECT_GITEA_TOKEN
    repo_owner: str = "owner"  # MARKITECT_REPO_OWNER
    repo_name: str = "repo"  # MARKITECT_REPO_NAME
    connection_pool_size: int = 20  # MARKITECT_HTTP_POOL_SIZE
    connection_per_host: int = 5  # MARKITECT_HTTP_PER_HOST
    request_timeout: int = 30  # MARKITECT_HTTP_TIMEOUT
    keepalive_timeout: int = 60  # MARKITECT_HTTP_KEEPALIVE

    @classmethod
    def from_env(cls) -> "GiteaConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults.

        Returns:
            A new ``GiteaConfig`` instance.

        Raises:
            ValueError: If an integer-valued variable does not hold an
                integer. The message names the offending variable.
        """

        def env_int(name: str, default: int) -> int:
            # Parse an integer variable, naming it in the error message.
            raw = os.getenv(name)
            if raw is None:
                return default
            try:
                return int(raw)
            except ValueError as exc:
                raise ValueError(
                    f"{name} must be an integer, got {raw!r}"
                ) from exc

        return cls(
            base_url=os.getenv("MARKITECT_GITEA_URL", cls.base_url),
            token=os.getenv("MARKITECT_GITEA_TOKEN", cls.token),
            repo_owner=os.getenv("MARKITECT_REPO_OWNER", cls.repo_owner),
            repo_name=os.getenv("MARKITECT_REPO_NAME", cls.repo_name),
            connection_pool_size=env_int(
                "MARKITECT_HTTP_POOL_SIZE", cls.connection_pool_size
            ),
            connection_per_host=env_int(
                "MARKITECT_HTTP_PER_HOST", cls.connection_per_host
            ),
            request_timeout=env_int("MARKITECT_HTTP_TIMEOUT", cls.request_timeout),
            keepalive_timeout=env_int(
                "MARKITECT_HTTP_KEEPALIVE", cls.keepalive_timeout
            ),
        )

    @property
    def api_base_url(self) -> str:
        """Get the base URL for repository API calls.

        Trailing slashes on ``base_url`` are dropped so the result never
        contains a doubled ``//`` before ``api``.
        """
        root = self.base_url.rstrip("/")
        return f"{root}/api/v1/repos/{self.repo_owner}/{self.repo_name}"
@dataclass
class CacheConfig:
    """Configuration for caching systems.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    backend: str = "memory"  # memory, redis, file -- MARKITECT_CACHE_BACKEND
    redis_host: str = "localhost"  # MARKITECT_REDIS_HOST
    redis_port: int = 6379  # MARKITECT_REDIS_PORT
    redis_db: int = 0  # MARKITECT_REDIS_DB
    redis_password: Optional[str] = None  # MARKITECT_REDIS_PASSWORD
    file_cache_dir: str = ".cache"  # MARKITECT_CACHE_DIR
    default_ttl: int = 3600  # 1 hour -- MARKITECT_CACHE_TTL
    max_size: int = 1000  # MARKITECT_CACHE_MAX_SIZE

    @classmethod
    def from_env(cls) -> "CacheConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults;
        ``redis_password`` is ``None`` when its variable is not set.

        Returns:
            A new ``CacheConfig`` instance.

        Raises:
            ValueError: If an integer-valued variable does not hold an
                integer. The message names the offending variable.
        """

        def env_int(name: str, default: int) -> int:
            # Parse an integer variable, naming it in the error message.
            raw = os.getenv(name)
            if raw is None:
                return default
            try:
                return int(raw)
            except ValueError as exc:
                raise ValueError(
                    f"{name} must be an integer, got {raw!r}"
                ) from exc

        return cls(
            backend=os.getenv("MARKITECT_CACHE_BACKEND", cls.backend),
            redis_host=os.getenv("MARKITECT_REDIS_HOST", cls.redis_host),
            redis_port=env_int("MARKITECT_REDIS_PORT", cls.redis_port),
            redis_db=env_int("MARKITECT_REDIS_DB", cls.redis_db),
            redis_password=os.getenv("MARKITECT_REDIS_PASSWORD"),
            file_cache_dir=os.getenv("MARKITECT_CACHE_DIR", cls.file_cache_dir),
            default_ttl=env_int("MARKITECT_CACHE_TTL", cls.default_ttl),
            max_size=env_int("MARKITECT_CACHE_MAX_SIZE", cls.max_size),
        )
@dataclass
class WorkspaceConfig:
    """Configuration for workspace management.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    base_dir: str = ".markitect_workspace"  # MARKITECT_WORKSPACE_DIR
    max_workspaces: int = 100  # MARKITECT_MAX_WORKSPACES
    cleanup_after_days: int = 30  # MARKITECT_WORKSPACE_CLEANUP_DAYS
    max_file_size_mb: int = 100  # MARKITECT_MAX_FILE_SIZE_MB
    # MARKITECT_ALLOWED_EXTENSIONS (comma-separated list)
    allowed_extensions: tuple = (".md", ".txt", ".py", ".js", ".json", ".yaml", ".yml")

    @classmethod
    def from_env(cls) -> "WorkspaceConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults.
        ``MARKITECT_ALLOWED_EXTENSIONS`` is a comma-separated list; whitespace
        around entries is ignored and empty entries are dropped. If nothing
        remains (for example the variable is blank) the default extensions
        are used.

        Returns:
            A new ``WorkspaceConfig`` instance.

        Raises:
            ValueError: If an integer-valued variable does not hold an
                integer. The message names the offending variable.
        """

        def env_int(name: str, default: int) -> int:
            # Parse an integer variable, naming it in the error message.
            raw = os.getenv(name)
            if raw is None:
                return default
            try:
                return int(raw)
            except ValueError as exc:
                raise ValueError(
                    f"{name} must be an integer, got {raw!r}"
                ) from exc

        raw_extensions = os.getenv("MARKITECT_ALLOWED_EXTENSIONS")
        if raw_extensions is None:
            extensions = cls.allowed_extensions
        else:
            # "a, b,,c" -> ("a", "b", "c"): an entry such as " .txt" or ""
            # could never match a real file suffix.
            cleaned = tuple(
                part.strip() for part in raw_extensions.split(",") if part.strip()
            )
            extensions = cleaned or cls.allowed_extensions

        return cls(
            base_dir=os.getenv("MARKITECT_WORKSPACE_DIR", cls.base_dir),
            max_workspaces=env_int("MARKITECT_MAX_WORKSPACES", cls.max_workspaces),
            cleanup_after_days=env_int(
                "MARKITECT_WORKSPACE_CLEANUP_DAYS", cls.cleanup_after_days
            ),
            max_file_size_mb=env_int(
                "MARKITECT_MAX_FILE_SIZE_MB", cls.max_file_size_mb
            ),
            allowed_extensions=extensions,
        )

    @property
    def base_path(self) -> Path:
        """Get the base workspace directory as a Path object."""
        return Path(self.base_dir)
@dataclass
class RetryConfig:
    """Configuration for retry mechanisms.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    max_attempts: int = 3  # MARKITECT_RETRY_MAX_ATTEMPTS
    base_delay: float = 1.0  # MARKITECT_RETRY_BASE_DELAY
    backoff_factor: float = 2.0  # MARKITECT_RETRY_BACKOFF_FACTOR
    max_delay: float = 60.0  # MARKITECT_RETRY_MAX_DELAY
    jitter: bool = True  # MARKITECT_RETRY_JITTER

    @classmethod
    def from_env(cls) -> "RetryConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults.
        ``MARKITECT_RETRY_JITTER`` is true for ``1``, ``true``, ``yes`` or
        ``on`` (case-insensitive, surrounding whitespace ignored) and false
        for any other value.

        Returns:
            A new ``RetryConfig`` instance.

        Raises:
            ValueError: If a numeric variable cannot be parsed. The message
                names the offending variable.
        """

        def env_number(name, default, convert):
            # Parse a numeric variable with ``convert`` (int or float),
            # naming the variable in the error message.
            raw = os.getenv(name)
            if raw is None:
                return default
            try:
                return convert(raw)
            except ValueError as exc:
                expected = "an integer" if convert is int else "a number"
                raise ValueError(
                    f"{name} must be {expected}, got {raw!r}"
                ) from exc

        raw_jitter = os.getenv("MARKITECT_RETRY_JITTER")
        if raw_jitter is None:
            jitter = cls.jitter
        else:
            jitter = raw_jitter.strip().lower() in ("1", "true", "yes", "on")

        return cls(
            max_attempts=env_number(
                "MARKITECT_RETRY_MAX_ATTEMPTS", cls.max_attempts, int
            ),
            base_delay=env_number(
                "MARKITECT_RETRY_BASE_DELAY", cls.base_delay, float
            ),
            backoff_factor=env_number(
                "MARKITECT_RETRY_BACKOFF_FACTOR", cls.backoff_factor, float
            ),
            max_delay=env_number("MARKITECT_RETRY_MAX_DELAY", cls.max_delay, float),
            jitter=jitter,
        )
@dataclass
class MonitoringConfig:
    """Configuration for monitoring and observability.

    Each field can be overridden by the environment variable named in the
    comment next to it (see :meth:`from_env`).
    """

    enabled: bool = True  # MARKITECT_MONITORING_ENABLED
    log_level: str = "INFO"  # MARKITECT_LOG_LEVEL
    # MARKITECT_LOG_FORMAT
    log_format: str = "%(asctime)s [%(levelname)8s] %(name)s: %(message)s"
    metrics_enabled: bool = True  # MARKITECT_METRICS_ENABLED
    performance_tracking: bool = True  # MARKITECT_PERFORMANCE_TRACKING
    error_tracking: bool = True  # MARKITECT_ERROR_TRACKING

    @classmethod
    def from_env(cls) -> "MonitoringConfig":
        """Create configuration from environment variables.

        Variables that are not set fall back to the class-level defaults.
        Boolean variables are true for ``1``, ``true``, ``yes`` or ``on``
        (case-insensitive, surrounding whitespace ignored) and false for any
        other value.

        Returns:
            A new ``MonitoringConfig`` instance.
        """

        def env_flag(name: str, default: bool) -> bool:
            # Accept the usual truthy spellings so that e.g. "1" does not
            # silently switch a feature off.
            raw = os.getenv(name)
            if raw is None:
                return default
            return raw.strip().lower() in ("1", "true", "yes", "on")

        return cls(
            enabled=env_flag("MARKITECT_MONITORING_ENABLED", cls.enabled),
            log_level=os.getenv("MARKITECT_LOG_LEVEL", cls.log_level),
            log_format=os.getenv("MARKITECT_LOG_FORMAT", cls.log_format),
            metrics_enabled=env_flag(
                "MARKITECT_METRICS_ENABLED", cls.metrics_enabled
            ),
            performance_tracking=env_flag(
                "MARKITECT_PERFORMANCE_TRACKING", cls.performance_tracking
            ),
            error_tracking=env_flag("MARKITECT_ERROR_TRACKING", cls.error_tracking),
        )
@dataclass
class InfrastructureConfig:
    """Complete infrastructure configuration.

    Groups the per-component configuration objects. Each section defaults to
    that component's default-constructed config; :meth:`from_env` builds
    every section from environment variables instead.
    """

    database: DatabaseConfig = field(default_factory=DatabaseConfig)
    gitea: GiteaConfig = field(default_factory=GiteaConfig)
    cache: CacheConfig = field(default_factory=CacheConfig)
    workspace: WorkspaceConfig = field(default_factory=WorkspaceConfig)
    retry: RetryConfig = field(default_factory=RetryConfig)
    monitoring: MonitoringConfig = field(default_factory=MonitoringConfig)

    @classmethod
    def from_env(cls) -> "InfrastructureConfig":
        """Create complete configuration from environment variables."""
        return cls(
            database=DatabaseConfig.from_env(),
            gitea=GiteaConfig.from_env(),
            cache=CacheConfig.from_env(),
            workspace=WorkspaceConfig.from_env(),
            retry=RetryConfig.from_env(),
            monitoring=MonitoringConfig.from_env(),
        )

    @staticmethod
    def _ensure_directory(path: Path, label: str, errors: list) -> None:
        """Create ``path`` (with parents) if needed, recording any failure in ``errors``."""
        try:
            # exist_ok=True makes this a no-op for an existing directory but
            # still fails when the path exists as something else (e.g. a
            # regular file), which a plain exists() check would miss.
            path.mkdir(parents=True, exist_ok=True)
        except (OSError, ValueError) as exc:
            # ValueError covers malformed paths such as an embedded NUL byte.
            errors.append(f"Cannot create {label} directory: {exc}")

    def validate(self) -> Dict[str, Any]:
        """
        Validate configuration and return status.

        Side effect: missing directories for the database file, the
        workspace, and (for the ``file`` cache backend) the cache are
        created as part of validation.

        Returns:
            Dictionary with the keys ``valid`` (bool, true when there are no
            errors), ``errors`` (list of messages), ``warnings`` (list of
            messages) and ``config_sources`` (see ``_get_config_sources``).
        """
        errors = []
        warnings = []

        # Validate Gitea configuration
        if not self.gitea.token:
            errors.append("MARKITECT_GITEA_TOKEN is required")
        if not self.gitea.base_url.startswith(("http://", "https://")):
            errors.append("MARKITECT_GITEA_URL must be a valid HTTP(S) URL")

        # Validate database path: the file's parent directory must be usable.
        self._ensure_directory(Path(self.database.path).parent, "database", errors)

        # Validate workspace directory
        self._ensure_directory(self.workspace.base_path, "workspace", errors)

        # Validate cache configuration
        if self.cache.backend == "redis":
            if not self.cache.redis_host:
                errors.append("Redis host is required when using redis cache backend")
        elif self.cache.backend == "file":
            self._ensure_directory(Path(self.cache.file_cache_dir), "cache", errors)

        # Performance warnings
        if self.gitea.connection_pool_size > 50:
            warnings.append(
                "Large HTTP connection pool size may consume excessive resources"
            )
        if self.database.cache_size > 50000:
            warnings.append("Large database cache size may consume excessive memory")

        return {
            "valid": not errors,
            "errors": errors,
            "warnings": warnings,
            "config_sources": self._get_config_sources(),
        }

    def _get_config_sources(self) -> Dict[str, str]:
        """Get information about where configuration values came from.

        For a fixed set of variables, maps the variable name to its effective
        value followed by ``(from env)`` or ``(default)``. The origin is
        decided by whether the variable is present in ``os.environ`` at call
        time. The Gitea token is never shown: it is reported as ``***`` when
        set and ``(not set)`` otherwise.
        """
        env_vars = {
            "MARKITECT_GITEA_URL": self.gitea.base_url,
            "MARKITECT_GITEA_TOKEN": "***" if self.gitea.token else "(not set)",
            "MARKITECT_REPO_OWNER": self.gitea.repo_owner,
            "MARKITECT_REPO_NAME": self.gitea.repo_name,
            "MARKITECT_DB_PATH": self.database.path,
            "MARKITECT_WORKSPACE_DIR": self.workspace.base_dir,
            "MARKITECT_CACHE_BACKEND": self.cache.backend,
            "MARKITECT_LOG_LEVEL": self.monitoring.log_level,
        }
        return {
            key: f"{value} ({'from env' if key in os.environ else 'default'})"
            for key, value in env_vars.items()
        }

    def to_connection_manager_config(self):
        """Convert to ConnectionManager configuration format.

        Returns:
            A ``DataSourceConfig`` from ``infrastructure.connection_manager``
            populated from the gitea, database and retry sections.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid an import cycle -- confirm.
        from infrastructure.connection_manager import DataSourceConfig

        return DataSourceConfig(
            gitea_base_url=self.gitea.base_url,
            gitea_token=self.gitea.token,
            connection_pool_size=self.gitea.connection_pool_size,
            connection_per_host=self.gitea.connection_per_host,
            request_timeout=self.gitea.request_timeout,
            keepalive_timeout=self.gitea.keepalive_timeout,
            database_path=self.database.path,
            database_pool_size=self.database.pool_size,
            database_timeout=self.database.timeout,
            max_retries=self.retry.max_attempts,
            retry_backoff_factor=self.retry.backoff_factor,
            retry_base_delay=self.retry.base_delay,
        )
# Module-level cache for the shared configuration; None until first requested.
_config_instance: Optional[InfrastructureConfig] = None


def get_infrastructure_config() -> InfrastructureConfig:
    """
    Return the shared infrastructure configuration.

    The configuration is built from the environment on the first call and
    the same object is handed back on every later call. No locking is
    performed around the first-time creation.

    Returns:
        InfrastructureConfig instance
    """
    global _config_instance

    # Fast path: already loaded.
    if _config_instance is not None:
        return _config_instance

    _config_instance = InfrastructureConfig.from_env()
    return _config_instance
def reload_config() -> InfrastructureConfig:
    """
    Rebuild the shared configuration from the current environment.

    The module-level instance is replaced unconditionally, so later calls to
    get_infrastructure_config() return the new object. Useful for testing or
    when environment variables change.

    Returns:
        New InfrastructureConfig instance
    """
    global _config_instance

    fresh_config = InfrastructureConfig.from_env()
    _config_instance = fresh_config
    return fresh_config
def configure_logging(config: Optional[MonitoringConfig] = None) -> None:
    """
    Configure logging based on monitoring configuration.

    DEPRECATED: Use infrastructure.logging.setup_logging() instead.
    This function is maintained for backward compatibility.

    When ``infrastructure.logging`` can be imported, the monitoring settings
    are translated into its ``LoggingConfig`` and passed to
    ``setup_logging``. Otherwise the standard library is configured directly
    with ``logging.basicConfig`` and the infrastructure loggers are set to
    the configured level. If monitoring is disabled, all logging is disabled
    via ``logging.disable(logging.CRITICAL)``.

    Args:
        config: Optional monitoring configuration. If None, uses global config.

    Raises:
        AttributeError: In the fallback path, if ``config.log_level`` does
            not name an attribute of the ``logging`` module.
    """
    import logging

    # Only the import is guarded: an ImportError raised later (for example
    # from inside setup_logging) should surface rather than silently switch
    # to the fallback configuration.
    try:
        from infrastructure.logging import (
            LogFormat,
            LoggingConfig,
            LogLevel,
            setup_logging,
        )
    except ImportError:
        setup_logging = None

    if config is None:
        config = get_infrastructure_config().monitoring

    if not config.enabled:
        logging.disable(logging.CRITICAL)
        return

    if setup_logging is not None:
        # Convert old config to new logging config.
        # NOTE(review): LogLevel(...) presumably rejects unknown level names
        # -- confirm against infrastructure.logging.
        new_config = LoggingConfig(
            level=LogLevel(config.log_level.upper()),
            format_type=LogFormat.DEVELOPMENT,  # Default to development format
            enable_console=True,
            enable_file=False,
            enable_context=True,
            enable_performance=False,
        )
        setup_logging(new_config)
        return

    # Fallback to old system if new logging not available.
    level = getattr(logging, config.log_level.upper())
    logging.basicConfig(
        level=level,
        format=config.log_format,
        force=True,  # replace any handlers already attached to the root logger
    )

    # Configure specific loggers for infrastructure components.
    infrastructure_loggers = (
        "infrastructure.connection_manager",
        "infrastructure.repositories",
        "infrastructure.caching",
        "infrastructure.monitoring",
    )
    for logger_name in infrastructure_loggers:
        logging.getLogger(logger_name).setLevel(level)
# Configuration validation utilities
def validate_environment() -> Dict[str, Any]:
    """
    Run validation on the shared infrastructure configuration.

    Returns:
        The dictionary produced by InfrastructureConfig.validate() for the
        configuration returned by get_infrastructure_config().
    """
    return get_infrastructure_config().validate()
def print_config_status() -> None:
    """Print current configuration status for debugging.

    Validates the shared configuration and writes a report to stdout: the
    overall status, then any errors and warnings, then the reported source
    of each tracked configuration value.
    """
    validation = get_infrastructure_config().validate()

    print("MarkiTect Infrastructure Configuration")
    print("=" * 40)
    print(f"Status: {'✅ Valid' if validation['valid'] else '❌ Invalid'}")

    if validation["errors"]:
        print("\nErrors:")
        for error in validation["errors"]:
            # Marked and indented the same way as the warnings below.
            print(f" ❌ {error}")

    if validation["warnings"]:
        print("\nWarnings:")
        for warning in validation["warnings"]:
            print(f" ⚠️ {warning}")

    print("\nConfiguration Sources:")
    for key, value in validation["config_sources"].items():
        print(f" {key}: {value}")
    print()
if __name__ == "__main__":
    # Allow running this module directly to check configuration:
    # prints the validation report for the current environment.
    print_config_status()