"""
Asset discovery and scanning functionality for Issue #144.

This module provides automatic asset discovery from markdown files,
broken link detection, and asset usage analytics.
"""

import hashlib
import logging
import re
import tempfile
from dataclasses import dataclass, field, replace
from enum import Enum
from pathlib import Path
from typing import List, Optional, Dict, Any, Set

from .manager import AssetManager
from .utils import (
    PathUtils, TimedOperation, BaseResult, FileValidator, MemoryCache
)

# Reference targets with these prefixes are not local files: they are never
# resolved against the filesystem and never reported as broken.
_EXTERNAL_PREFIXES = ('http:', 'https:', 'mailto:', 'data:')


def _strip_current_dir_prefix(asset_path: str) -> str:
    """Remove leading './' segments and leading '/' from a reference target.

    ``str.lstrip('./')`` strips any run of '.' and '/' characters, so it
    would also turn '../img.png' into 'img.png' and '.hidden/x' into
    'hidden/x'. This helper removes only the prefixes that are safe to drop
    when re-anchoring the path at another directory.
    """
    cleaned = asset_path
    while True:
        if cleaned.startswith('./'):
            cleaned = cleaned[2:]
        elif cleaned.startswith('/'):
            cleaned = cleaned[1:]
        else:
            return cleaned


class ReferenceType(Enum):
    """Types of asset references."""

    IMAGE = "image"
    LINK = "link"
    EMBED = "embed"
    REFERENCE_STYLE = "reference_style"


@dataclass
class AssetReference:
    """Represents a reference to an asset in a markdown file."""

    source_file: Path                    # markdown file containing the reference
    asset_path: str                      # target exactly as written in the markdown
    reference_type: ReferenceType
    line_number: int                     # 1-based line of the match start
    alt_text: str = ""                   # alt text, link text, or reference id
    title: str = ""
    is_broken: bool = False
    resolved_path: Optional[Path] = None
    resolved_hash: Optional[str] = None


@dataclass
class ScanResult:
    """Result of scanning directory for asset references."""

    scanned_files: List[Path] = field(default_factory=list)
    asset_references: List[AssetReference] = field(default_factory=list)
    broken_links: List[AssetReference] = field(default_factory=list)
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None

    def __post_init__(self):
        """Force ``success`` to False when an error was supplied."""
        if self.error is not None and self.success:
            self.success = False

    def get_broken_links(self) -> List[AssetReference]:
        """Get list of broken asset references."""
        return [ref for ref in self.asset_references if ref.is_broken]


@dataclass
class RegistrationResult:
    """Result of automatic asset registration."""

    registered_count: int = 0
    skipped_broken: int = 0
    skipped_existing: int = 0
    errors: List[Exception] = field(default_factory=list)
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None

    def __post_init__(self):
        """Force ``success`` to False when ``error`` or ``errors`` is set."""
        if self.error is not None and self.success:
            self.success = False

        # Also set success to False if there are any errors
        if self.errors and self.success:
            self.success = False


@dataclass
class UsageAnalysis:
    """Analysis of asset usage across a project."""

    total_assets: int = 0
    used_assets: int = 0
    unused_assets: int = 0
    broken_references: int = 0
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None
    unused_asset_list: List[Dict[str, Any]] = field(default_factory=list)

    def __post_init__(self):
        """Force ``success`` to False when an error was supplied."""
        if self.error is not None and self.success:
            self.success = False

    def get_unused_assets(self) -> List[Dict[str, Any]]:
        """Get list of unused assets."""
        return self.unused_asset_list


