Files
markitect-main/markitect/assets/discovery.py
tegwick 567f01121e
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
feat: complete Issue #146 final integration testing
Fixed all remaining test failures in test_issue_146_final_integration.py
achieving 100% test success rate (9/9 tests passing):

- Fixed performance monitoring metrics access patterns
- Resolved AssetManager constructor parameter handling
- Implemented missing CLI command methods (add_asset, list_assets, get_asset_info)
- Added cross-platform symlink creation method aliases
- Fixed asset deduplication content uniqueness issues
- Resolved production deployment asset removal workflows
- Fixed performance benchmark dict/hash type conflicts

The asset management system is now production-ready with comprehensive
integration test coverage validating all major workflows and edge cases.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 00:19:52 +02:00

446 lines
17 KiB
Python

"""
Asset discovery and scanning functionality for Issue #144.
This module provides automatic asset discovery from markdown files,
broken link detection, and asset usage analytics.
"""
import re
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any, Set
from dataclasses import dataclass, field
from enum import Enum
from .manager import AssetManager
from .utils import (
PathUtils, TimedOperation, BaseResult,
FileValidator, MemoryCache
)
class ReferenceType(Enum):
    """Kinds of asset reference that the discovery code distinguishes."""

    # Inline image syntax: ![alt](path "title")
    IMAGE = "image"
    # Inline link syntax: [text](path "title")
    LINK = "link"
    # Declared for embeds; no code in this module assigns it.
    EMBED = "embed"
    # Reference definition syntax: [id]: path
    REFERENCE_STYLE = "reference_style"
@dataclass
class AssetReference:
    """One occurrence of an asset reference found inside a markdown file."""

    # --- required: where the reference was found and what it points at ---
    source_file: Path              # markdown file containing the reference
    asset_path: str                # target exactly as written in the markdown
    reference_type: ReferenceType  # syntax that produced the reference
    line_number: int               # 1-based line of the match in source_file

    # --- optional text captured alongside the target ---
    alt_text: str = ""             # alt text, link text, or reference id
    title: str = ""                # quoted title, when one was captured

    # --- filled in after scanning ---
    is_broken: bool = False        # set by the directory scan's broken-link check
    resolved_path: Optional[Path] = None  # not assigned anywhere in this module
    resolved_hash: Optional[str] = None   # not assigned anywhere in this module
@dataclass
class ScanResult:
    """Outcome of scanning a directory tree for asset references."""

    scanned_files: List[Path] = field(default_factory=list)
    asset_references: List[AssetReference] = field(default_factory=list)
    broken_links: List[AssetReference] = field(default_factory=list)
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None

    def __post_init__(self):
        """Force ``success`` to False whenever an error was supplied."""
        if self.error is not None:
            self.success = False

    def get_broken_links(self) -> List[AssetReference]:
        """Return the references currently flagged as broken.

        The list is rebuilt from ``asset_references`` on every call; the
        ``broken_links`` field is not consulted.
        """
        flagged = []
        for reference in self.asset_references:
            if reference.is_broken:
                flagged.append(reference)
        return flagged
@dataclass
class RegistrationResult:
    """Outcome of an automatic asset registration run."""

    registered_count: int = 0
    skipped_broken: int = 0
    skipped_existing: int = 0
    errors: List[Exception] = field(default_factory=list)
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None

    def __post_init__(self):
        """Clear ``success`` when the instance is built with any failure.

        Either a single ``error`` or a non-empty ``errors`` list counts as a
        failure. This runs only at construction time, so errors appended
        afterwards do not change ``success``.
        """
        has_failure = self.error is not None or bool(self.errors)
        if has_failure:
            self.success = False
@dataclass
class UsageAnalysis:
    """Summary of how registered assets are used across a project."""

    total_assets: int = 0
    used_assets: int = 0
    unused_assets: int = 0
    broken_references: int = 0
    processing_time: float = 0.0
    success: bool = True
    error: Optional[Exception] = None
    unused_asset_list: List[Dict[str, Any]] = field(default_factory=list)

    def __post_init__(self):
        """Force ``success`` to False whenever an error was supplied."""
        if self.error is not None:
            self.success = False

    def get_unused_assets(self) -> List[Dict[str, Any]]:
        """Return the stored unused-asset records (the same list object)."""
        unused = self.unused_asset_list
        return unused
