Deterministic templating and generation support

This commit is contained in:
2026-05-04 01:12:54 +02:00
parent 4f010315bb
commit 1a1b5ab39c
13 changed files with 1122 additions and 7 deletions

View File

@@ -0,0 +1,31 @@
"""Deterministic Markdown generation primitives and hook boundaries."""
from markitect_tool.generation.engine import (
GeneratedDocument,
GenerationHook,
GenerationHookRequest,
GenerationHookResult,
GenerationPlan,
GenerationPlanError,
GenerationResult,
generate_stub_from_contract,
generate_with_hook,
load_data_file,
load_generation_plan_file,
run_generation_plan,
)
# Public API of the generation package: exactly the names re-exported from
# markitect_tool.generation.engine by the import statement above.
__all__ = [
    "GeneratedDocument",
    "GenerationHook",
    "GenerationHookRequest",
    "GenerationHookResult",
    "GenerationPlan",
    "GenerationPlanError",
    "GenerationResult",
    "generate_stub_from_contract",
    "generate_with_hook",
    "load_data_file",
    "load_generation_plan_file",
    "run_generation_plan",
]

View File

@@ -0,0 +1,339 @@
"""Markdown generation from contracts, templates, rules, and external hooks."""
from __future__ import annotations
import csv
import json
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Protocol
import yaml
from markitect_tool.contract import DocumentContract
from markitect_tool.core import parse_markdown
from markitect_tool.template import TemplateRenderResult, render_template
class GenerationPlanError(ValueError):
    """Signal that a Markdown generation plan could not be loaded or executed.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    also catch plan failures.
    """
@dataclass(frozen=True)
class GeneratedDocument:
    """A single generated Markdown document plus its provenance.

    ``output_path`` and ``source_template`` default to ``None``; ``data`` and
    ``missing_variables`` default to fresh empty containers.
    """

    markdown: str
    output_path: str | None = None
    source_template: str | None = None
    data: dict[str, Any] = field(default_factory=dict)
    missing_variables: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a compact dict view of the document.

        A ``complete`` flag is added (true when no variables are missing), and
        entries equal to ``None``, ``[]`` or ``{}`` are left out.
        """
        payload = asdict(self)
        payload["complete"] = not self.missing_variables

        empty_values = (None, [], {})
        compact: dict[str, Any] = {}
        for name, value in payload.items():
            if value in empty_values:
                continue
            compact[name] = value
        return compact
@dataclass(frozen=True)
class GenerationResult:
    """Outcome of one deterministic generation run."""

    documents: list[GeneratedDocument]
    plan_path: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the document count, each document's dict form, and the plan path.

        ``plan_path`` is only included when it is not ``None``.
        """
        serialized = [item.to_dict() for item in self.documents]
        payload: dict[str, Any] = {
            "count": len(self.documents),
            "documents": serialized,
        }
        if self.plan_path is not None:
            payload["plan_path"] = self.plan_path
        return payload
@dataclass(frozen=True)
class GenerationPlan:
    """A rule-based generation plan: a list of per-document mappings."""

    documents: list[dict[str, Any]]
    source_path: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the plan as a dict, leaving out attributes that are ``None``."""
        payload: dict[str, Any] = {}
        for name in ("documents", "source_path"):
            value = getattr(self, name)
            if value is not None:
                payload[name] = value
        return payload
@dataclass(frozen=True)
class GenerationHookRequest:
    """Immutable, provider-neutral input handed to an assisted-generation hook.

    Only ``prompt`` is required. ``data`` and ``metadata`` default to fresh
    empty dicts; ``template`` and ``contract_id`` default to ``None``.
    """

    prompt: str
    data: dict[str, Any] = field(default_factory=dict)
    template: str | None = None
    contract_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class GenerationHookResult:
    """Immutable, provider-neutral output returned by an assisted-generation hook."""

    markdown: str
    provider: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a dict without entries equal to ``None`` or ``{}``."""
        skipped = (None, {})
        compact: dict[str, Any] = {}
        for name, value in asdict(self).items():
            if value in skipped:
                continue
            compact[name] = value
        return compact
class GenerationHook(Protocol):
    """Structural interface that optional external generation providers satisfy."""

    def generate(self, request: GenerationHookRequest) -> GenerationHookResult:
        """Produce Markdown for ``request``."""
        ...
