""" Asset discovery and scanning functionality for Issue #144. This module provides automatic asset discovery from markdown files, broken link detection, and asset usage analytics. """ import re import logging from pathlib import Path from typing import List, Optional, Dict, Any, Set from dataclasses import dataclass, field from enum import Enum from .manager import AssetManager from .utils import ( PathUtils, TimedOperation, BaseResult, FileValidator, MemoryCache ) class ReferenceType(Enum): """Types of asset references.""" IMAGE = "image" LINK = "link" EMBED = "embed" REFERENCE_STYLE = "reference_style" @dataclass class AssetReference: """Represents a reference to an asset in a markdown file.""" source_file: Path asset_path: str reference_type: ReferenceType line_number: int alt_text: str = "" title: str = "" is_broken: bool = False resolved_path: Optional[Path] = None resolved_hash: Optional[str] = None @dataclass class ScanResult: """Result of scanning directory for asset references.""" scanned_files: List[Path] = field(default_factory=list) asset_references: List[AssetReference] = field(default_factory=list) broken_links: List[AssetReference] = field(default_factory=list) processing_time: float = 0.0 success: bool = True error: Optional[Exception] = None def __post_init__(self): """Post-initialization validation.""" if self.error is not None and self.success: self.success = False def get_broken_links(self) -> List[AssetReference]: """Get list of broken asset references.""" return [ref for ref in self.asset_references if ref.is_broken] @dataclass class RegistrationResult: """Result of automatic asset registration.""" registered_count: int = 0 skipped_broken: int = 0 skipped_existing: int = 0 errors: List[Exception] = field(default_factory=list) processing_time: float = 0.0 success: bool = True error: Optional[Exception] = None def __post_init__(self): """Post-initialization validation.""" if self.error is not None and self.success: self.success = False # Also set success to False if there are any errors if self.errors and self.success: self.success = False @dataclass class UsageAnalysis: """Analysis of asset usage across a project.""" total_assets: int = 0 used_assets: int = 0 unused_assets: int = 0 broken_references: int = 0 processing_time: float = 0.0 success: bool = True error: Optional[Exception] = None def __post_init__(self): """Post-initialization validation.""" if self.error is not None and self.success: self.success = False def get_unused_assets(self) -> List[Any]: """Get list of unused assets.""" # Placeholder implementation return [] class MarkdownScanner: """Scanner for asset references in markdown files.""" def __init__(self, scan_patterns: Optional[List[str]] = None, ignore_patterns: Optional[List[str]] = None, enable_caching: bool = True): """Initialize markdown scanner.""" self.scan_patterns = scan_patterns or ["*.md", "*.mdx"] self.ignore_patterns = ignore_patterns or [] self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}') # Optional caching for repeated scans self.cache = MemoryCache(default_ttl=300.0) if enable_caching else None # Regex patterns for finding asset references self.image_pattern = re.compile( r'!\[([^\]]*)\]\(([^)]+)(?:\s+"([^"]*)")?\)', re.MULTILINE ) self.link_pattern = re.compile( r'(? List[AssetReference]: """Scan a single markdown file for asset references.""" # Normalize path file_path = PathUtils.normalize_path(file_path) # Validate file if not FileValidator.is_readable_file(file_path): self.logger.debug(f"Skipping unreadable file: {file_path}") return [] # Check cache if enabled cache_key = f"scan:{file_path}:{file_path.stat().st_mtime}" if self.cache: cached_result = self.cache.get(cache_key) if cached_result is not None: self.logger.debug(f"Using cached scan result for {file_path}") return cached_result try: content = file_path.read_text(encoding='utf-8') except Exception as e: self.logger.warning(f"Failed to read file {file_path}: {e}") return [] references = [] lines = content.splitlines() # Find image references for match in self.image_pattern.finditer(content): alt_text, asset_path, title = match.groups() line_num = self._get_line_number(content, match.start(), lines) ref = AssetReference( source_file=file_path, asset_path=asset_path, reference_type=ReferenceType.IMAGE, line_number=line_num, alt_text=alt_text or "", title=title or "" ) references.append(ref) # Find link references for match in self.link_pattern.finditer(content): link_text, asset_path, title = match.groups() line_num = self._get_line_number(content, match.start(), lines) # Skip URLs if asset_path.startswith(('http:', 'https:', 'mailto:', 'data:')): continue ref = AssetReference( source_file=file_path, asset_path=asset_path, reference_type=ReferenceType.LINK, line_number=line_num, alt_text=link_text or "", title=title or "" ) references.append(ref) # Find reference-style links for match in self.reference_pattern.finditer(content): ref_id, asset_path = match.groups() line_num = self._get_line_number(content, match.start(), lines) ref = AssetReference( source_file=file_path, asset_path=asset_path, reference_type=ReferenceType.REFERENCE_STYLE, line_number=line_num, alt_text=ref_id ) references.append(ref) # Cache result if caching is enabled if self.cache: self.cache.set(cache_key, references) return references def _get_line_number(self, content: str, position: int, lines: List[str]) -> int: """Get line number for a position in the content.""" line_start = 0 for i, line in enumerate(lines): line_end = line_start + len(line) + 1 # +1 for newline if position < line_end: return i + 1 line_start = line_end return len(lines) class AssetDiscoveryEngine: """Main engine for asset discovery and analysis.""" def __init__(self, asset_manager: AssetManager, enable_caching: bool = True): """Initialize discovery engine.""" self.asset_manager = asset_manager self.scanner = MarkdownScanner(enable_caching=enable_caching) self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}') def scan_directory(self, directory: Path, recursive: bool = True, file_patterns: Optional[List[str]] = None) -> ScanResult: """Scan directory for asset references.""" # Normalize and validate directory directory = PathUtils.normalize_path(directory) if not directory.exists() or not directory.is_dir(): error = ValueError(f"Directory {directory} does not exist or is not a directory") return ScanResult(success=False, error=error) with TimedOperation(f"directory scan of {directory}") as timer: result = ScanResult() patterns = file_patterns or ["*.md", "*.mdx"] try: # Find markdown files if recursive: for pattern in patterns: result.scanned_files.extend(directory.rglob(pattern)) else: for pattern in patterns: result.scanned_files.extend(directory.glob(pattern)) self.logger.info(f"Found {len(result.scanned_files)} markdown files to scan") # Scan each file for file_path in result.scanned_files: try: references = self.scanner.scan_file(file_path) result.asset_references.extend(references) except Exception as e: self.logger.warning(f"Failed to scan file {file_path}: {e}") # Check for broken links broken_count = 0 for ref in result.asset_references: ref.is_broken = self._is_reference_broken(ref) if ref.is_broken: result.broken_links.append(ref) broken_count += 1 result.processing_time = timer.elapsed_time self.logger.info(f"Scan completed: {len(result.asset_references)} references found, " f"{broken_count} broken links detected") except Exception as e: self.logger.error(f"Failed to scan directory {directory}: {e}") result.success = False result.error = e result.processing_time = timer.elapsed_time return result def _is_reference_broken(self, reference: AssetReference) -> bool: """Check if an asset reference is broken.""" if reference.asset_path.startswith(('http:', 'https:', 'data:')): return False # Skip external URLs and data URLs # Resolve relative path try: resolved_path = (reference.source_file.parent / reference.asset_path).resolve() return not resolved_path.exists() except Exception: return True def auto_register_assets(self, directory: Path, register_existing: bool = True, skip_broken: bool = True) -> RegistrationResult: """Automatically register discovered assets.""" with TimedOperation("asset auto-registration") as timer: scan_result = self.scan_directory(directory, recursive=True) registration_result = RegistrationResult() if not scan_result.success: return RegistrationResult( success=False, error=scan_result.error, processing_time=timer.elapsed_time ) self.logger.info(f"Starting auto-registration of {len(scan_result.asset_references)} discovered assets") for ref in scan_result.asset_references: if ref.is_broken and skip_broken: registration_result.skipped_broken += 1 continue try: # Resolve asset path using utility asset_path = PathUtils.get_relative_path( (ref.source_file.parent / ref.asset_path).resolve(), ref.source_file.parent ) # Use absolute path for the resolved asset abs_asset_path = (ref.source_file.parent / ref.asset_path).resolve() if abs_asset_path.exists() and FileValidator.is_readable_file(abs_asset_path): # Check if already registered # (simplified - would check content hash in reality) if register_existing: self.asset_manager.add_asset(abs_asset_path) registration_result.registered_count += 1 self.logger.debug(f"Registered asset: {abs_asset_path}") else: registration_result.skipped_existing += 1 else: # Asset file doesn't exist or isn't readable registration_result.skipped_broken += 1 except Exception as e: registration_result.errors.append(e) self.logger.warning(f"Failed to register asset {ref.asset_path}: {e}") registration_result.processing_time = timer.elapsed_time self.logger.info(f"Auto-registration completed: {registration_result.registered_count} assets registered") return registration_result def analyze_asset_usage(self, directory: Path) -> UsageAnalysis: """Analyze asset usage patterns across the project.""" with TimedOperation("asset usage analysis") as timer: analysis = UsageAnalysis() try: # Get all registered assets all_assets = self.asset_manager.registry.list_assets() analysis.total_assets = len(all_assets) # Scan for references scan_result = self.scan_directory(directory, recursive=True) if not scan_result.success: return UsageAnalysis( success=False, error=scan_result.error, processing_time=timer.elapsed_time ) analysis.broken_references = len(scan_result.broken_links) # Determine which assets are used referenced_assets = set() for ref in scan_result.asset_references: if not ref.is_broken: referenced_assets.add(ref.asset_path) analysis.used_assets = len(referenced_assets) analysis.unused_assets = analysis.total_assets - analysis.used_assets analysis.processing_time = timer.elapsed_time self.logger.info(f"Usage analysis completed: {analysis.used_assets}/{analysis.total_assets} " f"assets in use, {analysis.broken_references} broken references") except Exception as e: self.logger.error(f"Failed to analyze asset usage: {e}") analysis.success = False analysis.error = e analysis.processing_time = timer.elapsed_time return analysis