class MarkdownScanner:
    """Scanner for asset references in markdown files.

    Three syntaxes are recognised:

    * inline images  ``![alt](path "title")``
    * inline links   ``[text](path "title")``
    * reference definitions ``[id]: path "title"``

    ``scan_patterns`` and ``ignore_patterns`` are stored on the instance but
    are not consulted by any method of this class.
    """

    # Splits a reference-definition target into destination and a trailing
    # title written as "title", 'title' or (title).
    _reference_title_pattern = re.compile(
        r'^(.*?)\s+(?:"([^"]*)"|\'([^\']*)\'|\(([^)]*)\))$'
    )

    def __init__(self, scan_patterns: Optional[List[str]] = None,
                 ignore_patterns: Optional[List[str]] = None,
                 enable_caching: bool = True):
        """Initialize markdown scanner.

        Args:
            scan_patterns: Glob patterns of files to scan; defaults to
                ``["*.md", "*.mdx"]``.
            ignore_patterns: Glob patterns to ignore; defaults to ``[]``.
            enable_caching: When True a ``MemoryCache`` (TTL 300 s) holds
                per-file scan results; when False ``self.cache`` is None.
        """
        self.scan_patterns = scan_patterns or ["*.md", "*.mdx"]
        self.ignore_patterns = ignore_patterns or []
        self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')
        # Optional caching for repeated scans
        self.cache = MemoryCache(default_ttl=300.0) if enable_caching else None
        # Regex patterns for finding asset references
        self.image_pattern = re.compile(
            r'!\[([^\]]*)\]\(([^)\s]+)(?:\s+"([^"]*)")?\)',
            re.MULTILINE
        )
        self.link_pattern = re.compile(
            r'(?<!!)\[([^\]]*)\]\(([^)\s]+)(?:\s+"([^"]*)")?\)',
            re.MULTILINE
        )
        self.reference_pattern = re.compile(
            r'^\[([^\]]+)\]:\s*(.+)$',
            re.MULTILINE
        )

    @classmethod
    def _split_reference_target(cls, raw_target: str) -> tuple:
        """Split the right-hand side of ``[id]: ...`` into (path, title).

        ``reference_pattern`` captures everything after the colon, so a
        definition such as ``[logo]: ./logo.png "Logo"`` would otherwise
        yield a path that includes the quoted title and can never resolve.
        A trailing quoted or parenthesised title is removed, and a
        destination wrapped in ``<...>`` is unwrapped. A target without a
        recognisable title is returned whole (only stripped), so unquoted
        paths containing spaces keep working.
        """
        target = raw_target.strip()
        title = ""
        match = cls._reference_title_pattern.match(target)
        if match:
            target = match.group(1).strip()
            # Exactly one of the three title alternatives matched.
            title = next(g for g in match.groups()[1:] if g is not None)
        if len(target) >= 2 and target.startswith('<') and target.endswith('>'):
            target = target[1:-1]
        return target, title

    def scan_file(self, file_path: Path) -> List["AssetReference"]:
        """Scan a single markdown file for asset references.

        Args:
            file_path: Markdown file to scan.

        Returns:
            References in the order: all images, then all inline links, then
            all reference definitions. An empty list is returned when the
            file is not readable or cannot be decoded as UTF-8. Inline links
            whose target starts with ``http:``, ``https:``, ``mailto:`` or
            ``data:`` are omitted; images and reference definitions are not
            filtered this way.
        """
        # Normalize path
        file_path = PathUtils.normalize_path(file_path)
        # Validate file
        if not FileValidator.is_readable_file(file_path):
            self.logger.debug(f"Skipping unreadable file: {file_path}")
            return []

        # Check cache if enabled. ``is not None`` is used rather than
        # truthiness so that an empty cache object is never mistaken for
        # "caching disabled". The mtime is part of the key so an edited file
        # is rescanned.
        cache_key = None
        if self.cache is not None:
            try:
                cache_key = f"scan:{file_path}:{file_path.stat().st_mtime}"
            except OSError as e:
                # File vanished or became inaccessible: scan uncached.
                self.logger.debug(f"Could not stat {file_path} for caching: {e}")
            else:
                cached_result = self.cache.get(cache_key)
                if cached_result is not None:
                    self.logger.debug(f"Using cached scan result for {file_path}")
                    # Hand out a copy so callers cannot alter the cached list.
                    return list(cached_result)

        try:
            content = file_path.read_text(encoding='utf-8')
        except (OSError, UnicodeError) as e:
            self.logger.warning(f"Failed to read file {file_path}: {e}")
            return []

        references = []
        lines = content.splitlines()

        # Find image references
        for match in self.image_pattern.finditer(content):
            alt_text, asset_path, title = match.groups()
            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.IMAGE,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=alt_text or "",
                title=title or ""
            ))

        # Find link references
        for match in self.link_pattern.finditer(content):
            link_text, asset_path, title = match.groups()
            # Skip URLs
            if asset_path.startswith(('http:', 'https:', 'mailto:', 'data:')):
                continue
            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.LINK,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=link_text or "",
                title=title or ""
            ))

        # Find reference-style links
        for match in self.reference_pattern.finditer(content):
            ref_id, raw_target = match.groups()
            asset_path, title = self._split_reference_target(raw_target)
            if not asset_path:
                # Definition with nothing but whitespace after the colon.
                continue
            references.append(AssetReference(
                source_file=file_path,
                asset_path=asset_path,
                reference_type=ReferenceType.REFERENCE_STYLE,
                line_number=self._get_line_number(content, match.start(), lines),
                alt_text=ref_id,
                title=title
            ))

        # Cache result if caching is enabled (store a private copy)
        if self.cache is not None and cache_key is not None:
            self.cache.set(cache_key, list(references))
        return references

    def _get_line_number(self, content: str, position: int, lines: List[str]) -> int:
        """Return the 1-based line number of ``position`` within ``content``.

        The line number is one more than the number of ``\\n`` characters
        before ``position``. This stays correct for ``\\r\\n`` line endings,
        where summing ``len(line) + 1`` over ``splitlines()`` drifts by one
        character per line.

        Args:
            content: Full text that was searched.
            position: Character offset into ``content``; clamped to its bounds.
            lines: Unused; kept so existing callers keep working.
        """
        position = max(0, min(position, len(content)))
        return content.count('\n', 0, position) + 1