def load_data_file(path: str | Path) -> Any:
    """Load generation data from a JSON, YAML, or CSV file.

    The format is selected from the file extension, compared
    case-insensitively. Files are decoded as UTF-8 and a leading byte-order
    mark is tolerated.

    Args:
        path: Location of the data file.

    Returns:
        The parsed JSON value for ``.json``; the parsed YAML value for
        ``.yaml``/``.yml`` (``{}`` when the parsed value is falsy); a list
        with one dict per row for ``.csv``.

    Raises:
        GenerationPlanError: If the file extension is not supported.
    """
    file_path = Path(path)
    suffix = file_path.suffix.lower()
    # "utf-8-sig" reads plain UTF-8 unchanged but strips a leading BOM. With
    # strict "utf-8" a BOM makes json.loads raise and ends up glued to the
    # first CSV column name.
    encoding = "utf-8-sig"
    if suffix == ".json":
        return json.loads(file_path.read_text(encoding=encoding))
    if suffix in {".yaml", ".yml"}:
        return yaml.safe_load(file_path.read_text(encoding=encoding)) or {}
    if suffix == ".csv":
        # newline="" lets the csv module handle line endings itself.
        with file_path.open("r", encoding=encoding, newline="") as handle:
            return list(csv.DictReader(handle))
    raise GenerationPlanError(f"Unsupported data file format: {file_path.suffix}")
def generate_stub_from_contract(
    contract: DocumentContract,
    *,
    data: dict[str, Any] | None = None,
    include_optional: bool = False,
) -> GeneratedDocument:
    """Generate a Markdown stub from a document contract.

    The stub consists of an optional YAML frontmatter block, a level-one
    title, and one heading per contract section followed by guidance or a
    TODO placeholder.

    Args:
        contract: Contract supplying the document type, fields and sections.
        data: Values used to fill frontmatter fields; fields without a value
            receive a default or placeholder.
        include_optional: When true, sections whose presence is ``"optional"``
            are emitted as well. Sections marked ``"forbidden"`` are always
            skipped.

    Returns:
        A ``GeneratedDocument`` holding the Markdown text and the data used.
    """
    data = data or {}

    frontmatter: dict[str, Any] = {}
    if contract.document_type:
        frontmatter["document_type"] = contract.document_type
    for field_spec in contract.fields:
        # A field with no explicit path is placed under frontmatter by its id.
        path = field_spec.path or (f"frontmatter.{field_spec.id}" if field_spec.id else "")
        if not path.startswith("frontmatter.") or not field_spec.id:
            continue
        key_path = path.removeprefix("frontmatter.").split(".")
        _set_nested(frontmatter, key_path, _value_for_field(field_spec, data))

    title = contract.title or contract.document_type or contract.id or "Generated Document"

    parts: list[str] = []
    frontmatter_block = _frontmatter_block(frontmatter)
    # Add the frontmatter block only when there is one: an empty string here
    # would be joined in as a blank first line of the generated document.
    if frontmatter_block:
        parts.append(frontmatter_block)
    parts.append(f"# {title}".strip())

    for section in contract.sections:
        if section.presence == "forbidden":
            continue
        if section.presence == "optional" and not include_optional:
            continue
        heading_title = section.title or section.id or "Section"
        level = section.level or 2
        guidance = _section_guidance(section.raw.get("assertions"))
        # Empty strings become the blank lines around each heading.
        parts.extend(
            [
                "",
                f"{'#' * level} {heading_title}",
                "",
                guidance or f"TODO: Add content for {heading_title}.",
            ]
        )

    markdown = "\n".join(parts).rstrip() + "\n"
    return GeneratedDocument(markdown=markdown, data=data)
