#!/usr/bin/env python3
"""
Implementation example for Issue #141 - Concept A: Hash-Based Asset Store

This is a working prototype demonstrating the hash-based content-addressable
storage approach for asset management with deduplication.
"""

import hashlib
import json
import os
import re
import shutil
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple

# Markdown image syntax ``![alt](target)``: group 1 is the alt text,
# group 2 is the target.
_IMAGE_REF_PATTERN = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)')

# URL scheme used inside markdown to point at an asset by its content hash.
_ASSET_SCHEME = "asset://"

# Extension (lower-case, with leading dot) -> MIME type.
_MIME_TYPES_BY_EXTENSION = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".svg": "image/svg+xml",
    ".pdf": "application/pdf",
    ".txt": "text/plain",
    ".md": "text/markdown",
}


def _is_within(candidate: Path, root: Path) -> bool:
    """Return True when *candidate* resolves to *root* or to a path below it."""
    real_root = os.path.realpath(root)
    real_candidate = os.path.realpath(candidate)
    try:
        return os.path.commonpath([real_root, real_candidate]) == real_root
    except ValueError:
        # Raised e.g. for paths on different drives: definitely not inside.
        return False


class HashBasedAssetStore:
    """Content-addressable storage system using SHA-256 hashes.

    Asset bytes live under ``<store_path>/store/sha256/<hash[:6]>/<hash><ext>``
    and their metadata in the SQLite database ``<store_path>/metadata.db``.
    """

    def __init__(self, store_path: Path):
        """Create the store directory (if needed) and initialise the database."""
        self.store_path = store_path
        self.store_path.mkdir(parents=True, exist_ok=True)

        # Initialize database
        self.db_path = store_path / "metadata.db"
        self._init_database()

    @contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed (or rolled back) and then closed.

        ``with sqlite3.connect(...)`` on its own only handles the transaction;
        it leaves the connection open, so it is closed explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_database(self):
        """Initialize SQLite database with asset tables."""
        # NOTE: SQLite enforces the FOREIGN KEY below only when
        # ``PRAGMA foreign_keys = ON`` is set on the connection; this class
        # does not set it, so the constraint is declarative only.
        with self._connection() as conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS assets (
                    content_hash TEXT PRIMARY KEY,
                    file_size INTEGER NOT NULL,
                    mime_type TEXT,
                    original_extension TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );

                CREATE TABLE IF NOT EXISTS asset_names (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    content_hash TEXT NOT NULL,
                    virtual_name TEXT NOT NULL,
                    document_id TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (content_hash) REFERENCES assets(content_hash)
                );

                CREATE INDEX IF NOT EXISTS idx_asset_names_virtual
                    ON asset_names(virtual_name);
                CREATE INDEX IF NOT EXISTS idx_asset_names_document
                    ON asset_names(document_id);
            """)

    def _asset_file_path(self, content_hash: str, extension: str) -> Path:
        """Build the on-disk location for a hash/extension pair."""
        hash_dir = self.store_path / "store" / "sha256" / content_hash[:6]
        return hash_dir / f"{content_hash}{extension}"

    def store_asset(self, file_path: Path, document_id: Optional[str] = None) -> str:
        """Store asset and return content hash.

        Args:
            file_path: File whose bytes should be stored.
            document_id: Accepted for API compatibility; not used here.

        Returns:
            The hex SHA-256 digest of the file content.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"Asset file not found: {file_path}")

        content = file_path.read_bytes()
        content_hash = hashlib.sha256(content).hexdigest()
        file_ext = file_path.suffix

        # Deduplicate on the hash alone: an asset counts as stored only when
        # its metadata row exists AND the file that row points to is on disk.
        # This also catches identical bytes arriving under another extension.
        if self.get_asset_path(content_hash) is not None:
            print(f"✓ Deduplication: Asset already exists {content_hash[:12]}...{file_ext}")
            return content_hash

        # Create hash-based directory structure
        stored_path = self._asset_file_path(content_hash, file_ext)
        stored_path.parent.mkdir(parents=True, exist_ok=True)
        # Always (re)write: a file without a metadata row may be left over
        # from an interrupted earlier store.
        stored_path.write_bytes(content)

        # Add to database
        with self._connection() as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO assets
                    (content_hash, file_size, mime_type, original_extension)
                VALUES (?, ?, ?, ?)
                """,
                (content_hash, len(content), self._guess_mime_type(file_ext), file_ext),
            )

        print(f"✓ Stored new asset: {content_hash[:12]}...{file_ext}")
        return content_hash

    def register_name(self, content_hash: str, virtual_name: str, document_id: str):
        """Register a virtual name for an asset.

        Registering the same (hash, name, document) triple again is a no-op,
        so repeated imports do not inflate reference counts.
        """
        triple = (content_hash, virtual_name, document_id)
        with self._connection() as conn:
            already_registered = conn.execute(
                """
                SELECT 1 FROM asset_names
                WHERE content_hash = ? AND virtual_name = ? AND document_id = ?
                LIMIT 1
                """,
                triple,
            ).fetchone()
            if already_registered is None:
                conn.execute(
                    """
                    INSERT INTO asset_names (content_hash, virtual_name, document_id)
                    VALUES (?, ?, ?)
                    """,
                    triple,
                )

        print(f"✓ Registered name: {virtual_name} -> {content_hash[:12]}...")

    def get_asset_path(self, content_hash: str) -> Optional[Path]:
        """Get filesystem path for asset by hash.

        Returns:
            The stored file's path, or None when the hash has no metadata row
            or the file it refers to is missing on disk.
        """
        with self._connection() as conn:
            row = conn.execute(
                "SELECT original_extension FROM assets WHERE content_hash = ?",
                (content_hash,),
            ).fetchone()

        if row is None:
            return None

        asset_path = self._asset_file_path(content_hash, row[0] or "")
        return asset_path if asset_path.exists() else None

    def resolve_name(self, virtual_name: str, document_id: str) -> Optional[str]:
        """Resolve virtual name to content hash.

        When a name was registered for several hashes within the same
        document, the most recent registration wins.
        """
        with self._connection() as conn:
            row = conn.execute(
                """
                SELECT content_hash FROM asset_names
                WHERE virtual_name = ? AND document_id = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (virtual_name, document_id),
            ).fetchone()
        return row[0] if row else None

    def get_virtual_name(self, content_hash: str, document_id: str) -> Optional[str]:
        """Return the first virtual name registered for a hash in a document.

        Returns None when the document has no name for that hash.
        """
        with self._connection() as conn:
            row = conn.execute(
                """
                SELECT virtual_name FROM asset_names
                WHERE content_hash = ? AND document_id = ?
                ORDER BY id
                LIMIT 1
                """,
                (content_hash, document_id),
            ).fetchone()
        return row[0] if row else None

    def list_assets(self) -> List[Dict]:
        """List all stored assets with metadata.

        Each entry has the keys ``hash``, ``size``, ``mime_type``,
        ``extension``, ``created`` and ``reference_count`` (number of name
        registrations). Entries are ordered by content hash.
        """
        with self._connection() as conn:
            rows = conn.execute(
                """
                SELECT a.content_hash, a.file_size, a.mime_type, a.original_extension,
                       a.created_at, COUNT(an.id) as name_count
                FROM assets a
                LEFT JOIN asset_names an ON a.content_hash = an.content_hash
                GROUP BY a.content_hash
                ORDER BY a.content_hash
                """
            ).fetchall()

        return [
            {
                "hash": content_hash,
                "size": file_size,
                "mime_type": mime_type,
                "extension": extension,
                "created": created_at,
                "reference_count": name_count,
            }
            for content_hash, file_size, mime_type, extension, created_at, name_count in rows
        ]

    def get_document_assets(self, document_id: str) -> List[Dict]:
        """Get all assets used by a specific document.

        Each entry has the keys ``virtual_name``, ``content_hash``, ``size``
        and ``mime_type``; entries are ordered by virtual name.
        """
        with self._connection() as conn:
            rows = conn.execute(
                """
                SELECT an.virtual_name, an.content_hash, a.file_size, a.mime_type
                FROM asset_names an
                JOIN assets a ON an.content_hash = a.content_hash
                WHERE an.document_id = ?
                ORDER BY an.virtual_name
                """,
                (document_id,),
            ).fetchall()

        return [
            {
                "virtual_name": virtual_name,
                "content_hash": content_hash,
                "size": file_size,
                "mime_type": mime_type,
            }
            for virtual_name, content_hash, file_size, mime_type in rows
        ]

    def _guess_mime_type(self, extension: str) -> str:
        """Simple MIME type guessing based on extension.

        Unknown extensions map to ``application/octet-stream``.
        """
        return _MIME_TYPES_BY_EXTENSION.get(extension.lower(), "application/octet-stream")