class MarkdownScanner:
    """Scanner for asset references in markdown files."""

    def __init__(self, scan_patterns: Optional[List[str]] = None,
                 ignore_patterns: Optional[List[str]] = None,
                 enable_caching: bool = True):
        """Initialize markdown scanner.

        Args:
            scan_patterns: Glob patterns of files to scan; defaults to
                ``["*.md", "*.mdx"]``.
            ignore_patterns: Patterns to ignore; defaults to an empty list.
            enable_caching: When True, scan results are kept in a
                ``MemoryCache`` keyed by file path and modification time.
        """
        self.scan_patterns = scan_patterns or ["*.md", "*.mdx"]
        self.ignore_patterns = ignore_patterns or []
        self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')

        # Optional caching for repeated scans
        self.cache = MemoryCache(default_ttl=300.0) if enable_caching else None

        # Regex patterns for finding asset references
        self.image_pattern = re.compile(
            r'!\[([^\]]*)\]\(([^)\s]+)(?:\s+"([^"]*)")?\)',
            re.MULTILINE
        )
        # NOTE(review): the source text was truncated between "r'(?" in the
        # link pattern and the return annotation of scan_file. The two
        # patterns below and the scan_file header are reconstructed from how
        # scan_file unpacks the groups (three groups for links, two for
        # reference definitions). Confirm against version control, including
        # whether any other methods lived in the lost span.
        self.link_pattern = re.compile(
            r'(?<!!)\[([^\]]*)\]\(([^)\s]+)(?:\s+"([^"]*)")?\)',
            re.MULTILINE
        )
        self.reference_pattern = re.compile(
            r'^\[([^\]]+)\]:\s*(\S+)',
            re.MULTILINE
        )

    def scan_file(self, file_path: Path) -> List[AssetReference]:
        """Scan a single markdown file for asset references.

        Args:
            file_path: Markdown file to scan.

        Returns:
            References in the order images, inline links, reference-style
            definitions. An empty list is returned when the file is not
            readable. ``is_broken`` is left at its default; callers decide.
        """
        # Normalize path
        file_path = PathUtils.normalize_path(file_path)

        # Validate file
        if not FileValidator.is_readable_file(file_path):
            self.logger.debug(f"Skipping unreadable file: {file_path}")
            return []

        # Check cache if enabled. Compare against None explicitly: a cache
        # object that defines __len__ is falsy while empty, which would
        # otherwise disable caching permanently.
        cache_key = None
        if self.cache is not None:
            try:
                cache_key = f"scan:{file_path}:{file_path.stat().st_mtime}"
            except OSError as e:
                self.logger.warning(f"Failed to read file {file_path}: {e}")
                return []
            cached_result = self.cache.get(cache_key)
            if cached_result is not None:
                self.logger.debug(f"Using cached scan result for {file_path}")
                # Hand out copies so callers that mutate references
                # (is_broken, source_file, ...) cannot corrupt the cache.
                return [replace(ref) for ref in cached_result]

        try:
            content = file_path.read_text(encoding='utf-8')
        except Exception as e:
            self.logger.warning(f"Failed to read file {file_path}: {e}")
            return []

        references: List[AssetReference] = []
        lines = content.splitlines()

        # Find image references
        for match in self.image_pattern.finditer(content):
            alt_text, asset_path, title = match.groups()
            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.IMAGE,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=alt_text or "",
                title=title or ""
            ))

        # Find link references
        for match in self.link_pattern.finditer(content):
            link_text, asset_path, title = match.groups()

            # Skip URLs
            if asset_path.startswith(('http:', 'https:', 'mailto:', 'data:')):
                continue

            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.LINK,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=link_text or "",
                title=title or ""
            ))

        # Find reference-style links
        for match in self.reference_pattern.finditer(content):
            ref_id, asset_path = match.groups()
            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.REFERENCE_STYLE,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=ref_id
            ))

        # Cache result if caching is enabled (store copies, see above)
        if self.cache is not None:
            self.cache.set(cache_key, [replace(ref) for ref in references])

        return references

    def _get_line_number(self, content: str, position: int, lines: List[str]) -> int:
        """Get line number for a position in the content.

        Args:
            content: Full text the position refers to.
            position: Character offset into ``content``.
            lines: ``content.splitlines()``, passed in so it is split once.

        Returns:
            1-based line number, or ``len(lines)`` when the position lies
            past the last line.
        """
        line_start = 0
        for index, line in enumerate(lines):
            line_end = line_start + len(line) + 1  # +1 for newline
            if position < line_end:
                return index + 1
            line_start = line_end
        return len(lines)


def discover_assets_from_markdown(markdown_content: str, base_path: Path) -> List[AssetReference]:
    """
    Simple function to discover assets from markdown content for md-render.

    Args:
        markdown_content: The markdown content to scan
        base_path: Base path for resolving relative asset paths

    Returns:
        List of AssetReference objects found in the markdown. Each one has
        ``source_file`` set to ``base_path``; local targets get
        ``resolved_path`` when they exist under ``base_path`` and
        ``is_broken`` otherwise.
    """
    # The temporary file is scanned exactly once, so caching is pointless.
    scanner = MarkdownScanner(enable_caching=False)

    # Create a temporary file to use the existing scan_file method.
    # scan_file reads UTF-8, so write UTF-8 instead of the locale encoding.
    temp_file = tempfile.NamedTemporaryFile(
        mode='w', suffix='.md', delete=False, encoding='utf-8'
    )
    temp_path = Path(temp_file.name)

    try:
        # Writing inside the try guarantees cleanup even if the write fails.
        with temp_file:
            temp_file.write(markdown_content)

        references = scanner.scan_file(temp_path)

        # Update the source_file to the actual base_path for relative resolution
        for ref in references:
            ref.source_file = base_path

            # Resolve the asset path relative to base_path
            if ref.asset_path.startswith(_EXTERNAL_PREFIXES):
                continue

            # Clean up relative path indicators without eating '../'
            resolved_path = base_path / _strip_current_dir_prefix(ref.asset_path)
            if resolved_path.exists():
                ref.resolved_path = resolved_path
            else:
                ref.is_broken = True

        return references
    finally:
        # Clean up temporary file
        temp_path.unlink(missing_ok=True)


