Introduces a new `markitect/proxy/` module with pluggable extractors that convert non-markdown sources (PDF, HTML) into tracked markdown proxy files. Proxy files preserve origin metadata (path, checksum, timestamp) so they can be kept in sync when the original changes. CLI commands: `proxy create`, `proxy update`, `proxy status`, `proxy extractors`. Built-in extractors: PDF (pymupdf4llm), HTML (markdownify), Markdown (built-in). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
242 lines
8.5 KiB
Python
242 lines
8.5 KiB
Python
"""
|
|
ProxyGenerator — create, update, and check status of proxy files.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List
|
|
|
|
import yaml
|
|
|
|
from markitect.assets.utils import ContentHasher
|
|
from markitect.frontmatter import FrontMatterParser
|
|
from markitect.proxy.exceptions import ProxyError
|
|
from markitect.proxy.models import ProxyMetadata
|
|
from markitect.proxy.registry import ExtractorRegistry
|
|
|
|
# Module-level logger, registered under the fixed name "markitect.proxy.generator".
logger = logging.getLogger("markitect.proxy.generator")
|
|
|
|
# Single shared parser instance; its ``parse(text)`` result is unpacked as
# ``(frontmatter, body)`` by the proxy read helpers in this module.
_frontmatter_parser = FrontMatterParser()
|
|
|
|
|
|
class ProxyGenerator:
    """Create and maintain markdown proxy files for other source files.

    A proxy file is a markdown document whose YAML frontmatter is flagged
    with ``proxy: true`` and records ``source_path`` (relative to the proxy's
    directory), ``source_checksum``, ``source_size``, ``generated_at``,
    ``extractor`` and ``extractor_version``. :meth:`update` and
    :meth:`status` compare the recorded checksum against the source's
    current checksum to decide whether the proxy is stale.
    """

    def __init__(self, registry: ExtractorRegistry):
        """Store the extractor registry used to look up extractors."""
        self.registry = registry

    # ------------------------------------------------------------------
    # create
    # ------------------------------------------------------------------

    def create(self, source: Path, output_dir: Path, force: bool = False) -> Path:
        """Create a proxy markdown file for *source*.

        The proxy is written to ``<output_dir>/<source file name>.md``.

        Args:
            source: Path to the original file (e.g. ``report.pdf``).
            output_dir: Directory where the proxy file will be written.
                It is created (including parents) if it does not exist.
            force: If True, overwrite an existing proxy file.

        Returns:
            Path to the created proxy file.

        Raises:
            ProxyError: If the source doesn't exist, the proxy already exists
                and *force* is False, or the extractor's dependencies are
                missing. Errors raised by the registry lookup or by the
                extractor itself propagate unchanged.
        """
        source = source.resolve()
        if not source.is_file():
            raise ProxyError(
                f"Source file does not exist: {source}",
                context={"source": str(source)},
            )

        output_dir = output_dir.resolve()
        output_dir.mkdir(parents=True, exist_ok=True)

        proxy_path = output_dir / f"{source.name}.md"
        if proxy_path.exists() and not force:
            raise ProxyError(
                f"Proxy file already exists: {proxy_path} (use --force to overwrite)",
                context={"proxy": str(proxy_path)},
            )

        extractor = self.registry.get_extractor_for_file(source)
        self._require_dependencies(extractor)

        # Snapshot checksum and size *before* extracting (as update() does).
        # If the source is modified while extraction runs, the recorded
        # checksum then mismatches and the proxy is reported stale, instead
        # of looking current while holding content from an older revision.
        checksum = self._checksum_of(source)
        source_size = source.stat().st_size

        result = extractor.extract(source)

        # Relative path from the proxy location to the source.
        rel_source = self._relative_source(source, output_dir)
        meta = self._build_metadata(rel_source, checksum, source_size, result)

        self._write_proxy(proxy_path, meta, source.name, result.content)
        logger.info("Created proxy %s -> %s", proxy_path.name, rel_source)
        return proxy_path

    # ------------------------------------------------------------------
    # update
    # ------------------------------------------------------------------

    def update(self, proxy_path: Path) -> bool:
        """Re-extract the proxy file if its source has changed.

        Args:
            proxy_path: Path to an existing proxy file.

        Returns:
            True if the proxy was updated, False if already current.

        Raises:
            ProxyError: If *proxy_path* is not a proxy file, its source file
                is missing, or the extractor's dependencies are missing.
        """
        proxy_path = proxy_path.resolve()
        meta, _ = self._read_proxy(proxy_path)

        source = self._resolve_source(proxy_path, meta)
        if not source.is_file():
            raise ProxyError(
                f"Source file missing: {source}",
                # .get(): frontmatter without ``source_path`` must surface as
                # ProxyError here, not as a KeyError while building the error.
                context={"source_path": meta.get("source_path", "")},
            )

        current_checksum = self._checksum_of(source)
        if current_checksum == meta.get("source_checksum"):
            return False

        # NOTE(review): create() resolves the extractor with
        # get_extractor_for_file(source) while this path uses
        # get_extractor(suffix) — confirm both always pick the same extractor.
        extractor = self.registry.get_extractor(source.suffix)
        self._require_dependencies(extractor)

        source_size = source.stat().st_size
        result = extractor.extract(source)

        # ``source_path`` is present here: without it the resolved source
        # would be the proxy's directory and the is_file() check above fails.
        new_meta = self._build_metadata(
            meta["source_path"], current_checksum, source_size, result
        )

        self._write_proxy(proxy_path, new_meta, source.name, result.content)
        logger.info("Updated proxy %s", proxy_path.name)
        return True

    # ------------------------------------------------------------------
    # status
    # ------------------------------------------------------------------

    def status(self, proxy_path: Path) -> Dict:
        """Check a single proxy file's freshness.

        Returns a dict with keys: proxy, source, status, extractor.
        Status is one of: ``current``, ``stale``, ``missing-source``.

        Raises:
            ProxyError: If *proxy_path* is not a proxy file.
        """
        proxy_path = proxy_path.resolve()
        meta, _ = self._read_proxy(proxy_path)
        source = self._resolve_source(proxy_path, meta)

        if not source.is_file():
            state = "missing-source"
        elif self._checksum_of(source) == meta.get("source_checksum"):
            state = "current"
        else:
            state = "stale"

        return {
            "proxy": str(proxy_path),
            "source": meta.get("source_path", ""),
            "status": state,
            "extractor": meta.get("extractor", ""),
        }

    def bulk_status(self, directory: Path) -> List[Dict]:
        """Scan *directory* recursively for proxy files and return their statuses.

        Every ``*.md`` file below *directory* is inspected in sorted path
        order; files that cannot be read or parsed, or whose frontmatter does
        not contain ``proxy: true``, are skipped.
        """
        directory = directory.resolve()
        results = []
        for path in sorted(directory.rglob("*.md")):
            meta, _ = self._try_read_proxy(path)
            if meta is not None and meta.get("proxy") is True:
                results.append(self.status(path))
        return results

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require_dependencies(extractor) -> None:
        """Raise ProxyError if *extractor* reports unmet dependencies."""
        if not extractor.check_dependencies():
            raise ProxyError(
                f"Missing dependency for {extractor.name} extractor. "
                f"{extractor.dependency_hint()}",
            )

    @staticmethod
    def _checksum_of(source: Path) -> str:
        """Return the checksum string stored in frontmatter for *source*."""
        return f"sha256:{ContentHasher.hash_file(source)}"

    @staticmethod
    def _relative_source(source: Path, output_dir: Path) -> str:
        """Return *source* relative to *output_dir*, or absolute if impossible."""
        try:
            return os.path.relpath(source, output_dir)
        except ValueError:
            # On Windows no relative path exists between different drives.
            # _resolve_source() copes with an absolute path because joining
            # a directory with an absolute path yields the absolute path.
            return str(source)

    @staticmethod
    def _build_metadata(
        source_path: str,
        checksum: str,
        source_size: int,
        result,
    ) -> ProxyMetadata:
        """Build the frontmatter metadata for an extraction *result*.

        ``generated_at`` is the current UTC time in ISO 8601 format.
        """
        return ProxyMetadata(
            source_path=source_path,
            source_checksum=checksum,
            source_size=source_size,
            generated_at=datetime.now(timezone.utc).isoformat(),
            extractor=result.extractor,
            extractor_version=result.extractor_version,
        )

    @staticmethod
    def _write_proxy(
        proxy_path: Path,
        meta: ProxyMetadata,
        source_name: str,
        content: str,
    ) -> None:
        """Write the proxy file: frontmatter, a title line, a provenance note, then *content*."""
        fm = {
            "proxy": True,
            "source_path": meta.source_path,
            "source_checksum": meta.source_checksum,
            "source_size": meta.source_size,
            "generated_at": meta.generated_at,
            "extractor": meta.extractor,
            "extractor_version": meta.extractor_version,
        }
        # allow_unicode keeps non-ASCII source paths readable instead of
        # emitting escape sequences; the file is written as UTF-8 below.
        fm_text = yaml.dump(
            fm,
            default_flow_style=False,
            sort_keys=False,
            allow_unicode=True,
        )
        body = (
            f"---\n{fm_text}---\n\n"
            f"# {source_name}\n\n"
            f"*Proxy generated from `{meta.source_path}`*\n\n"
            f"{content}"
        )
        proxy_path.write_text(body, encoding="utf-8")

    @staticmethod
    def _read_proxy(proxy_path: Path):
        """Read and parse an existing proxy file.

        Returns:
            Tuple of (frontmatter dict, body str).

        Raises:
            ProxyError: If the frontmatter is empty or lacks a truthy
                ``proxy`` entry.
        """
        raw = proxy_path.read_text(encoding="utf-8")
        meta, body = _frontmatter_parser.parse(raw)
        # ``not meta`` also covers a parser result of None / empty mapping,
        # so a plain markdown file yields ProxyError rather than AttributeError.
        if not meta or not meta.get("proxy"):
            raise ProxyError(
                f"Not a proxy file (missing 'proxy: true' in frontmatter): {proxy_path}",
                context={"path": str(proxy_path)},
            )
        return meta, body

    @staticmethod
    def _try_read_proxy(path: Path):
        """Attempt to read a proxy file, returning (None, None) on failure."""
        try:
            raw = path.read_text(encoding="utf-8")
            meta, body = _frontmatter_parser.parse(raw)
        except Exception as exc:
            # Deliberately best-effort: directory scans must not abort on one
            # unreadable or malformed file. Leave a trace for debugging.
            logger.debug("Skipping unreadable markdown file %s: %s", path, exc)
            return None, None
        return meta, body

    @staticmethod
    def _resolve_source(proxy_path: Path, meta: Dict) -> Path:
        """Resolve the source path relative to the proxy file's directory.

        A missing ``source_path`` entry resolves to the proxy's own directory.
        """
        source_rel = meta.get("source_path", "")
        return (proxy_path.parent / source_rel).resolve()
|