class MarkdownAssetProcessor:
    """Process markdown content with hash-based asset references."""

    def __init__(self, asset_store: HashBasedAssetStore):
        self.asset_store = asset_store

    def import_document_assets(self, md_file: Path, assets_dir: Path, document_id: str) -> str:
        """Import all assets for a document and update markdown references.

        Every ``![alt](path)`` whose *path* names a regular file below
        *assets_dir* is stored, registered under *path* for *document_id*, and
        rewritten to ``![alt](asset://<hash>)``. Other references are left
        untouched and a warning is printed.

        Returns:
            The markdown text with resolvable image references rewritten.

        Raises:
            FileNotFoundError: If *md_file* does not exist.
        """
        if not md_file.exists():
            raise FileNotFoundError(f"Markdown file not found: {md_file}")

        md_content = md_file.read_text(encoding="utf-8")

        def replace_image_ref(match):
            alt_text = match.group(1)
            image_path = match.group(2)

            # Look for image in assets directory. is_file() rather than
            # exists(): a reference to a directory cannot be read as an asset.
            full_image_path = assets_dir / image_path
            if not full_image_path.is_file():
                print(f"⚠ Asset not found: {image_path}")
                return match.group(0)  # Return original if not found

            # Store asset, register its virtual name, return hash-based reference
            content_hash = self.asset_store.store_asset(full_image_path, document_id)
            self.asset_store.register_name(content_hash, image_path, document_id)
            return f'![{alt_text}](asset://{content_hash})'

        # Process and replace image references
        return _IMAGE_REF_PATTERN.sub(replace_image_ref, md_content)

    def export_document_assets(self, md_content: str, document_id: str, output_dir: Path) -> str:
        """Export document with resolved asset references.

        Every ``![alt](asset://<hash>)`` that has a virtual name registered
        for *document_id* is copied to ``<output_dir>/assets/<virtual_name>``
        (unless that file already exists) and rewritten to
        ``![alt](assets/<virtual_name>)``. References that cannot be resolved,
        or whose virtual name would land outside *output_dir*, are returned
        unchanged.

        Returns:
            The markdown text with resolvable asset references rewritten.
        """
        output_assets_dir = output_dir / "assets"

        def resolve_asset_ref(match):
            alt_text = match.group(1)
            asset_ref = match.group(2)

            if not asset_ref.startswith(_ASSET_SCHEME):
                return match.group(0)

            content_hash = asset_ref[len(_ASSET_SCHEME):]

            # Get original virtual name
            virtual_name = self.asset_store.get_virtual_name(content_hash, document_id)
            if virtual_name is None:
                return match.group(0)  # Return original if can't resolve

            asset_path = self.asset_store.get_asset_path(content_hash)
            if asset_path is None:
                return match.group(0)

            output_asset_path = output_assets_dir / virtual_name
            # Virtual names originate from markdown text; refuse any that
            # would write outside the export root (e.g. "../../x").
            if not _is_within(output_asset_path, output_dir):
                print(f"⚠ Refusing to export outside output directory: {virtual_name}")
                return match.group(0)

            # Copy asset to output directory. The virtual name may contain
            # sub-directories ("img/logo.png"), so create its parent as well.
            output_assets_dir.mkdir(parents=True, exist_ok=True)
            output_asset_path.parent.mkdir(parents=True, exist_ok=True)
            if not output_asset_path.exists():
                shutil.copy2(asset_path, output_asset_path)

            return f'![{alt_text}](assets/{virtual_name})'

        # Process asset references
        return _IMAGE_REF_PATTERN.sub(resolve_asset_ref, md_content)


