extension for ref resolve, explode, implode, weave, tangle

This commit is contained in:
2026-05-04 02:25:49 +02:00
parent 8203f50fd5
commit 65bfc1aebf
39 changed files with 3959 additions and 25 deletions

View File

@@ -0,0 +1,23 @@
"""Markdown-native literate weave/tangle workflows."""
from markitect_tool.literate.engine import (
CodeChunk,
LiterateFile,
TangleResult,
WeaveResult,
discover_code_chunks,
tangle_markdown,
weave_markdown,
write_tangle_files,
)
# Names re-exported from ``markitect_tool.literate.engine`` as the public API.
__all__ = [
    # Result and data types.
    "CodeChunk", "LiterateFile", "TangleResult", "WeaveResult",
    # Entry points.
    "discover_code_chunks", "tangle_markdown", "weave_markdown", "write_tangle_files",
]

View File

@@ -0,0 +1,317 @@
"""Literate programming helpers for Markdown fenced code chunks."""
from __future__ import annotations
import hashlib
import re
import shlex
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from markdown_it import MarkdownIt
from markitect_tool.diagnostics import Diagnostic, SourceLocation
from markitect_tool.ops import OperationProvenance
@dataclass(frozen=True)
class CodeChunk:
    """A named fenced code chunk.

    Any field whose value is ``None``, an empty list or an empty string is
    left out of the dict produced by :meth:`to_dict`.
    """

    # Identifier of the chunk.
    chunk_id: str
    # Text of the chunk.
    content: str
    # Language tag, when one is known.
    language: str | None = None
    # File the chunk is tangled into, when it has a target.
    target_path: str | None = None
    # Ids of the chunks this chunk refers to.
    references: list[str] = field(default_factory=list)
    # Path of the document the chunk came from, when known.
    source_path: str | None = None
    # Line span of the chunk in its source, when known.
    line_start: int | None = None
    line_end: int | None = None
    # Digest of the chunk text; empty when none was computed.
    content_hash: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the chunk as a plain dict without unset or empty values."""
        omitted = (None, [], "")
        serialized: dict[str, Any] = {}
        for name, value in asdict(self).items():
            if value in omitted:
                continue
            serialized[name] = value
        return serialized
@dataclass(frozen=True)
class LiterateFile:
    """A single output file produced by tangling."""

    # Target path of the generated file.
    path: str
    # Full text of the generated file.
    content: str
    # Ids of the chunks the file was assembled from.
    chunk_ids: list[str]

    def to_dict(self) -> dict[str, Any]:
        """Return the file as a plain dict of its fields."""
        serialized: dict[str, Any] = asdict(self)
        return serialized
@dataclass(frozen=True)
class TangleResult:
    """Outcome of tangling the code chunks of a Markdown document."""

    # Generated files.
    files: list[LiterateFile]
    # Chunks the files were built from.
    chunks: list[CodeChunk]
    # Problems reported while tangling.
    diagnostics: list[Diagnostic] = field(default_factory=list)
    # Provenance events recorded while tangling.
    provenance: list[OperationProvenance] = field(default_factory=list)

    @property
    def valid(self) -> bool:
        """Whether no diagnostic has severity ``"error"``."""
        return all(entry.severity != "error" for entry in self.diagnostics)

    def to_dict(self) -> dict[str, Any]:
        """Return the result, including ``valid``, as a plain dict."""
        serialized: dict[str, Any] = {"valid": self.valid}
        serialized["files"] = [item.to_dict() for item in self.files]
        serialized["chunks"] = [item.to_dict() for item in self.chunks]
        serialized["diagnostics"] = [item.to_dict() for item in self.diagnostics]
        serialized["provenance"] = [item.to_dict() for item in self.provenance]
        return serialized
@dataclass(frozen=True)
class WeaveResult:
    """Outcome of weaving Markdown documentation with a chunk index."""

    # The woven Markdown text.
    markdown: str
    # Chunks listed in the index.
    chunks: list[CodeChunk]

    def to_dict(self) -> dict[str, Any]:
        """Return the woven text and the serialized chunks as a plain dict."""
        serialized_chunks = [item.to_dict() for item in self.chunks]
        return {"markdown": self.markdown, "chunks": serialized_chunks}
# Matches a chunk reference written as ``<<id>>`` anywhere in a text; the id
# may contain ASCII letters, digits, ``_``, ``.``, ``:`` and ``-``.
_CHUNK_REF_RE = re.compile(r"<<(?P<id>[A-Za-z0-9_.:-]+)>>")
# Matches a reference that stands alone on its line (only spaces/tabs around
# it) and captures the leading whitespace as ``indent``.
_CHUNK_LINE_REF_RE = re.compile(r"^(?P<indent>[ \t]*)<<(?P<id>[A-Za-z0-9_.:-]+)>>[ \t]*$", re.MULTILINE)
def discover_code_chunks(
    markdown: str,
    *,
    source_path: str | Path | None = None,
) -> list[CodeChunk]:
    """Collect the named fenced code chunks of a Markdown text, in order.

    Fences whose info string carries no ``id`` attribute are skipped. Ids
    are slug-normalised and de-duplicated; the target path is taken from the
    ``tangle`` attribute, falling back to ``target``.
    """
    origin = str(source_path) if source_path else None
    seen_ids: dict[str, int] = {}
    found: list[CodeChunk] = []

    md = MarkdownIt("commonmark", {"tables": True}).enable("table")
    fences = (tok for tok in md.parse(markdown) if tok.type == "fence")
    for fence in fences:
        info = _parse_fence_info(fence.info)
        raw_id = info.get("id")
        if not raw_id:
            continue

        unique_id = _dedupe_id(_slug(raw_id), seen_ids)
        if fence.map:
            # map[0] is shifted by one for the start line; map[1] is used as-is.
            first_line, last_line = fence.map[0] + 1, fence.map[1]
        else:
            first_line = last_line = None

        body = fence.content
        found.append(
            CodeChunk(
                chunk_id=unique_id,
                content=body,
                language=info.get("language"),
                target_path=info.get("tangle") or info.get("target"),
                references=_chunk_references(body),
                source_path=origin,
                line_start=first_line,
                line_end=last_line,
                content_hash=_hash_text(body),
            )
        )
    return found
def tangle_markdown(
    markdown: str,
    *,
    source_path: str | Path | None = None,
) -> TangleResult:
    """Tangle the named chunks of a Markdown text into their target files.

    Chunks with a target path are grouped per target in document order, each
    is expanded, and the expansions are joined into one file per target. One
    provenance event is recorded for every chunk written to a target.
    """
    all_chunks = discover_code_chunks(markdown, source_path=source_path)
    lookup = {item.chunk_id: item for item in all_chunks}
    problems: list[Diagnostic] = []
    events: list[OperationProvenance] = []

    # Group targeted chunks; dict insertion order keeps targets in first-seen order.
    by_target: dict[str, list[CodeChunk]] = {}
    for item in all_chunks:
        if not item.target_path:
            continue
        if item.target_path not in by_target:
            by_target[item.target_path] = []
        by_target[item.target_path].append(item)

    outputs: list[LiterateFile] = []
    for destination, members in by_target.items():
        bodies: list[str] = []
        for member in members:
            bodies.append(_expand_chunk(member, lookup, problems, []))
            depends_on = [member.source_path] if member.source_path else []
            event = OperationProvenance(
                operation="literate.tangle",
                source_path=member.source_path,
                line_start=member.line_start,
                line_end=member.line_end,
                target_path=destination,
                dependencies=depends_on,
                metadata={"chunk_id": member.chunk_id, "references": member.references},
            )
            events.append(event)
        generated = LiterateFile(
            path=destination,
            content=_join_tangled_parts(bodies),
            chunk_ids=[member.chunk_id for member in members],
        )
        outputs.append(generated)

    return TangleResult(
        files=outputs,
        chunks=all_chunks,
        diagnostics=problems,
        provenance=events,
    )
def weave_markdown(
    markdown: str,
    *,
    source_path: str | Path | None = None,
) -> WeaveResult:
    """Append a "Code Chunk Index" section listing every named chunk.

    The input is returned unchanged when it contains no named chunks.
    Otherwise one bullet per chunk is appended, showing its id and, when
    present, its target path, its references and its starting line.
    """
    chunks = discover_code_chunks(markdown, source_path=source_path)
    if not chunks:
        return WeaveResult(markdown=markdown, chunks=[])

    out = [markdown.rstrip(), "", "## Code Chunk Index", ""]
    for chunk in chunks:
        entry = [f"- `{chunk.chunk_id}`"]
        if chunk.target_path:
            entry.append(f" -> `{chunk.target_path}`")
        if chunk.references:
            quoted = ", ".join(f"`{ref}`" for ref in chunk.references)
            entry.append(f"; refs: {quoted}")
        if chunk.line_start:
            entry.append(f" line {chunk.line_start}")
        out.append("".join(entry))

    woven = "\n".join(out).rstrip() + "\n"
    return WeaveResult(markdown=woven, chunks=chunks)
def write_tangle_files(result: TangleResult, output_dir: str | Path) -> list[str]:
    """Write tangled files under an output directory.

    Every target path is validated before anything is created on disk, so a
    single rejected target cannot leave a partially written output tree.

    Args:
        result: Tangle result whose ``files`` are written.
        output_dir: Directory the files are written under; created if missing.

    Returns:
        The resolved paths of the written files, in ``result.files`` order.

    Raises:
        ValueError: If a target path is rejected by ``_safe_output_path``.
    """
    root = Path(output_dir)
    # Resolve and check all targets up front; this raises before any write.
    planned = [(_safe_output_path(root, file.path), file.content) for file in result.files]

    root.mkdir(parents=True, exist_ok=True)
    written: list[str] = []
    for target, content in planned:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
        written.append(str(target))
    return written
def _expand_chunk(
    chunk: CodeChunk,
    chunks_by_id: dict[str, CodeChunk],
    diagnostics: list[Diagnostic],
    stack: list[str],
) -> str:
    """Return ``chunk.content`` with its ``<<id>>`` references expanded.

    A reference that stands alone on a line is replaced by the referenced
    chunk with every non-empty line prefixed by the reference's indentation.
    Any other reference is replaced inline. The content is processed in a
    single pass, so text produced by an expansion is never scanned again:
    placeholders left for missing or cyclic references are reported once and
    are not re-expanded.

    Args:
        chunk: Chunk to expand.
        chunks_by_id: Lookup of all known chunks.
        diagnostics: List that receives error diagnostics (mutated).
        stack: Ids of the chunks currently being expanded, outermost first.

    Returns:
        The expanded text, or the literal ``<<id>>`` placeholder when
        ``chunk`` is already on ``stack`` (a cycle).
    """
    if chunk.chunk_id in stack:
        diagnostics.append(
            Diagnostic(
                severity="error",
                code="literate.chunk_cycle",
                message="Cyclic chunk reference: " + " -> ".join(stack + [chunk.chunk_id]),
                source=SourceLocation(path=chunk.source_path, line=chunk.line_start),
            )
        )
        return f"<<{chunk.chunk_id}>>"

    nested_stack = stack + [chunk.chunk_id]

    def expand(ref_id: str) -> str:
        return _expand_reference(ref_id, chunks_by_id, diagnostics, nested_stack, chunk)

    def expand_inline(match: re.Match[str]) -> str:
        return expand(match.group("id"))

    content = chunk.content
    pieces: list[str] = []
    cursor = 0
    for match in _CHUNK_LINE_REF_RE.finditer(content):
        # Text before this whole-line reference can only hold inline references.
        pieces.append(_CHUNK_REF_RE.sub(expand_inline, content[cursor:match.start()]))
        indent = match.group("indent")
        expanded = expand(match.group("id"))
        # Re-indent the expansion; empty lines stay empty instead of gaining
        # trailing whitespace.
        pieces.append("\n".join(f"{indent}{line}" if line else line for line in expanded.splitlines()))
        cursor = match.end()
    pieces.append(_CHUNK_REF_RE.sub(expand_inline, content[cursor:]))
    return "".join(pieces)
def _expand_reference(
    chunk_id: str,
    chunks_by_id: dict[str, CodeChunk],
    diagnostics: list[Diagnostic],
    stack: list[str],
    source_chunk: CodeChunk,
) -> str:
    """Expand one ``<<chunk_id>>`` reference found in ``source_chunk``.

    The id is looked up verbatim first and then in its slug-normalised form,
    so a reference written like the original fence id (for example
    ``<<Main>>``) still finds a chunk stored under the normalised id.

    Args:
        chunk_id: Id as written in the reference.
        chunks_by_id: Lookup of all known chunks.
        diagnostics: List that receives error diagnostics (mutated).
        stack: Ids of the chunks currently being expanded, outermost first.
        source_chunk: Chunk containing the reference; used to locate errors.

    Returns:
        The expanded text of the referenced chunk, or the literal
        ``<<chunk_id>>`` placeholder when no such chunk exists.
    """
    referenced = chunks_by_id.get(chunk_id)
    if referenced is None:
        referenced = chunks_by_id.get(_slug(chunk_id))
    if referenced is None:
        diagnostics.append(
            Diagnostic(
                severity="error",
                code="literate.missing_chunk",
                message=f"Missing chunk reference `{chunk_id}`",
                source=SourceLocation(path=source_chunk.source_path, line=source_chunk.line_start),
            )
        )
        return f"<<{chunk_id}>>"
    return _expand_chunk(referenced, chunks_by_id, diagnostics, stack)
def _join_tangled_parts(parts: list[str]) -> str:
rendered = "\n".join(part.rstrip("\n") for part in parts if part is not None)
return rendered.rstrip() + "\n" if rendered else ""
def _safe_output_path(root: Path, relative_path: str) -> Path:
path = Path(relative_path)
if path.is_absolute():
raise ValueError(f"Tangle target must be relative: {relative_path}")
resolved = (root / path).resolve()
try:
resolved.relative_to(root.resolve())
except ValueError as exc:
raise ValueError(f"Tangle target escapes output directory: {relative_path}") from exc
return resolved
def _parse_fence_info(info: str) -> dict[str, str]:
    """Parse a fence info string into an attribute dict.

    Accepted shapes are ``language``, ``language {attrs}``, ``language{attrs}``
    and ``{attrs}``. Whitespace before the brace is optional, so an
    attribute-only info string is parsed as attributes rather than being
    taken as the language. Any other non-empty info string is returned whole
    under ``"language"``.

    Args:
        info: Raw info string of the fence.

    Returns:
        The parsed attributes, plus ``"language"`` when a language tag is
        present; the language tag overrides a ``language=`` attribute.
    """
    stripped = info.strip()
    match = re.match(r"^(?P<language>[^\s{]+)?(?:\s*\{(?P<attrs>.*)\})?\s*$", stripped)
    if not match:
        return {"language": stripped} if stripped else {}
    attrs = _parse_attrs(match.group("attrs") or "")
    language = match.group("language")
    if language:
        attrs["language"] = language
    return attrs
def _parse_attrs(raw: str) -> dict[str, str]:
attrs: dict[str, str] = {}
for part in shlex.split(raw):
if part.startswith("#") and len(part) > 1:
attrs["id"] = part[1:]
continue
if "=" not in part:
attrs[part] = "true"
continue
key, value = part.split("=", 1)
attrs[key.strip()] = value.strip()
return attrs
def _chunk_references(content: str) -> list[str]:
    """Return the ids of all ``<<id>>`` references in ``content``, in order.

    Repeated references are kept.
    """
    # The pattern has a single group, so ``findall`` yields the id strings.
    return _CHUNK_REF_RE.findall(content)
def _dedupe_id(unit_id: str, used_ids: dict[str, int]) -> str:
count = used_ids.get(unit_id, 0) + 1
used_ids[unit_id] = count
return unit_id if count == 1 else f"{unit_id}-{count}"
def _slug(value: str) -> str:
slug = re.sub(r"[^a-z0-9_.:-]+", "-", value.strip().lower())
slug = re.sub(r"-+", "-", slug).strip("-")
return slug or "chunk"
def _hash_text(text: str) -> str:
return "sha256:" + hashlib.sha256(text.encode("utf-8")).hexdigest()