Files
markitect-tool/src/markitect_tool/cache/engine.py

231 lines
7.0 KiB
Python

"""Lightweight cache manifest for Markdown fingerprints and parse summaries."""
from __future__ import annotations
import hashlib
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from markitect_tool.core import parse_markdown_file
# Schema version stamped into manifests; CacheManifest uses it as the default
# and as the fallback when a loaded manifest omits "schema_version".
CACHE_SCHEMA_VERSION = "1.0"
# Manifest location used by cache_path_for() when no explicit cache path is
# given; because it is relative, it gets joined onto the supplied root.
DEFAULT_CACHE_PATH = ".markitect/cache/manifest.json"
@dataclass(frozen=True)
class CacheEntry:
    """Fingerprint and parse summary recorded for a single source file."""

    # Path key for the file (fingerprint_file stores a POSIX-style path here).
    path: str
    # Content digest; fingerprint_file stores it as "sha256:<hex digest>".
    content_hash: str
    # File size and modification time (nanoseconds) taken from os.stat.
    size: int
    mtime_ns: int
    # Label of the parser that produced the summary counts below.
    parser: str = "markdown-it-py/commonmark"
    # Number of headings / sections / blocks found in the parsed document.
    headings: int = 0
    sections: int = 0
    blocks: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return the entry as a plain dict keyed by field name."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "CacheEntry":
        """Build an entry from a mapping such as one produced by ``to_dict``.

        ``path``, ``content_hash``, ``size`` and ``mtime_ns`` are required and
        raise ``KeyError`` when absent; the remaining keys fall back to the
        field defaults. Values are coerced with ``str``/``int``.
        """
        # Values are read in field order so a malformed mapping fails on the
        # first offending key.
        source_path = str(data["path"])
        digest = str(data["content_hash"])
        byte_size = int(data["size"])
        modified_ns = int(data["mtime_ns"])
        parser_name = str(data.get("parser", "markdown-it-py/commonmark"))
        summary_counts = {
            key: int(data.get(key, 0))
            for key in ("headings", "sections", "blocks")
        }
        return cls(
            path=source_path,
            content_hash=digest,
            size=byte_size,
            mtime_ns=modified_ns,
            parser=parser_name,
            **summary_counts,
        )
@dataclass(frozen=True)
class CacheManifest:
    """Serializable collection of cache entries keyed by path."""

    schema_version: str = CACHE_SCHEMA_VERSION
    root: str = "."
    entries: dict[str, CacheEntry] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-ready mapping with entries emitted in sorted key order."""
        serialized: dict[str, Any] = {}
        for key in sorted(self.entries):
            serialized[key] = self.entries[key].to_dict()
        return {
            "schema_version": self.schema_version,
            "root": self.root,
            "entries": serialized,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "CacheManifest":
        """Rebuild a manifest from a mapping such as one produced by ``to_dict``.

        A missing or empty ``entries`` value yields no entries, and entry
        values that are not dicts are skipped. ``schema_version`` and ``root``
        fall back to the current schema version and ``"."``.
        """
        raw_entries = data.get("entries") or {}
        parsed: dict[str, CacheEntry] = {}
        for key, payload in raw_entries.items():
            if not isinstance(payload, dict):
                continue
            parsed[key] = CacheEntry.from_dict(payload)
        version = str(data.get("schema_version", CACHE_SCHEMA_VERSION))
        root_dir = str(data.get("root", "."))
        return cls(schema_version=version, root=root_dir, entries=parsed)
@dataclass(frozen=True)
class CacheStatus:
    """Outcome of comparing current files with a cache manifest.

    Each list holds the path keys that fall into that category.
    """

    new: list[str] = field(default_factory=list)
    changed: list[str] = field(default_factory=list)
    unchanged: list[str] = field(default_factory=list)
    deleted: list[str] = field(default_factory=list)

    @property
    def dirty(self) -> bool:
        """Return True when at least one path is new, changed, or deleted."""
        return bool(self.new or self.changed or self.deleted)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-ready summary including per-category counts.

        The lists in the result are copies, so callers may modify the returned
        payload without altering this (frozen) status object.
        """
        # Copy each list: handing out the instance's own lists would let a
        # caller mutate the status through the returned dict.
        buckets = {
            "new": list(self.new),
            "changed": list(self.changed),
            "unchanged": list(self.unchanged),
            "deleted": list(self.deleted),
        }
        return {
            "dirty": self.dirty,
            **buckets,
            "counts": {name: len(paths) for name, paths in buckets.items()},
        }
def cache_path_for(root: str | Path, cache_path: str | Path | None = None) -> Path:
    """Resolve where the manifest lives for ``root``.

    A falsy ``cache_path`` selects the default manifest location. Absolute
    locations are returned untouched; relative ones are joined onto ``root``.
    """
    location = Path(cache_path) if cache_path else Path(DEFAULT_CACHE_PATH)
    return location if location.is_absolute() else Path(root) / location
def load_cache(path: str | Path) -> CacheManifest:
    """Read a cache manifest from ``path``.

    When the file does not exist an empty manifest is returned. Its root is
    the directory three levels above the manifest if the file is named
    ``manifest.json``, and ``"."`` otherwise.

    Raises:
        ValueError: If the file parses as JSON but is not a JSON object.
    """
    manifest_path = Path(path)
    if manifest_path.exists():
        payload = json.loads(manifest_path.read_text(encoding="utf-8"))
        if isinstance(payload, dict):
            return CacheManifest.from_dict(payload)
        raise ValueError("Cache manifest must be a JSON object")

    # No manifest on disk yet: start empty and guess the root from the layout.
    if manifest_path.name == "manifest.json":
        inferred_root = str(manifest_path.parent.parent.parent)
    else:
        inferred_root = "."
    return CacheManifest(root=inferred_root)