class AssetDiscoveryEngine:
    """Main engine for asset discovery and analysis.

    Combines a ``MarkdownScanner`` (finds references) with an asset manager
    (stores assets) to scan directories, flag broken references, register
    referenced files and report which registered assets are unused.
    """

    # Targets with these prefixes are never checked on disk.
    _EXTERNAL_PREFIXES = ('http:', 'https:', 'data:', 'mailto:')

    def __init__(self, asset_manager: "AssetManager", enable_caching: bool = True):
        """Initialize discovery engine.

        Args:
            asset_manager: Manager used for ``add_asset`` and
                ``registry.list_assets``.
            enable_caching: Forwarded to the internal ``MarkdownScanner``.
        """
        self.asset_manager = asset_manager
        self.scanner = MarkdownScanner(enable_caching=enable_caching)
        self.logger = logging.getLogger(f'{__name__}.{self.__class__.__name__}')

    def scan_directory(self, directory: Path, recursive: bool = True,
                       file_patterns: Optional[List[str]] = None) -> "ScanResult":
        """Scan directory for asset references.

        Args:
            directory: Root directory to search.
            recursive: Use ``rglob`` (True) or ``glob`` (False).
            file_patterns: Glob patterns; defaults to ``["*.md", "*.mdx"]``.

        Returns:
            A ``ScanResult``. When ``directory`` is not an existing directory
            the result carries ``success=False`` and a ``ValueError``; no
            exception is raised. Each file appears once in ``scanned_files``
            even if several patterns match it.
        """
        # Normalize and validate directory
        directory = PathUtils.normalize_path(directory)
        if not directory.is_dir():
            error = ValueError(f"Directory {directory} does not exist or is not a directory")
            return ScanResult(success=False, error=error)

        with TimedOperation(f"directory scan of {directory}") as timer:
            result = ScanResult()
            patterns = file_patterns or ["*.md", "*.mdx"]
            try:
                # Find markdown files, dropping duplicates produced by
                # overlapping patterns while keeping discovery order.
                finder = directory.rglob if recursive else directory.glob
                seen = set()
                for pattern in patterns:
                    for found in finder(pattern):
                        if found not in seen:
                            seen.add(found)
                            result.scanned_files.append(found)
                self.logger.info(f"Found {len(result.scanned_files)} markdown files to scan")

                # Scan each file; one bad file must not abort the whole scan.
                for file_path in result.scanned_files:
                    try:
                        result.asset_references.extend(self.scanner.scan_file(file_path))
                    except Exception as e:
                        self.logger.warning(f"Failed to scan file {file_path}: {e}")

                # Check for broken links
                for ref in result.asset_references:
                    ref.is_broken = self._is_reference_broken(ref, directory)
                    if ref.is_broken:
                        result.broken_links.append(ref)
                broken_count = len(result.broken_links)

                result.processing_time = timer.elapsed_time
                self.logger.info(f"Scan completed: {len(result.asset_references)} references found, "
                                 f"{broken_count} broken links detected")
            except Exception as e:
                self.logger.error(f"Failed to scan directory {directory}: {e}")
                result.success = False
                result.error = e
                result.processing_time = timer.elapsed_time
            return result

    @staticmethod
    def _root_relative(asset_path: str) -> Optional[str]:
        """Return ``asset_path`` rewritten for lookup under the scan root.

        Leading ``./`` segments and leading ``/`` characters are removed.
        ``str.lstrip('./')`` is deliberately avoided: it strips a *set of
        characters*, turning ``../x`` into ``x`` and ``.hidden/x`` into
        ``hidden/x``. Returns None when nothing is left or when the path
        starts by climbing out of the root (``..``).
        """
        trimmed = asset_path
        while trimmed.startswith('./'):
            trimmed = trimmed[2:]
        trimmed = trimmed.lstrip('/')
        if not trimmed or trimmed == '..' or trimmed.startswith('../'):
            return None
        return trimmed

    @staticmethod
    def _hash_file(path: Path) -> str:
        """Return the SHA-256 hex digest of a file, read in 64 KiB chunks."""
        import hashlib  # local import: hashlib is not imported at module level
        digest = hashlib.sha256()
        with path.open('rb') as handle:
            for chunk in iter(lambda: handle.read(65536), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def _is_reference_broken(self, reference: "AssetReference",
                             scan_root: Optional[Path] = None) -> bool:
        """Check if an asset reference is broken.

        External targets (``http:``, ``https:``, ``data:``, ``mailto:``) and
        in-page anchors (``#...``) are never reported as broken. Everything
        else is broken exactly when ``_resolve_asset_path`` finds no file.
        """
        target = reference.asset_path
        if target.startswith(self._EXTERNAL_PREFIXES) or target.startswith('#'):
            return False
        return self._resolve_asset_path(reference, scan_root) is None

    def _resolve_asset_path(self, reference: "AssetReference",
                            scan_root: Optional[Path]) -> Optional[Path]:
        """Resolve asset path using multiple strategies.

        For the target as written, and then for the target with any
        ``?query`` / ``#fragment`` suffix removed, try in order:

        1. relative to the directory of the referencing file;
        2. relative to ``scan_root`` (skipped when ``scan_root`` is None or
           the path would climb out of the root).

        Returns:
            The first existing resolved path, or None when nothing matches
            or the path cannot be resolved.
        """
        raw = reference.asset_path
        candidates = [raw]
        bare = re.split(r'[?#]', raw, maxsplit=1)[0]
        if bare and bare != raw:
            candidates.append(bare)

        try:
            for candidate in candidates:
                # Strategy 1: Relative to source file directory
                resolved = (reference.source_file.parent / candidate).resolve()
                if resolved.exists():
                    return resolved
                # Strategy 2: Relative to scan root
                if scan_root is not None:
                    relative = self._root_relative(candidate)
                    if relative is not None:
                        resolved = (scan_root / relative).resolve()
                        if resolved.exists():
                            return resolved
        except (OSError, ValueError, RuntimeError):
            # Unresolvable path (e.g. embedded NUL, symlink loop): treat as missing.
            return None
        return None

    def auto_register_assets(self, directory: Path, register_existing: bool = True,
                             skip_broken: bool = True) -> "RegistrationResult":
        """Automatically register discovered assets.

        Args:
            directory: Root directory to scan recursively.
            register_existing: When True every resolvable, readable asset is
                passed to ``asset_manager.add_asset``. When False nothing is
                registered and each such asset is counted in
                ``skipped_existing``.
            skip_broken: When True references flagged broken by the scan are
                counted in ``skipped_broken`` without a resolution attempt.

        Returns:
            A ``RegistrationResult``. Failures for individual assets are
            collected in ``errors`` and logged; they do not stop the run and
            this method does not change ``success`` because of them. A failed
            scan yields ``success=False`` with the scan's error.
        """
        with TimedOperation("asset auto-registration") as timer:
            scan_result = self.scan_directory(directory, recursive=True)
            if not scan_result.success:
                return RegistrationResult(
                    success=False,
                    error=scan_result.error,
                    processing_time=timer.elapsed_time
                )

            registration_result = RegistrationResult()
            self.logger.info(f"Starting auto-registration of {len(scan_result.asset_references)} discovered assets")
            for ref in scan_result.asset_references:
                if ref.is_broken and skip_broken:
                    registration_result.skipped_broken += 1
                    continue
                try:
                    # Resolve asset path using multiple strategies
                    abs_asset_path = self._resolve_asset_path(ref, directory)
                    if not (abs_asset_path and FileValidator.is_readable_file(abs_asset_path)):
                        # Asset file doesn't exist or isn't readable
                        registration_result.skipped_broken += 1
                        continue
                    # Simplified: no content-hash lookup is done here to see
                    # whether the asset is already registered.
                    if not register_existing:
                        registration_result.skipped_existing += 1
                        continue
                    self.asset_manager.add_asset(abs_asset_path)
                    registration_result.registered_count += 1
                    self.logger.debug(f"Registered asset: {abs_asset_path}")
                except Exception as e:
                    registration_result.errors.append(e)
                    self.logger.warning(f"Failed to register asset {ref.asset_path}: {e}")

            registration_result.processing_time = timer.elapsed_time
            self.logger.info(f"Auto-registration completed: {registration_result.registered_count} assets registered")
            return registration_result

    def analyze_asset_usage(self, directory: Path) -> "UsageAnalysis":
        """Analyze asset usage patterns across the project.

        Registered assets (``asset_manager.registry.list_assets()``) are
        matched to references by comparing each record's ``'content_hash'``
        with the SHA-256 of every file a non-broken reference resolves to.

        Returns:
            A ``UsageAnalysis`` where ``used_assets`` counts registered assets
            whose hash is referenced, so ``used_assets + unused_assets ==
            total_assets``. Any exception is captured in ``error`` with
            ``success=False`` rather than raised.
        """
        with TimedOperation("asset usage analysis") as timer:
            analysis = UsageAnalysis()
            try:
                # Get all registered assets
                all_assets = self.asset_manager.registry.list_assets()
                analysis.total_assets = len(all_assets)

                # Scan for references
                scan_result = self.scan_directory(directory, recursive=True)
                if not scan_result.success:
                    return UsageAnalysis(
                        success=False,
                        error=scan_result.error,
                        processing_time=timer.elapsed_time
                    )
                analysis.broken_references = len(scan_result.broken_links)

                # Hash every distinct referenced file once.
                referenced_hashes = set()
                hashed_paths = set()
                for ref in scan_result.asset_references:
                    if ref.is_broken:
                        continue
                    resolved_path = self._resolve_asset_path(ref, directory)
                    if resolved_path is None or resolved_path in hashed_paths:
                        continue
                    hashed_paths.add(resolved_path)
                    if not resolved_path.is_file():
                        # Directories and other non-files have no content hash.
                        continue
                    try:
                        referenced_hashes.add(self._hash_file(resolved_path))
                    except OSError as e:
                        # Unreadable file: it cannot count as usage evidence.
                        self.logger.debug(f"Could not hash {resolved_path}: {e}")

                # Identify unused assets
                analysis.unused_asset_list = [
                    asset for asset in all_assets
                    if asset['content_hash'] not in referenced_hashes
                ]
                analysis.unused_assets = len(analysis.unused_asset_list)
                # Count registered assets only; referenced files that were
                # never registered must not inflate the figure.
                analysis.used_assets = analysis.total_assets - analysis.unused_assets

                analysis.processing_time = timer.elapsed_time
                self.logger.info(f"Usage analysis completed: {analysis.used_assets}/{analysis.total_assets} "
                                 f"assets in use, {analysis.broken_references} broken references")
            except Exception as e:
                self.logger.error(f"Failed to analyze asset usage: {e}")
                analysis.success = False
                analysis.error = e
                analysis.processing_time = timer.elapsed_time
            return analysis