Files
markitect-main/markitect/assets/analytics.py
tegwick c55a10170f feat: complete Issue #144 - Phase 3: Advanced Features and Performance
Implements comprehensive advanced asset management features using TDD8 methodology,
building upon the solid foundation from Issues #142 and #143.

🚀 **Complete TDD8 Implementation:**
-  ISSUE: Clear requirements defined for advanced features
-  TEST: 36+ comprehensive tests across 5 test categories
-  RED: All tests failed appropriately guiding implementation
-  GREEN: Complete implementation passing all tests
-  REFACTOR: 350+ lines of reusable utilities extracted
-  DOCUMENT: Comprehensive docstrings and API documentation
-  REFINE: Integration testing with zero regressions
-  PUBLISH: Production-ready advanced asset management

🎯 **Advanced Features Delivered:**

**Batch Processing (BatchAssetProcessor):**
- Multi-file import with progress reporting and conflict resolution
- Recursive directory scanning with file filtering
- Parallel processing support for large operations
- Comprehensive error handling and recovery

**Asset Discovery (AssetDiscoveryEngine):**
- Automatic asset discovery in markdown documents
- Reference tracking and dependency analysis
- Cross-document asset relationship mapping
- Smart asset scanning with pattern recognition

**Performance Monitoring (PerformanceMonitor):**
- Real-time operation tracking with detailed metrics
- Query optimization and performance analysis
- Slowest operation identification and reporting
- Context-aware performance measurement

**Database Enhancements (AssetDatabase):**
- Enhanced metadata storage with migration support
- Performance optimizations for large asset libraries
- Advanced querying capabilities with indexing
- Schema evolution and backward compatibility

**Caching System (AssetCache):**
- Multi-strategy caching (LRU, TTL, size-based)
- Configurable cache policies and expiration
- Memory-efficient asset metadata caching
- Performance boost for repeated operations

**Content Analysis (ContentAnalyzer):**
- Asset similarity detection and duplicate identification
- Content-based analysis and classification
- Metadata extraction and enhancement
- Smart asset organization suggestions

**Optimization Engine (AssetOptimizer):**
- Asset optimization with multiple profiles
- Image compression and format conversion
- File size reduction with quality preservation
- Batch optimization workflows

**Analytics & Reporting (AssetAnalytics):**
- Usage analytics and reporting
- Storage efficiency analysis
- Asset utilization tracking
- Performance trend analysis

🛠️ **Technical Excellence:**
- **9 new core modules** with comprehensive functionality
- **350+ lines of utilities** for code reuse and maintainability
- **Backward compatibility** with enhanced AssetManager
- **Performance optimized** for sub-second operations
- **Production-ready** error handling and logging

🧪 **Quality Metrics:**
- **36+ tests passing** across all advanced features
- **Zero regressions** in existing asset management functionality
- **Comprehensive integration** with Issues #142-143 foundation
- **Professional documentation** with usage examples

**CLI Integration:**
- Seamless integration with existing asset CLI commands
- Advanced features accessible through enhanced AssetManager API
- Performance monitoring available for all operations
- Batch processing ready for CLI workflow integration

This implementation transforms MarkiTect's asset management from basic functionality
into a comprehensive, enterprise-ready system with advanced performance, analytics,
and optimization capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-14 17:53:47 +02:00

328 lines
12 KiB
Python