def load_generation_plan_file(path: str | Path) -> GenerationPlan:
    """Read a generation plan out of a Markdown file.

    The first fenced block whose info string contains ``generation`` together
    with ``yaml`` or ``yml`` is used. If there is none, the ``generation``
    entry of the document frontmatter is used when it is a mapping. A plan
    without a ``documents`` key is treated as a single-document plan.

    Raises:
        GenerationPlanError: If no non-empty plan is found, the fenced YAML is
            not a mapping, or ``documents`` is not a list of mappings.
    """
    plan_file = Path(path)
    source_text = plan_file.read_text(encoding="utf-8")
    parsed = parse_markdown(source_text, source_path=str(plan_file))

    plan_mapping: dict[str, Any] | None = None
    for token in parsed.tokens:
        if token.get("type") != "fence":
            continue
        info_words = str(token.get("info", "")).strip().lower().split()
        tagged_generation = "generation" in info_words
        tagged_yaml = "yaml" in info_words or "yml" in info_words
        if not (tagged_generation and tagged_yaml):
            continue
        candidate = yaml.safe_load(token.get("content", "")) or {}
        if not isinstance(candidate, dict):
            raise GenerationPlanError("Generation YAML block must be a mapping")
        plan_mapping = candidate
        break
    else:
        # No matching fence was found: fall back to the frontmatter entry.
        from_frontmatter = parsed.frontmatter.get("generation")
        if isinstance(from_frontmatter, dict):
            plan_mapping = from_frontmatter

    if not plan_mapping:
        raise GenerationPlanError("No fenced ```yaml generation block found")

    documents = plan_mapping.get("documents")
    if documents is None:
        documents = [plan_mapping]

    well_formed = isinstance(documents, list) and all(
        isinstance(entry, dict) for entry in documents
    )
    if not well_formed:
        raise GenerationPlanError("Generation `documents` must be a list of mappings")
    return GenerationPlan(documents=documents, source_path=str(plan_file))
def run_generation_plan(
    plan: GenerationPlan,
    *,
    base_dir: str | Path | None = None,
    output_dir: str | Path | None = None,
    dry_run: bool = False,
) -> GenerationResult:
    """Render each document of ``plan`` and optionally write it to disk.

    Args:
        plan: Plan whose document mappings are rendered in order.
        base_dir: Directory that template and data paths are resolved against.
            Defaults to the parent directory of ``plan.source_path``.
        output_dir: Directory that ``output`` paths are resolved against.
            Defaults to the base directory.
        dry_run: When true, nothing is written; documents are only rendered.

    Returns:
        A ``GenerationResult`` with one ``GeneratedDocument`` per plan entry.

    Raises:
        GenerationPlanError: If an ``output`` path resolves outside the output
            directory.
    """
    if base_dir:
        base = Path(base_dir).resolve()
    else:
        base = Path(Path(plan.source_path or ".").parent).resolve()
    output_base = base if not output_dir else Path(output_dir).resolve()

    generated: list[GeneratedDocument] = []
    for spec in plan.documents:
        template_path = _required_path(spec, "template", base)
        template_source = template_path.read_text(encoding="utf-8")
        context = _data_for_plan_doc(spec, base)
        strict_mode = bool(spec.get("strict", True))
        rendered = render_template(template_source, context, strict=strict_mode)

        target: Path | None = None
        requested_output = spec.get("output")
        if requested_output:
            target = (output_base / str(requested_output)).resolve()
            # Reject outputs that resolve outside the output directory.
            if not _is_within(target, output_base):
                raise GenerationPlanError(
                    f"Output path escapes output directory: {requested_output}"
                )
            if not dry_run:
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_text(rendered.markdown, encoding="utf-8")

        generated.append(
            GeneratedDocument(
                markdown=rendered.markdown,
                output_path=None if target is None else str(target),
                source_template=str(template_path),
                data=context,
                missing_variables=rendered.missing_variables,
            )
        )
    return GenerationResult(documents=generated, plan_path=plan.source_path)
def generate_with_hook(
    request: GenerationHookRequest,
    hook: GenerationHook,
) -> GenerationHookResult:
    """Delegate ``request`` to ``hook.generate`` and return its result unchanged."""
    hook_result = hook.generate(request)
    return hook_result
def _data_for_plan_doc(raw_doc: dict[str, Any], base: Path) -> dict[str, Any]:
    """Build the template data mapping for one plan document.

    Data comes from an optional ``data_file`` (resolved against ``base``) and
    an optional inline ``data`` mapping; inline values are merged over the
    file's values.

    Raises:
        GenerationPlanError: If inline data is not a mapping, if it would have
            to be merged into non-empty non-mapping file data, or if the final
            data is not a mapping.
    """
    if "data_file" in raw_doc:
        loaded: Any = load_data_file((base / str(raw_doc["data_file"])).resolve())
    else:
        loaded = {}

    if "data" not in raw_doc:
        if not isinstance(loaded, dict):
            raise GenerationPlanError("Generation template data must be a mapping")
        return loaded

    inline = raw_doc["data"]
    if not isinstance(inline, dict):
        raise GenerationPlanError("Inline generation `data` must be a mapping")
    if isinstance(loaded, dict):
        return _deep_merge(loaded, inline)
    if loaded:
        raise GenerationPlanError("Cannot merge inline data into non-mapping data file")
    # File data was empty and not a mapping: inline data replaces it.
    return inline
