feat(proxy): add proxy file system for non-markdown source conversion
Introduces a new `markitect/proxy/` module with pluggable extractors that convert non-markdown sources (PDF, HTML) into tracked markdown proxy files. Proxy files preserve origin metadata (path, checksum, timestamp) so they can be kept in sync when the original changes. CLI commands: `proxy create`, `proxy update`, `proxy status`, `proxy extractors`. Built-in extractors: PDF (pymupdf4llm), HTML (markdownify), Markdown (built-in). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
241
markitect/proxy/generator.py
Normal file
241
markitect/proxy/generator.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
ProxyGenerator — create, update, and check status of proxy files.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
import yaml
|
||||
|
||||
from markitect.assets.utils import ContentHasher
|
||||
from markitect.frontmatter import FrontMatterParser
|
||||
from markitect.proxy.exceptions import ProxyError
|
||||
from markitect.proxy.models import ProxyMetadata
|
||||
from markitect.proxy.registry import ExtractorRegistry
|
||||
|
||||
# Module-level logger; the methods below report created/updated proxies on it.
logger = logging.getLogger("markitect.proxy.generator")
|
||||
|
||||
# Shared parser instance; its ``parse`` call is used below to split a proxy
# file's raw text into (frontmatter, body).
_frontmatter_parser = FrontMatterParser()
|
||||
|
||||
|
||||
class ProxyGenerator:
    """Create, refresh, and inspect markdown proxy files.

    A proxy file is a markdown document whose YAML frontmatter carries
    ``proxy: true`` together with details of the file it was generated from
    (``source_path``, ``source_checksum``, ``source_size``, ``generated_at``,
    ``extractor``, ``extractor_version``). ``source_path`` is stored relative
    to the proxy file's directory whenever such a relative path exists.
    """

    def __init__(self, registry: "ExtractorRegistry"):
        """Remember the registry used to look up an extractor for a source."""
        self.registry = registry

    # ------------------------------------------------------------------
    # create
    # ------------------------------------------------------------------

    def create(self, source: Path, output_dir: Path, force: bool = False) -> Path:
        """Create a proxy markdown file for *source*.

        The proxy is written to ``<output_dir>/<source file name>.md``;
        *output_dir* is created if it does not exist yet.

        Args:
            source: Path to the original file (e.g. ``report.pdf``).
            output_dir: Directory where the proxy file will be written.
            force: If True, overwrite an existing proxy file.

        Returns:
            Path to the created proxy file.

        Raises:
            ProxyError: If *source* is not an existing file, the proxy file
                already exists and *force* is false, or the extractor reports
                missing dependencies. Exceptions raised by the extractor
                itself are not caught here.
        """
        source = source.resolve()
        if not source.is_file():
            raise ProxyError(
                f"Source file does not exist: {source}",
                context={"source": str(source)},
            )

        output_dir = output_dir.resolve()
        output_dir.mkdir(parents=True, exist_ok=True)

        proxy_path = output_dir / f"{source.name}.md"
        if proxy_path.exists() and not force:
            raise ProxyError(
                f"Proxy file already exists: {proxy_path} (use --force to overwrite)",
                context={"proxy": str(proxy_path)},
            )

        extractor = self._ready_extractor(source)

        # Hash BEFORE extracting: if the source is modified while extraction
        # runs, the recorded checksum is the older one and the proxy is later
        # reported as stale, instead of looking current with outdated content.
        checksum = self._source_checksum(source)
        result = extractor.extract(source)

        rel_source = self._relative_source_path(source, output_dir)
        meta = self._build_metadata(source, rel_source, checksum, result)

        self._write_proxy(proxy_path, meta, source.name, result.content)
        logger.info("Created proxy %s -> %s", proxy_path.name, rel_source)
        return proxy_path

    # ------------------------------------------------------------------
    # update
    # ------------------------------------------------------------------

    def update(self, proxy_path: Path) -> bool:
        """Re-extract the proxy file if its source has changed.

        The source is located through the ``source_path`` frontmatter entry
        and compared against the stored ``source_checksum``.

        Returns:
            True if the proxy was updated, False if already current.

        Raises:
            ProxyError: If *proxy_path* is not a proxy file, its source file
                cannot be found (including a missing ``source_path`` entry),
                or the extractor reports missing dependencies.
        """
        proxy_path = proxy_path.resolve()
        meta, _ = self._read_proxy(proxy_path)

        # ``.get`` keeps a proxy without ``source_path`` on the ProxyError
        # path below rather than failing with a KeyError.
        rel_source = meta.get("source_path", "")
        source = self._resolve_source(proxy_path, meta)
        if not source.is_file():
            raise ProxyError(
                f"Source file missing: {source}",
                context={"source_path": rel_source},
            )

        current_checksum = self._source_checksum(source)
        if current_checksum == meta.get("source_checksum"):
            return False

        # Same registry lookup as create(), so a proxy is refreshed through
        # the lookup that produced it.
        result = self._ready_extractor(source).extract(source)
        new_meta = self._build_metadata(source, rel_source, current_checksum, result)

        self._write_proxy(proxy_path, new_meta, source.name, result.content)
        logger.info("Updated proxy %s", proxy_path.name)
        return True

    # ------------------------------------------------------------------
    # status
    # ------------------------------------------------------------------

    def status(self, proxy_path: Path) -> Dict:
        """Check a single proxy file's freshness.

        Returns a dict with keys: proxy, source, status, extractor.
        Status is one of: ``current``, ``stale``, ``missing-source``.

        Raises:
            ProxyError: If *proxy_path* is not a proxy file.
        """
        proxy_path = proxy_path.resolve()
        meta, _ = self._read_proxy(proxy_path)
        return self._status_from_meta(proxy_path, meta)

    def bulk_status(self, directory: Path) -> List[Dict]:
        """Scan *directory* recursively for proxy files and return their statuses.

        Only ``*.md`` files whose frontmatter has ``proxy`` set to exactly
        ``True`` are reported; files that cannot be read or parsed are skipped.
        Results are ordered by sorted file path.
        """
        directory = directory.resolve()
        results = []
        for path in sorted(directory.rglob("*.md")):
            meta, _ = self._try_read_proxy(path)
            if meta is not None and meta.get("proxy") is True:
                # Reuse the frontmatter that was just parsed instead of
                # reading and parsing the same file a second time.
                results.append(self._status_from_meta(path.resolve(), meta))
        return results

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    def _ready_extractor(self, source: Path):
        """Return the registry's extractor for *source* once its dependencies check out.

        Raises:
            ProxyError: If the extractor reports a missing dependency.
        """
        extractor = self.registry.get_extractor_for_file(source)
        if not extractor.check_dependencies():
            raise ProxyError(
                f"Missing dependency for {extractor.name} extractor. "
                f"{extractor.dependency_hint()}",
            )
        return extractor

    def _status_from_meta(self, proxy_path: Path, meta: Dict) -> Dict:
        """Build the status record for an already-parsed proxy file."""
        source = self._resolve_source(proxy_path, meta)
        if not source.is_file():
            state = "missing-source"
        elif self._source_checksum(source) == meta.get("source_checksum"):
            state = "current"
        else:
            state = "stale"
        return {
            "proxy": str(proxy_path),
            "source": meta.get("source_path", ""),
            "status": state,
            "extractor": meta.get("extractor", ""),
        }

    @staticmethod
    def _source_checksum(source: Path) -> str:
        """Return the content hash of *source* in the ``sha256:<hash>`` form stored in proxies."""
        return f"sha256:{ContentHasher.hash_file(source)}"

    @staticmethod
    def _relative_source_path(source: Path, base_dir: Path) -> str:
        """Return *source* relative to *base_dir*, or absolute if that is impossible."""
        try:
            return os.path.relpath(source, base_dir)
        except ValueError:
            # Windows: paths on different drives have no relative form.
            # _resolve_source() copes with an absolute path, because joining
            # an absolute path onto a directory yields the absolute path.
            return str(source)

    @staticmethod
    def _build_metadata(source: Path, rel_source: str, checksum: str, result) -> "ProxyMetadata":
        """Assemble the metadata record for a fresh extraction of *source*."""
        return ProxyMetadata(
            source_path=rel_source,
            source_checksum=checksum,
            source_size=source.stat().st_size,
            generated_at=datetime.now(timezone.utc).isoformat(),
            extractor=result.extractor,
            extractor_version=result.extractor_version,
        )

    @staticmethod
    def _write_proxy(
        proxy_path: Path,
        meta: "ProxyMetadata",
        source_name: str,
        content: str,
    ) -> None:
        """Write the proxy file: YAML frontmatter, a heading, a provenance note, then *content*."""
        fm = {
            "proxy": True,
            "source_path": meta.source_path,
            "source_checksum": meta.source_checksum,
            "source_size": meta.source_size,
            "generated_at": meta.generated_at,
            "extractor": meta.extractor,
            "extractor_version": meta.extractor_version,
        }
        # sort_keys=False keeps the keys in the order listed above.
        fm_text = yaml.dump(fm, default_flow_style=False, sort_keys=False)
        body = (
            f"---\n{fm_text}---\n\n"
            f"# {source_name}\n\n"
            f"*Proxy generated from `{meta.source_path}`*\n\n"
            f"{content}"
        )
        proxy_path.write_text(body, encoding="utf-8")

    @staticmethod
    def _read_proxy(proxy_path: Path):
        """Read and parse an existing proxy file.

        Returns:
            Tuple of (frontmatter dict, body str).

        Raises:
            ProxyError: If the frontmatter is empty or lacks a truthy
                ``proxy`` entry.
        """
        raw = proxy_path.read_text(encoding="utf-8")
        meta, body = _frontmatter_parser.parse(raw)
        # ``not meta`` also covers a parser result of None / empty mapping.
        if not meta or not meta.get("proxy"):
            raise ProxyError(
                f"Not a proxy file (missing 'proxy: true' in frontmatter): {proxy_path}",
                context={"path": str(proxy_path)},
            )
        return meta, body

    @staticmethod
    def _try_read_proxy(path: Path):
        """Attempt to read a proxy file, returning (None, None) on failure."""
        try:
            raw = path.read_text(encoding="utf-8")
            return _frontmatter_parser.parse(raw)
        except Exception:
            # Deliberately best-effort: a directory scan must not abort on one
            # unreadable or malformed markdown file. Leave a trace for debugging.
            logger.debug("Skipping unreadable markdown file %s", path, exc_info=True)
            return None, None

    @staticmethod
    def _resolve_source(proxy_path: Path, meta: Dict) -> Path:
        """Resolve the source path relative to the proxy file's directory.

        A missing or empty ``source_path`` resolves to the proxy's own
        directory, which callers then treat as "source is not a file".
        """
        source_rel = meta.get("source_path") or ""
        return (proxy_path.parent / source_rel).resolve()
|
||||
Reference in New Issue
Block a user