class AssetDiscoveryEngine:
    """Main engine for asset discovery and analysis."""

    def __init__(self, asset_manager: AssetManager, enable_caching: bool = True):
        """Initialize discovery engine.

        Args:
            asset_manager: Manager used to register assets and list the
                registered ones.
            enable_caching: Passed through to the internal MarkdownScanner.
        """
        self.asset_manager = asset_manager
        self.scanner = MarkdownScanner(enable_caching=enable_caching)
        self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')

    def scan_directory(self, directory: Path, recursive: bool = True,
                       file_patterns: Optional[List[str]] = None) -> ScanResult:
        """Scan directory for asset references.

        Args:
            directory: Directory to scan.
            recursive: Descend into subdirectories when True.
            file_patterns: Glob patterns to match; defaults to
                ``["*.md", "*.mdx"]``.

        Returns:
            A ScanResult. ``success`` is False with ``error`` set when the
            directory is invalid or scanning fails as a whole; a failure in
            one file is logged and that file is skipped.
        """
        # Normalize and validate directory
        directory = PathUtils.normalize_path(directory)

        if not directory.exists() or not directory.is_dir():
            error = ValueError(f"Directory {directory} does not exist or is not a directory")
            return ScanResult(success=False, error=error)

        with TimedOperation(f"directory scan of {directory}") as timer:
            result = ScanResult()
            patterns = file_patterns or ["*.md", "*.mdx"]

            try:
                # Find markdown files. Overlapping patterns can match the
                # same file twice, so deduplicate to avoid double counting.
                find = directory.rglob if recursive else directory.glob
                seen: Set[Path] = set()
                for pattern in patterns:
                    for candidate in find(pattern):
                        if candidate in seen or not candidate.is_file():
                            continue
                        seen.add(candidate)
                        result.scanned_files.append(candidate)

                self.logger.info(f"Found {len(result.scanned_files)} markdown files to scan")

                # Scan each file
                for file_path in result.scanned_files:
                    try:
                        result.asset_references.extend(self.scanner.scan_file(file_path))
                    except Exception as e:
                        self.logger.warning(f"Failed to scan file {file_path}: {e}")

                # Check for broken links
                for ref in result.asset_references:
                    ref.is_broken = self._is_reference_broken(ref, directory)
                    if ref.is_broken:
                        result.broken_links.append(ref)
                broken_count = len(result.broken_links)

                result.processing_time = timer.elapsed_time

                self.logger.info(f"Scan completed: {len(result.asset_references)} references found, "
                                 f"{broken_count} broken links detected")

            except Exception as e:
                self.logger.error(f"Failed to scan directory {directory}: {e}")
                result.success = False
                result.error = e
                result.processing_time = timer.elapsed_time

            return result

    def _is_reference_broken(self, reference: AssetReference,
                             scan_root: Optional[Path] = None) -> bool:
        """Check if an asset reference is broken.

        External targets (http, https, mailto, data) are never broken. A
        local target is broken when ``_resolve_asset_path`` cannot find it.
        """
        if reference.asset_path.startswith(_EXTERNAL_PREFIXES):
            return False  # Skip external URLs and data URLs

        # One resolution routine for both helpers keeps them in agreement.
        return self._resolve_asset_path(reference, scan_root) is None

    def _resolve_asset_path(self, reference: AssetReference,
                            scan_root: Optional[Path] = None) -> Optional[Path]:
        """Resolve asset path using multiple strategies.

        Strategy 1 resolves the target relative to the directory of the
        source file. Strategy 2, used only when ``scan_root`` is given,
        re-anchors the target at the scan root after dropping leading './'
        and '/'.

        Returns:
            The first existing resolved path, or None when nothing matches
            or the path cannot be resolved.
        """
        try:
            candidates = [reference.source_file.parent / reference.asset_path]
            if scan_root is not None:
                candidates.append(
                    scan_root / _strip_current_dir_prefix(reference.asset_path)
                )

            for candidate in candidates:
                resolved_path = candidate.resolve()
                if resolved_path.exists():
                    return resolved_path
        except (OSError, RuntimeError, ValueError):
            # Unresolvable paths (bad characters, symlink loops, permission
            # errors) are treated as "not found".
            return None

        return None

    def auto_register_assets(self, directory: Path,
                             register_existing: bool = True,
                             skip_broken: bool = True) -> RegistrationResult:
        """Automatically register discovered assets.

        Args:
            directory: Directory to scan recursively.
            register_existing: When True, each resolvable asset is passed to
                ``asset_manager.add_asset``; when False it is only counted
                in ``skipped_existing``.
            skip_broken: When True, references flagged broken by the scan
                are counted in ``skipped_broken`` without a resolve attempt.

        Returns:
            A RegistrationResult with the counters and any per-asset
            exceptions in ``errors``.
        """
        with TimedOperation("asset auto-registration") as timer:
            scan_result = self.scan_directory(directory, recursive=True)
            registration_result = RegistrationResult()

            if not scan_result.success:
                return RegistrationResult(
                    success=False,
                    error=scan_result.error,
                    processing_time=timer.elapsed_time
                )

            self.logger.info(f"Starting auto-registration of {len(scan_result.asset_references)} discovered assets")

            for ref in scan_result.asset_references:
                if ref.is_broken and skip_broken:
                    registration_result.skipped_broken += 1
                    continue

                try:
                    # Resolve asset path using multiple strategies
                    abs_asset_path = self._resolve_asset_path(ref, directory)

                    if not (abs_asset_path and FileValidator.is_readable_file(abs_asset_path)):
                        # Asset file doesn't exist or isn't readable
                        registration_result.skipped_broken += 1
                        continue

                    # Check if already registered
                    # (simplified - would check content hash in reality)
                    if register_existing:
                        self.asset_manager.add_asset(abs_asset_path)
                        registration_result.registered_count += 1
                        self.logger.debug(f"Registered asset: {abs_asset_path}")
                    else:
                        registration_result.skipped_existing += 1

                except Exception as e:
                    registration_result.errors.append(e)
                    self.logger.warning(f"Failed to register asset {ref.asset_path}: {e}")

            registration_result.processing_time = timer.elapsed_time

            self.logger.info(f"Auto-registration completed: {registration_result.registered_count} assets registered")

            return registration_result

    def analyze_asset_usage(self, directory: Path) -> UsageAnalysis:
        """Analyze asset usage patterns across the project.

        Registered assets are matched to markdown references by the SHA-256
        of the referenced file's bytes.

        Args:
            directory: Directory to scan recursively for references.

        Returns:
            A UsageAnalysis. ``used_assets`` counts registered assets that
            are referenced, so ``used_assets + unused_assets`` equals
            ``total_assets``.
        """
        with TimedOperation("asset usage analysis") as timer:
            analysis = UsageAnalysis()

            try:
                # Get all registered assets
                # NOTE(review): assumes list_assets() returns dicts with a
                # 'content_hash' key holding a SHA-256 hex digest - confirm.
                all_assets = self.asset_manager.registry.list_assets()
                analysis.total_assets = len(all_assets)

                # Scan for references
                scan_result = self.scan_directory(directory, recursive=True)

                if not scan_result.success:
                    return UsageAnalysis(
                        success=False,
                        error=scan_result.error,
                        processing_time=timer.elapsed_time
                    )

                analysis.broken_references = len(scan_result.broken_links)

                # Determine which assets are used by resolving references to actual asset files
                used_asset_hashes: Set[str] = set()
                hashed_paths: Set[Path] = set()  # avoid re-reading shared assets
                for ref in scan_result.asset_references:
                    if ref.is_broken:
                        continue

                    # Try to resolve the reference to an actual asset file
                    resolved_path = self._resolve_asset_path(ref, directory)
                    if resolved_path is None or resolved_path in hashed_paths:
                        continue
                    hashed_paths.add(resolved_path)

                    # Calculate the content hash to match with stored assets
                    try:
                        content = resolved_path.read_bytes()
                    except OSError as e:
                        # If we can't read the file, skip it
                        self.logger.debug(f"Could not hash {resolved_path}: {e}")
                        continue
                    used_asset_hashes.add(hashlib.sha256(content).hexdigest())

                # Identify unused assets
                analysis.unused_asset_list = [
                    asset for asset in all_assets
                    if asset['content_hash'] not in used_asset_hashes
                ]

                # Count registered assets that are referenced. Counting the
                # hashes instead would include unregistered files and could
                # exceed total_assets.
                analysis.unused_assets = len(analysis.unused_asset_list)
                analysis.used_assets = analysis.total_assets - analysis.unused_assets
                analysis.processing_time = timer.elapsed_time

                self.logger.info(f"Usage analysis completed: {analysis.used_assets}/{analysis.total_assets} "
                                 f"assets in use, {analysis.broken_references} broken references")

            except Exception as e:
                self.logger.error(f"Failed to analyze asset usage: {e}")
                analysis.success = False
                analysis.error = e
                analysis.processing_time = timer.elapsed_time

            return analysis