def _required_path(raw_doc: dict[str, Any], key: str, base: Path) -> Path:
    """Resolve the mandatory file path stored under ``key`` against ``base``.

    Raises:
        GenerationPlanError: If the key is missing or empty, or the resolved
            path is not an existing file.
    """
    configured = raw_doc.get(key)
    if not configured:
        raise GenerationPlanError(f"Generation document requires `{key}`")

    path = (base / str(configured)).resolve()
    if path.exists() and path.is_file():
        return path
    raise GenerationPlanError(f"Generation {key} not found: {path}")
def _value_for_field(field_spec, data: dict[str, Any]) -> Any:
    """Choose the frontmatter value for one contract field.

    Lookup order: ``data`` by field id, ``data`` by the field's nested
    ``frontmatter.`` path, the field's non-``None`` default, a type-based
    placeholder, and finally a ``TODO`` string.
    """
    field_id = field_spec.id
    if field_id and field_id in data:
        return data[field_id]

    declared_path = field_spec.path
    if declared_path and declared_path.startswith("frontmatter."):
        keys = declared_path.removeprefix("frontmatter.").split(".")
        found = _get_nested(data, keys)
        if found is not _MISSING:
            return found

    if field_spec.default is not None:
        return field_spec.default

    # Factories rather than shared values so every call gets a fresh container.
    placeholder_factories = {
        "boolean": lambda: False,
        "number": lambda: 0,
        "integer": lambda: 0,
        "array": list,
        "object": dict,
    }
    make_placeholder = placeholder_factories.get(field_spec.type)
    if make_placeholder is not None:
        return make_placeholder()
    return f"TODO: {field_spec.id or 'value'}"
def _section_guidance(raw_assertions: Any) -> str | None:
    """Turn assertion guidance entries into TODO paragraphs.

    Returns ``None`` when ``raw_assertions`` is not a list or no mapping entry
    has a truthy ``guidance`` value; otherwise the TODO lines separated by
    blank lines.
    """
    if not isinstance(raw_assertions, list):
        return None
    todo_lines = [
        f"TODO: {entry['guidance']}"
        for entry in raw_assertions
        if isinstance(entry, dict) and entry.get("guidance")
    ]
    if not todo_lines:
        return None
    return "\n\n".join(todo_lines)
def _frontmatter_block(frontmatter: dict[str, Any]) -> str:
    """Render ``frontmatter`` as a ``---``-delimited YAML block.

    Returns an empty string for an empty mapping. Keys keep their insertion
    order.
    """
    if frontmatter:
        yaml_body = yaml.safe_dump(frontmatter, sort_keys=False).strip()
        return f"---\n{yaml_body}\n---\n"
    return ""
def _set_nested(mapping: dict[str, Any], path: list[str], value: Any) -> None:
    """Store ``value`` in ``mapping`` at the nested key ``path``, in place.

    Missing intermediate levels are created, and an intermediate value that
    is not a dict is replaced by a new dict.
    """
    cursor = mapping
    for key in path[:-1]:
        child = cursor.get(key)
        if not isinstance(child, dict):
            child = {}
            cursor[key] = child
        cursor = child
    cursor[path[-1]] = value
# Unique sentinel for "no value found", so that None stays a valid value.
_MISSING = object()


def _get_nested(mapping: dict[str, Any], path: list[str]) -> Any:
    """Follow ``path`` through nested dicts and return the value found.

    Returns the ``_MISSING`` sentinel as soon as a key is absent or a
    non-dict value is reached before the path is exhausted.
    """
    node: Any = mapping
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return _MISSING
        node = node[key]
    return node
def _deep_merge(left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict with ``right`` merged over ``left``.

    Where both sides hold a dict under the same key the dicts are merged
    recursively; otherwise the value from ``right`` wins. ``left`` itself is
    not modified.
    """
    combined = {**left}
    for key, incoming in right.items():
        existing = combined.get(key)
        if isinstance(existing, dict) and isinstance(incoming, dict):
            combined[key] = _deep_merge(existing, incoming)
        else:
            combined[key] = incoming
    return combined
def _is_within(path: Path, root: Path) -> bool:
    """Report whether ``path`` is ``root`` itself or lies beneath it.

    The check is purely lexical (``Path.relative_to``); the paths are not
    resolved here.
    """
    try:
        path.relative_to(root)
    except ValueError:
        return False
    else:
        return True