def save_cache(manifest: CacheManifest, path: str | Path) -> None:
    """Write a cache manifest to ``path`` as indented UTF-8 JSON.

    Missing parent directories are created. The JSON is first written to a
    sibling ``<name>.tmp`` file and then moved over ``path``, so an
    interrupted write does not leave a truncated manifest behind.
    """
    manifest_path = Path(path)
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(manifest.to_dict(), indent=2, ensure_ascii=False) + "\n"

    # Stage next to the target so the final rename stays on one filesystem.
    staging_path = manifest_path.with_name(manifest_path.name + ".tmp")
    try:
        staging_path.write_text(payload, encoding="utf-8")
        staging_path.replace(manifest_path)
    except BaseException:
        # Do not leave a partial staging file behind; re-raise the failure.
        staging_path.unlink(missing_ok=True)
        raise
def fingerprint_file(path: str | Path, *, root: str | Path | None = None) -> CacheEntry:
    """Build the cache entry for one Markdown file.

    The entry records the file's stat size and mtime, a SHA-256 digest of its
    bytes, and the number of headings, sections and blocks in the parsed
    document. The stored path is expressed relative to ``root`` when one is
    given, otherwise relative to the file's own parent directory.
    """
    source = Path(path)
    file_stat = source.stat()
    hasher = hashlib.sha256()
    hasher.update(source.read_bytes())
    digest = hasher.hexdigest()
    parsed = parse_markdown_file(source)

    # A falsy root means "relative to the directory containing the file".
    if root:
        base_dir = Path(root).resolve()
    else:
        base_dir = source.parent.resolve()
    entry_path = _relative(source, base_dir)

    return CacheEntry(
        path=entry_path,
        content_hash=f"sha256:{digest}",
        size=file_stat.st_size,
        mtime_ns=file_stat.st_mtime_ns,
        headings=len(parsed.headings),
        sections=len(parsed.sections),
        blocks=len(parsed.blocks),
    )
def build_cache(
    paths: list[str | Path],
    *,
    root: str | Path = ".",
    recursive: bool = True,
) -> CacheManifest:
    """Fingerprint every Markdown file found under ``paths``.

    Entries are keyed by the path each fingerprint reports relative to the
    resolved ``root``, which is also stored as the manifest root.
    """
    resolved_root = Path(root).resolve()
    fingerprints = (
        fingerprint_file(markdown_file, root=resolved_root)
        for markdown_file in scan_markdown_files(paths, recursive=recursive)
    )
    # Later files win if two fingerprints report the same path key.
    by_path = {fingerprint.path: fingerprint for fingerprint in fingerprints}
    return CacheManifest(root=str(resolved_root), entries=by_path)
def detect_changes(
    manifest: CacheManifest,
    paths: list[str | Path],
    *,
    root: str | Path = ".",
    recursive: bool = True,
) -> CacheStatus:
    """Classify current Markdown files against ``manifest``.

    A fresh manifest is built for ``paths`` and compared by path key: keys
    only in the fresh manifest are new, keys only in ``manifest`` are deleted,
    and shared keys are changed or unchanged depending on whether their
    content hashes differ. Every list in the result is sorted.
    """
    snapshot = build_cache(paths, root=root, recursive=recursive).entries
    known = manifest.entries

    added = sorted(key for key in set(snapshot) if key not in known)
    removed = sorted(key for key in set(known) if key not in snapshot)

    modified: list[str] = []
    untouched: list[str] = []
    for key in sorted(set(snapshot).intersection(known)):
        same_content = snapshot[key].content_hash == known[key].content_hash
        bucket = untouched if same_content else modified
        bucket.append(key)

    return CacheStatus(new=added, changed=modified, unchanged=untouched, deleted=removed)
def scan_markdown_files(
    paths: list[str | Path],
    *,
    recursive: bool = True,
) -> list[Path]:
    """Collect the Markdown files named by, or contained in, ``paths``.

    Files are kept only when they have a Markdown suffix; directories are
    searched (through all subdirectories when ``recursive`` is true) for such
    files. Anything else is ignored. The result is de-duplicated and sorted.
    """
    glob_pattern = "**/*" if recursive else "*"
    found: set[Path] = set()
    for raw_path in paths:
        location = Path(raw_path)
        if location.is_file():
            if _is_markdown(location):
                found.add(location)
            continue
        if location.is_dir():
            for child in location.glob(glob_pattern):
                if child.is_file() and _is_markdown(child):
                    found.add(child)
    return sorted(found)
def _is_markdown(path: Path) -> bool:
    """Return True when the path's final suffix is .md or .markdown (any case)."""
    extension = path.suffix.lower()
    return extension == ".md" or extension == ".markdown"
def _relative(path: Path, root: Path) -> str:
    """Return the resolved path as a POSIX string, relative to root if possible.

    When the resolved path does not lie under ``root`` the full resolved path
    is returned instead.
    """
    absolute = path.resolve()
    try:
        inside_root = absolute.relative_to(root)
    except ValueError:
        return absolute.as_posix()
    return inside_root.as_posix()