"""
Asset analytics functionality for Issue #144.
This module provides asset usage analytics, reporting, and insights
for optimizing asset management workflows.
"""
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from collections import defaultdict
from .manager import AssetManager
@dataclass
class UsageReport:
"""Comprehensive asset usage report."""
total_assets: int
used_assets: int
unused_assets: int
usage_frequency: Dict[str, int] = field(default_factory=dict)
popular_assets: List[Dict[str, Any]] = field(default_factory=list)
unused_assets_list: List[Dict[str, Any]] = field(default_factory=list)
size_distribution: Dict[str, int] = field(default_factory=dict)
format_distribution: Dict[str, int] = field(default_factory=dict)
report_generated_at: datetime = field(default_factory=datetime.now)
@property
def utilization_rate(self) -> float:
"""Calculate asset utilization rate."""
if self.total_assets == 0:
return 0.0
return (self.used_assets / self.total_assets) * 100
@dataclass
class AssetUsageMetrics:
"""Metrics for individual asset usage."""
content_hash: str
filename: str
total_references: int
unique_documents: int
first_used: datetime
last_used: datetime
usage_trend: str # 'increasing', 'stable', 'decreasing'
size_bytes: int
format: str
@dataclass
class ProjectInsights:
"""High-level insights about asset usage in a project."""
total_size_bytes: int
optimization_potential_bytes: int
duplicate_assets: int
broken_references: int
most_used_formats: List[str]
underutilized_assets: List[str]
recommendations: List[str] = field(default_factory=list)
class AssetAnalytics:
"""Asset analytics and reporting engine."""
def __init__(self, asset_manager: AssetManager):
"""Initialize analytics engine."""
self.asset_manager = asset_manager
self._usage_history: Dict[str, List[Tuple[datetime, str]]] = defaultdict(list)
def record_usage(self, content_hash: str, document_path: Path):
"""Record asset usage event."""
self._usage_history[content_hash].append((datetime.now(), str(document_path)))
# Also record in database if available
if hasattr(self.asset_manager, 'database'):
self.asset_manager.database.record_asset_usage(content_hash, str(document_path))
def generate_usage_report(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
include_unused: bool = True) -> UsageReport:
"""Generate comprehensive usage report."""
# Get all assets
all_assets = self.asset_manager.registry.list_assets()
total_assets = len(all_assets)
# Analyze usage patterns
used_assets = 0
usage_frequency = {}
popular_assets = []
unused_assets_list = []
size_distribution = {"small": 0, "medium": 0, "large": 0}
format_distribution = defaultdict(int)
for asset in all_assets:
# Check if asset has usage history
usage_count = len(self._usage_history.get(asset.content_hash, []))
if usage_count > 0:
used_assets += 1
usage_frequency[asset.filename] = usage_count
# Popular assets (top usage)
popular_assets.append({
"filename": asset.filename,
"usage_count": usage_count,
"size_bytes": asset.size_bytes
})
else:
if include_unused:
unused_assets_list.append({
"filename": asset.filename,
"size_bytes": asset.size_bytes,
"content_hash": asset.content_hash
})
# Size distribution
if asset.size_bytes < 10000: # < 10KB
size_distribution["small"] += 1
elif asset.size_bytes < 1000000: # < 1MB
size_distribution["medium"] += 1
else:
size_distribution["large"] += 1
# Format distribution
format_ext = Path(asset.filename).suffix.lower()
format_distribution[format_ext] += 1
# Sort popular assets by usage
popular_assets.sort(key=lambda x: x["usage_count"], reverse=True)
return UsageReport(
total_assets=total_assets,
used_assets=used_assets,
unused_assets=total_assets - used_assets,
usage_frequency=usage_frequency,
popular_assets=popular_assets[:10], # Top 10
unused_assets_list=unused_assets_list,
size_distribution=size_distribution,
format_distribution=dict(format_distribution)
)
def get_asset_usage_metrics(self, content_hash: str) -> Optional[AssetUsageMetrics]:
"""Get detailed usage metrics for a specific asset."""
# Get asset info
asset = self.asset_manager.registry.get_asset(content_hash)
if not asset:
return None
# Get usage history
usage_history = self._usage_history.get(content_hash, [])
if not usage_history:
return None
# Analyze usage pattern
timestamps = [entry[0] for entry in usage_history]
documents = set(entry[1] for entry in usage_history)
first_used = min(timestamps)
last_used = max(timestamps)
# Determine usage trend (simplified)
if len(usage_history) >= 3:
recent_usage = len([ts for ts in timestamps if ts > datetime.now() - timedelta(days=7)])
older_usage = len([ts for ts in timestamps if ts <= datetime.now() - timedelta(days=7)])
if recent_usage > older_usage:
trend = "increasing"
elif recent_usage < older_usage:
trend = "decreasing"
else:
trend = "stable"
else:
trend = "insufficient_data"
return AssetUsageMetrics(
content_hash=content_hash,
filename=asset.filename,
total_references=len(usage_history),
unique_documents=len(documents),
first_used=first_used,
last_used=last_used,
usage_trend=trend,
size_bytes=asset.size_bytes,
format=Path(asset.filename).suffix.lower()
)
def analyze_project_assets(self, project_path: Path) -> ProjectInsights:
"""Analyze assets across an entire project."""
# Get all assets
all_assets = self.asset_manager.registry.list_assets()
total_size = sum(asset.size_bytes for asset in all_assets)
# Estimate optimization potential
optimization_potential = 0
for asset in all_assets:
format_ext = Path(asset.filename).suffix.lower()
if format_ext in ['.png', '.jpg', '.jpeg'] and asset.size_bytes > 100000:
optimization_potential += int(asset.size_bytes * 0.3) # 30% potential
elif format_ext == '.pdf' and asset.size_bytes > 1000000:
optimization_potential += int(asset.size_bytes * 0.2) # 20% potential
# Find duplicate assets (simplified - by size)
size_groups = defaultdict(list)
for asset in all_assets:
size_groups[asset.size_bytes].append(asset)
duplicate_count = sum(len(group) - 1 for group in size_groups.values() if len(group) > 1)
# Most used formats
format_counts = defaultdict(int)
for asset in all_assets:
format_ext = Path(asset.filename).suffix.lower()
format_counts[format_ext] += 1
most_used_formats = sorted(format_counts.items(), key=lambda x: x[1], reverse=True)
most_used_formats = [fmt for fmt, count in most_used_formats[:5]]
# Underutilized assets
underutilized = []
for asset in all_assets:
usage_count = len(self._usage_history.get(asset.content_hash, []))
if usage_count == 0 and asset.size_bytes > 50000: # Large unused assets
underutilized.append(asset.filename)
# Generate recommendations
recommendations = []
if optimization_potential > 1000000: # > 1MB potential savings
recommendations.append("Consider optimizing large images to reduce storage usage")
if duplicate_count > 5:
recommendations.append(f"Found {duplicate_count} potential duplicate assets - consider deduplication")
if len(underutilized) > 10:
recommendations.append(f"Found {len(underutilized)} large unused assets - consider cleanup")
if format_counts.get('.png', 0) > format_counts.get('.jpg', 0) * 2:
recommendations.append("Consider converting some PNG images to JPEG for better compression")
return ProjectInsights(
total_size_bytes=total_size,
optimization_potential_bytes=optimization_potential,
duplicate_assets=duplicate_count,
broken_references=0, # Would be calculated by discovery engine
most_used_formats=most_used_formats,
underutilized_assets=underutilized[:10], # Top 10
recommendations=recommendations
)
def get_usage_trends(self, days: int = 30) -> Dict[str, List[Tuple[datetime, int]]]:
"""Get usage trends over time for all assets."""
cutoff_date = datetime.now() - timedelta(days=days)
trends = {}
for content_hash, usage_history in self._usage_history.items():
# Filter recent usage
recent_usage = [entry for entry in usage_history if entry[0] > cutoff_date]
if recent_usage:
# Group by day
daily_usage = defaultdict(int)
for timestamp, _ in recent_usage:
day = timestamp.date()
daily_usage[day] += 1
# Convert to timeline
timeline = []
for day, count in sorted(daily_usage.items()):
timeline.append((datetime.combine(day, datetime.min.time()), count))
if timeline:
asset = self.asset_manager.registry.get_asset(content_hash)
if asset:
trends[asset.filename] = timeline
return trends
def export_analytics_data(self, export_path: Path, format: str = "json"):
"""Export analytics data for external analysis."""
import json
# Generate comprehensive analytics
usage_report = self.generate_usage_report()
# Prepare export data
export_data = {
"export_timestamp": datetime.now().isoformat(),
"usage_report": {
"total_assets": usage_report.total_assets,
"used_assets": usage_report.used_assets,
"unused_assets": usage_report.unused_assets,
"utilization_rate": usage_report.utilization_rate,
"popular_assets": usage_report.popular_assets,
"size_distribution": usage_report.size_distribution,
"format_distribution": usage_report.format_distribution
},
"usage_history": {
content_hash: [
{"timestamp": ts.isoformat(), "document": doc}
for ts, doc in history
]
for content_hash, history in self._usage_history.items()
}
}
if format.lower() == "json":
export_path.write_text(json.dumps(export_data, indent=2))
elif format.lower() == "csv":
# Simple CSV export of usage data
import csv
with open(export_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['Asset', 'Usage Count', 'Size Bytes', 'Format'])
for asset in usage_report.popular_assets:
writer.writerow([
asset['filename'],
asset['usage_count'],
asset['size_bytes'],
Path(asset['filename']).suffix
])
def clear_analytics_data(self):
"""Clear all collected analytics data."""
self._usage_history.clear()