def demo_hash_based_assets():
    """Demonstrate the hash-based asset management system.

    Recreates ``./demo_hash_store`` from scratch, stores three small test
    files (two with identical content), registers names for them and prints
    the resulting library, per-document listings and directory tree.
    """
    print("🎯 Asset Management Demo - Concept A (Hash-Based)")
    print("=" * 55)

    # Setup
    demo_store = Path("./demo_hash_store")
    if demo_store.exists():
        shutil.rmtree(demo_store)

    asset_store = HashBasedAssetStore(demo_store)
    processor = MarkdownAssetProcessor(asset_store)

    # Create demo assets
    demo_assets = demo_store / "demo_inputs"
    demo_assets.mkdir(parents=True)

    # Create test assets (same as Concept B demo for comparison)
    (demo_assets / "logo.png").write_text("PNG_IMAGE_CONTENT_LOGO")
    (demo_assets / "company_logo.png").write_text("PNG_IMAGE_CONTENT_LOGO")  # Duplicate content
    (demo_assets / "diagram.png").write_text("PNG_IMAGE_CONTENT_DIAGRAM")

    print("Created test assets: 3 files")

    # Store assets individually to show deduplication
    print("\n📁 Storing assets...")
    hash1 = asset_store.store_asset(demo_assets / "logo.png", "doc1")
    hash2 = asset_store.store_asset(demo_assets / "company_logo.png", "doc2")
    hash3 = asset_store.store_asset(demo_assets / "diagram.png", "doc1")

    # Register virtual names
    asset_store.register_name(hash1, "logo.png", "doc1")
    asset_store.register_name(hash2, "company_logo.png", "doc2")  # Same content, different name
    asset_store.register_name(hash3, "diagram.png", "doc1")
    asset_store.register_name(hash3, "system_diagram.png", "doc2")  # Same content, different name

    # Show results
    print("\n📊 Storage Results:")
    print(" - Files processed: 3")
    print(" - Unique content hashes:")
    print(f" • logo.png: {hash1[:12]}...")
    print(f" • company_logo.png: {hash2[:12]}... {'(same as logo.png)' if hash1 == hash2 else '(different)'}")
    print(f" • diagram.png: {hash3[:12]}...")

    # List all assets
    print("\n📋 Asset Library:")
    for asset in asset_store.list_assets():
        print(f" • {asset['hash'][:12]}...{asset['extension']} "
              f"({asset['size']} bytes, {asset['reference_count']} references)")

    # Show document assets
    for doc_id in ["doc1", "doc2"]:
        print(f"\n📄 Document '{doc_id}' assets:")
        for asset in asset_store.get_document_assets(doc_id):
            print(f" • {asset['virtual_name']} -> {asset['content_hash'][:12]}... ({asset['size']} bytes)")

    print("\n✅ Demo completed successfully!")
    print(f" - Asset store: {demo_store}")
    print(f" - Database: {asset_store.db_path}")
    print(" - Storage efficiency: Perfect deduplication by content hash")

    # Show directory structure
    print("\n📂 Storage directory structure:")
    for root, dirs, files in os.walk(demo_store):
        dirs.sort()  # os.walk order is arbitrary; sort for stable output
        level = len(Path(root).relative_to(demo_store).parts)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")
        subindent = ' ' * 2 * (level + 1)
        for file in sorted(files):
            print(f"{subindent}{file}")


if __name__ == "__main__":
    demo_hash_based_assets()