Deterministic ops layer and cli

This commit is contained in:
2026-05-04 00:23:04 +02:00
parent 6f0facd744
commit 274a7fcdd6
7 changed files with 778 additions and 2 deletions

View File

@@ -0,0 +1,21 @@
"""Deterministic Markdown document operations."""
from markitect_tool.ops.engine import (
ComposeResult,
IncludeError,
IncludeResult,
TransformResult,
compose_files,
resolve_includes,
transform_markdown,
)
__all__ = [
"ComposeResult",
"IncludeError",
"IncludeResult",
"TransformResult",
"compose_files",
"resolve_includes",
"transform_markdown",
]

View File

@@ -0,0 +1,300 @@
"""Deterministic transform, compose, and include operations."""
from __future__ import annotations
import re
import shlex
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
import yaml
from markitect_tool.core import parse_markdown
from markitect_tool.query import extract_document
class IncludeError(ValueError):
"""Raised when include resolution cannot continue."""
@dataclass(frozen=True)
class TransformResult:
"""Result of a deterministic Markdown transform."""
markdown: str
operations: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class ComposeResult:
"""Result of composing multiple Markdown sources."""
markdown: str
sources: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class IncludeResult:
"""Result of resolving include markers in Markdown."""
markdown: str
included_paths: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
_COMMENT_INCLUDE_RE = re.compile(r"<!--\s*mkt:include\s+(?P<attrs>.*?)\s*-->", re.DOTALL)
_BRACE_INCLUDE_RE = re.compile(r"\{\{\s*include:(?P<path>[^}]+?)\s*\}\}")
_HEADING_RE = re.compile(r"^(#{1,6})(\s+.+)$", re.MULTILINE)
def transform_markdown(
markdown: str,
*,
strip_frontmatter: bool = False,
set_frontmatter: dict[str, Any] | None = None,
heading_delta: int = 0,
extract_selector: str | None = None,
source_path: str | None = None,
) -> TransformResult:
"""Apply deterministic operations to one Markdown document."""
operations: list[str] = []
frontmatter, body = _split_frontmatter(markdown)
if set_frontmatter:
frontmatter = _deep_merge(frontmatter, set_frontmatter)
operations.append("set_frontmatter")
if heading_delta:
body = shift_heading_levels(body, heading_delta)
operations.append(f"shift_headings:{heading_delta}")
if extract_selector:
document_text = _join_frontmatter(frontmatter, body) if frontmatter else body
document = parse_markdown(document_text, source_path=source_path)
body = "\n\n".join(extract_document(document, extract_selector))
frontmatter = {}
operations.append(f"extract:{extract_selector}")
if strip_frontmatter:
frontmatter = {}
operations.append("strip_frontmatter")
return TransformResult(markdown=_join_frontmatter(frontmatter, body), operations=operations)
def shift_heading_levels(markdown: str, delta: int) -> str:
"""Shift ATX heading levels by delta while clamping to levels 1 through 6."""
def replace(match: re.Match[str]) -> str:
marks = match.group(1)
suffix = match.group(2)
level = min(max(len(marks) + delta, 1), 6)
return f"{'#' * level}{suffix}"
return _HEADING_RE.sub(replace, markdown)
def compose_files(
paths: list[str | Path],
*,
title: str | None = None,
heading_delta: int = 0,
include_frontmatter: bool = False,
separator: str = "\n\n---\n\n",
) -> ComposeResult:
"""Compose Markdown files into one Markdown output."""
parts: list[str] = []
sources: list[str] = []
if title:
parts.append(f"# {title.strip()}")
for raw_path in paths:
path = Path(raw_path)
text = path.read_text(encoding="utf-8")
frontmatter, body = _split_frontmatter(text)
if include_frontmatter and frontmatter:
body = _join_frontmatter(frontmatter, body)
if heading_delta:
body = shift_heading_levels(body, heading_delta)
body = body.strip()
if body:
parts.append(body)
sources.append(str(path))
return ComposeResult(markdown=separator.join(parts).strip() + "\n", sources=sources)
def resolve_includes(
markdown: str,
*,
base_dir: str | Path,
current_path: str | Path | None = None,
max_depth: int = 10,
) -> IncludeResult:
"""Resolve Markdown include markers recursively.
Supported syntax:
- ``<!-- mkt:include path="relative/file.md" -->``
- ``<!-- mkt:include path="relative/file.md" selector="sections[heading=Intro]" heading_delta="1" -->``
- ``{{include:relative/file.md}}`` for a compact legacy-compatible shorthand.
"""
root = Path(base_dir).resolve()
stack = [Path(current_path).resolve()] if current_path else []
included: list[Path] = []
resolved = _resolve_include_text(
markdown,
root=root,
current_dir=Path(current_path).resolve().parent if current_path else root,
stack=stack,
included=included,
depth=0,
max_depth=max_depth,
)
return IncludeResult(
markdown=resolved,
included_paths=[str(path) for path in included],
)
def _resolve_include_text(
markdown: str,
*,
root: Path,
current_dir: Path,
stack: list[Path],
included: list[Path],
depth: int,
max_depth: int,
) -> str:
if depth > max_depth:
raise IncludeError(f"Include depth exceeded max_depth={max_depth}")
def replace_comment(match: re.Match[str]) -> str:
attrs = _parse_include_attrs(match.group("attrs"))
return _render_include(attrs, root, current_dir, stack, included, depth, max_depth)
def replace_brace(match: re.Match[str]) -> str:
attrs = {"path": match.group("path").strip()}
return _render_include(attrs, root, current_dir, stack, included, depth, max_depth)
markdown = _COMMENT_INCLUDE_RE.sub(replace_comment, markdown)
return _BRACE_INCLUDE_RE.sub(replace_brace, markdown)
def _render_include(
attrs: dict[str, str],
root: Path,
current_dir: Path,
stack: list[Path],
included: list[Path],
depth: int,
max_depth: int,
) -> str:
raw_path = attrs.get("path")
if not raw_path:
raise IncludeError("Include marker requires a path attribute")
include_path = _resolve_safe_path(raw_path, root, current_dir)
if include_path in stack:
cycle = " -> ".join([str(path) for path in stack + [include_path]])
raise IncludeError(f"Circular include detected: {cycle}")
if not include_path.exists() or not include_path.is_file():
raise IncludeError(f"Included file not found: {include_path}")
text = include_path.read_text(encoding="utf-8")
frontmatter, body = _split_frontmatter(text)
selector = attrs.get("selector")
if selector:
document = parse_markdown(text, source_path=str(include_path))
body = "\n\n".join(extract_document(document, selector))
elif attrs.get("include_frontmatter", "").lower() in {"1", "true", "yes"}:
body = _join_frontmatter(frontmatter, body)
heading_delta = int(attrs.get("heading_delta", "0"))
if heading_delta:
body = shift_heading_levels(body, heading_delta)
included.append(include_path)
return _resolve_include_text(
body.strip(),
root=root,
current_dir=include_path.parent,
stack=stack + [include_path],
included=included,
depth=depth + 1,
max_depth=max_depth,
)
def _parse_include_attrs(raw: str) -> dict[str, str]:
attrs: dict[str, str] = {}
for part in shlex.split(raw):
if "=" not in part:
raise IncludeError(f"Invalid include attribute `{part}`")
key, value = part.split("=", 1)
attrs[key.strip()] = value.strip()
return attrs
def _resolve_safe_path(raw_path: str, root: Path, current_dir: Path) -> Path:
candidate = Path(raw_path)
if candidate.is_absolute():
resolved = candidate.resolve()
else:
resolved = (current_dir / candidate).resolve()
try:
resolved.relative_to(root)
except ValueError as exc:
raise IncludeError(f"Included path escapes base directory: {raw_path}") from exc
return resolved
def _split_frontmatter(markdown: str) -> tuple[dict[str, Any], str]:
if not markdown.startswith("---\n"):
return {}, markdown
end = markdown.find("\n---", 4)
if end == -1:
return {}, markdown
closing_end = markdown.find("\n", end + 4)
if closing_end == -1:
closing_end = len(markdown)
else:
closing_end += 1
raw_frontmatter = markdown[4:end]
data = yaml.safe_load(raw_frontmatter) if raw_frontmatter.strip() else {}
if data is None:
data = {}
if not isinstance(data, dict):
return {}, markdown
return data, markdown[closing_end:]
def _join_frontmatter(frontmatter: dict[str, Any], body: str) -> str:
body = body.lstrip("\n")
if not frontmatter:
return body
rendered = yaml.safe_dump(frontmatter, sort_keys=False).strip()
return f"---\n{rendered}\n---\n\n{body}"
def _deep_merge(left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]:
merged = dict(left)
for key, value in right.items():
if isinstance(merged.get(key), dict) and isinstance(value, dict):
merged[key] = _deep_merge(merged[key], value)
else:
merged[key] = value
return merged