Added deterministic function layer

This commit is contained in:
2026-05-04 19:26:25 +02:00
parent 3840ff4617
commit 1197b39a76
11 changed files with 1305 additions and 16 deletions

View File

@@ -20,6 +20,19 @@ from markitect_tool.contract import (
validate_contract,
validate_contract_file,
)
from markitect_tool.document_function import (
DocumentFunctionCall,
DocumentFunctionDescriptor,
DocumentFunctionError,
DocumentFunctionEvaluationResult,
DocumentFunctionParameter,
DocumentFunctionRegistry,
DocumentFunctionRun,
default_document_function_registry,
parse_document_function_calls,
render_document_functions,
validate_document_functions,
)
from markitect_tool.cache import (
CacheEntry,
CacheManifest,
@@ -220,6 +233,17 @@ __all__ = [
"load_contract_file",
"validate_contract",
"validate_contract_file",
"DocumentFunctionCall",
"DocumentFunctionDescriptor",
"DocumentFunctionError",
"DocumentFunctionEvaluationResult",
"DocumentFunctionParameter",
"DocumentFunctionRegistry",
"DocumentFunctionRun",
"default_document_function_registry",
"parse_document_function_calls",
"render_document_functions",
"validate_document_functions",
"CacheEntry",
"CacheManifest",
"CacheStatus",

View File

@@ -37,6 +37,13 @@ from markitect_tool.contract import (
load_contract_file,
validate_contract,
)
from markitect_tool.document_function import (
DocumentFunctionError,
default_document_function_registry,
render_document_functions,
validate_document_functions,
)
from markitect_tool.extension import ProcessingContext
from markitect_tool.explode import (
ExplodeError,
explode_markdown_file,
@@ -858,6 +865,77 @@ def policy_resource_manifest(manifest_file: Path, output_format: str) -> None:
_emit_resource_manifest_result({"manifest": manifest.to_dict()}, output_format)
# Parent command group; the `list`, `render` and `check` subcommands attach to it
# through `@function_group.command(...)`.
@main.group("function")
def function_group() -> None:
    """Inspect and execute deterministic document functions."""
@function_group.command("list")
@click.option("--namespace", help="Only list functions in one namespace.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def function_list(namespace: str | None, output_format: str) -> None:
    """List registered document functions."""
    # Serialise every descriptor up front so all output formats share one payload.
    catalog = default_document_function_registry()
    entries = []
    for descriptor in catalog.list(namespace=namespace):
        entries.append(descriptor.to_dict())
    payload = {"count": len(entries), "functions": entries}
    _emit_function_catalog(payload, output_format)
@function_group.command("render")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def function_render(file: Path, output_format: str) -> None:
    """Render deterministic document function calls in a Markdown file."""
    try:
        markdown = file.read_text(encoding="utf-8")
        processing_context = ProcessingContext(source_path=file)
        outcome = render_document_functions(markdown, context=processing_context)
    except DocumentFunctionError as exc:
        # Surface parse failures as a regular CLI error instead of a traceback.
        raise click.ClickException(str(exc)) from exc
    _emit_function_result(outcome.to_dict(), output_format)
    # The exit status mirrors validity: 0 when no error diagnostics, 1 otherwise.
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
@function_group.command("check")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--allow", "allowed", multiple=True, help="Only allow this function id. May be repeated.")
@click.option("--forbid", "forbidden", multiple=True, help="Forbid this function id. May be repeated.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def function_check(
    file: Path,
    allowed: tuple[str, ...],
    forbidden: tuple[str, ...],
    output_format: str,
) -> None:
    """Validate document function calls without rendering."""
    try:
        markdown = file.read_text(encoding="utf-8")
        # click hands over tuples; the validator is given plain lists.
        allow_list = list(allowed)
        forbid_list = list(forbidden)
        outcome = validate_document_functions(markdown, allowed=allow_list, forbidden=forbid_list)
    except DocumentFunctionError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_function_check_result(outcome.to_dict(), output_format)
    # The exit status mirrors validity: 0 when no error diagnostics, 1 otherwise.
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
@main.group("class")
def class_group() -> None:
"""Resolve deterministic content classes."""
@@ -1831,6 +1909,39 @@ def _emit_resource_manifest_result(data: dict, output_format: str) -> None:
click.echo(f"actions: {actions}")
def _emit_function_catalog(data: dict, output_format: str) -> None:
    """Print the function catalog as JSON, YAML, or one `id: summary` line each."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    # Any other format falls back to the plain-text listing.
    for function in data.get("functions", []):
        identifier = function["id"]
        summary = function.get("summary", "")
        click.echo(f"{identifier}: {summary}")
def _emit_function_result(data: dict, output_format: str) -> None:
    """Print a render result as JSON, YAML, or content followed by diagnostics."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    # Text mode: rendered document first, then one line per diagnostic.
    click.echo(data.get("content", ""))
    for diagnostic in data.get("diagnostics", []):
        severity = diagnostic["severity"]
        code = diagnostic["code"]
        message = diagnostic["message"]
        click.echo(f"[{severity}] {code}: {message}")
def _emit_function_check_result(data: dict, output_format: str) -> None:
    """Print a validation result as JSON, YAML, or a short text summary."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    # Text mode: verdict, number of parsed calls, then one line per diagnostic.
    verdict = "valid" if data.get("valid") else "invalid"
    click.echo(verdict)
    call_count = len(data.get("calls", []))
    click.echo(f"functions: {call_count}")
    for diagnostic in data.get("diagnostics", []):
        severity = diagnostic["severity"]
        code = diagnostic["code"]
        message = diagnostic["message"]
        click.echo(f"- [{severity}] {code}: {message}")
def _emit_metrics(data: dict, output_format: str) -> None:
if output_format == "json":
click.echo(json.dumps(data, indent=2, ensure_ascii=False))

View File

@@ -0,0 +1,791 @@
"""Markdown-native deterministic document functions."""
from __future__ import annotations
import json
import re
import shlex
from dataclasses import asdict, dataclass, field
from typing import Any, Callable
from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error
from markitect_tool.extension import (
ProcessingCapability,
ProcessingContext,
ProcessingProvenance,
ProcessingResult,
ProcessingTrace,
)
# Inline call syntax: `{{mkt:<expression>}}`. The expression is matched lazily
# and, because of DOTALL, may span several lines.
INLINE_CALL_RE = re.compile(r"\{\{mkt:(?P<body>.+?)\}\}", re.DOTALL)
# Any triple-backtick fenced block: `info` is the rest of the opening fence line
# (no newline or backtick) and `body` is everything up to the first "```" that
# follows a newline. Callers decide from `info` whether the fence is a function call.
FENCE_CALL_RE = re.compile(
    r"```(?P<info>[^\n`]*)\n(?P<body>.*?)\n```",
    re.DOTALL,
)
# Signature-agnostic alias for the callables that implement document functions.
FunctionImplementation = Callable[..., Any]
class DocumentFunctionError(ValueError):
    """Raised when document function parsing or evaluation fails.

    It derives from ``ValueError``, so callers that already handle
    ``ValueError`` also catch it.
    """
@dataclass(frozen=True)
class DocumentFunctionParameter:
"""One declared document function parameter."""
name: str
kind: str = "string"
required: bool = True
default: Any = None
variadic: bool = False
description: str | None = None
def to_dict(self) -> dict[str, Any]:
return _drop_empty(asdict(self))
@dataclass(frozen=True)
class DocumentFunctionDescriptor:
    """Inspectable description of one document function.

    The ``implementation`` callable is excluded from comparison and ``repr``
    and is not part of the serialised form produced by ``to_dict``.
    """

    id: str
    summary: str
    parameters: list[DocumentFunctionParameter] = field(default_factory=list)
    output_type: str = "markdown"
    execution: str = "deterministic"
    capabilities: list[ProcessingCapability] = field(default_factory=list)
    safety: dict[str, Any] = field(default_factory=dict)
    examples: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)
    implementation: FunctionImplementation | None = field(default=None, compare=False, repr=False)

    @property
    def namespace(self) -> str:
        """Return the id prefix before the first dot, or ``"default"`` without a dot."""
        prefix, dot, _rest = self.id.partition(".")
        if not dot:
            return "default"
        return prefix

    def to_dict(self) -> dict[str, Any]:
        """Return the descriptor as a plain dict with empty values removed."""
        serialised: dict[str, Any] = {"id": self.id}
        serialised["namespace"] = self.namespace
        serialised["summary"] = self.summary
        serialised["parameters"] = [parameter.to_dict() for parameter in self.parameters]
        serialised["output_type"] = self.output_type
        serialised["execution"] = self.execution
        serialised["capabilities"] = [capability.to_dict() for capability in self.capabilities]
        serialised["safety"] = self.safety
        serialised["examples"] = self.examples
        serialised["metadata"] = self.metadata
        return _drop_empty(serialised)
@dataclass(frozen=True)
class DocumentFunctionCall:
"""Parsed document function call."""
function_id: str
args: list[Any] = field(default_factory=list)
kwargs: dict[str, Any] = field(default_factory=dict)
body: str | None = None
raw: str = ""
inline: bool = True
line: int | None = None
pipeline: list["DocumentFunctionCall"] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
data["pipeline"] = [call.to_dict() for call in self.pipeline]
return _drop_empty(data)
@dataclass(frozen=True)
class DocumentFunctionRun:
    """Outcome of evaluating one document function call."""

    call: DocumentFunctionCall
    output: Any = None
    diagnostics: list[Diagnostic] = field(default_factory=list)
    provenance: list[ProcessingProvenance] = field(default_factory=list)
    trace: list[ProcessingTrace] = field(default_factory=list)

    @property
    def valid(self) -> bool:
        """Return True when ``has_error`` reports no error among the diagnostics."""
        failed = has_error(self.diagnostics)
        return not failed

    def to_dict(self) -> dict[str, Any]:
        """Return the run as a plain dict with empty values removed."""
        serialised: dict[str, Any] = {"call": self.call.to_dict()}
        serialised["valid"] = self.valid
        serialised["output"] = self.output
        serialised["diagnostics"] = [diagnostic.to_dict() for diagnostic in self.diagnostics]
        serialised["provenance"] = [event.to_dict() for event in self.provenance]
        serialised["trace"] = [event.to_dict() for event in self.trace]
        return _drop_empty(serialised)
@dataclass(frozen=True)
class DocumentFunctionEvaluationResult:
    """Outcome of processing all document function calls in a Markdown document."""

    content: str
    calls: list[DocumentFunctionRun] = field(default_factory=list)
    diagnostics: list[Diagnostic] = field(default_factory=list)
    provenance: list[ProcessingProvenance] = field(default_factory=list)
    trace: list[ProcessingTrace] = field(default_factory=list)

    @property
    def valid(self) -> bool:
        """Return True when ``has_error`` reports no error among the diagnostics."""
        failed = has_error(self.diagnostics)
        return not failed

    def to_processing_result(self) -> ProcessingResult:
        """Wrap the result in a ``ProcessingResult`` with the calls as metadata."""
        serialised_calls = [run.call.to_dict() for run in self.calls]
        return ProcessingResult(
            output={"content": self.content},
            diagnostics=self.diagnostics,
            provenance=self.provenance,
            trace=self.trace,
            metadata={"calls": serialised_calls},
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a plain dict with empty values removed."""
        serialised: dict[str, Any] = {"valid": self.valid}
        serialised["content"] = self.content
        serialised["calls"] = [run.to_dict() for run in self.calls]
        serialised["diagnostics"] = [diagnostic.to_dict() for diagnostic in self.diagnostics]
        serialised["provenance"] = [event.to_dict() for event in self.provenance]
        serialised["trace"] = [event.to_dict() for event in self.trace]
        return _drop_empty(serialised)
class DocumentFunctionRegistry:
    """Registry and evaluator for document functions.

    Descriptors are stored by id. Evaluation never raises for a failing call:
    problems are reported as error diagnostics on the returned run.
    """

    def __init__(
        self,
        descriptors: list[DocumentFunctionDescriptor] | None = None,
    ) -> None:
        """Create a registry, registering ``descriptors`` in the given order."""
        self._descriptors: dict[str, DocumentFunctionDescriptor] = {}
        for descriptor in descriptors or []:
            self.register(descriptor)

    def register(self, descriptor: DocumentFunctionDescriptor) -> None:
        """Add ``descriptor`` to the registry.

        Raises:
            DocumentFunctionError: if the id is already registered or the
                descriptor has no implementation.
        """
        if descriptor.id in self._descriptors:
            raise DocumentFunctionError(f"Duplicate document function `{descriptor.id}`")
        if descriptor.implementation is None:
            raise DocumentFunctionError(f"Document function `{descriptor.id}` has no implementation")
        self._descriptors[descriptor.id] = descriptor

    def get(self, function_id: str) -> DocumentFunctionDescriptor:
        """Return the descriptor registered under ``function_id``.

        Raises:
            DocumentFunctionError: if no such function is registered.
        """
        try:
            return self._descriptors[function_id]
        except KeyError as exc:
            raise DocumentFunctionError(f"Unknown document function `{function_id}`") from exc

    def list(self, *, namespace: str | None = None) -> list[DocumentFunctionDescriptor]:
        """Return descriptors sorted by id, optionally restricted to one namespace."""
        descriptors = [self._descriptors[key] for key in sorted(self._descriptors)]
        if namespace is not None:
            return [descriptor for descriptor in descriptors if descriptor.namespace == namespace]
        return descriptors

    def to_dict(self) -> dict[str, Any]:
        """Return the registry contents as a plain dict."""
        return {
            "count": len(self._descriptors),
            "functions": [descriptor.to_dict() for descriptor in self.list()],
        }

    def evaluate_call(
        self,
        call: DocumentFunctionCall,
        *,
        context: ProcessingContext | None = None,
    ) -> DocumentFunctionRun:
        """Evaluate ``call`` and its pipeline stages in order.

        The output of each stage becomes the first positional argument of the
        next one. Evaluation stops at the first failing stage; the returned
        run then carries that stage's raw text as output together with the
        accumulated diagnostics.
        """
        context = context or ProcessingContext()
        output: Any = None
        diagnostics: list[Diagnostic] = []
        provenance: list[ProcessingProvenance] = []
        trace: list[ProcessingTrace] = []
        calls = [call, *call.pipeline]
        for index, current in enumerate(calls):
            if index > 0:
                # Feed the previous stage's output in as the leading argument.
                current = DocumentFunctionCall(
                    function_id=current.function_id,
                    args=[output, *current.args],
                    kwargs=current.kwargs,
                    body=current.body,
                    raw=current.raw,
                    inline=current.inline,
                    line=current.line,
                )
            run = self._evaluate_single(current, context=context)
            diagnostics.extend(run.diagnostics)
            provenance.extend(run.provenance)
            trace.extend(run.trace)
            if not run.valid:
                output = current.raw
                break
            output = run.output
        return DocumentFunctionRun(
            call=call,
            output=output,
            diagnostics=diagnostics,
            provenance=provenance,
            trace=trace,
        )

    def _evaluate_single(
        self,
        call: DocumentFunctionCall,
        *,
        context: ProcessingContext,
    ) -> DocumentFunctionRun:
        """Evaluate one call without looking at its pipeline.

        Unknown functions, non-deterministic execution modes, blocked
        capabilities and any exception raised while binding or running the
        implementation are all converted into an error run.
        """
        try:
            descriptor = self.get(call.function_id)
        except DocumentFunctionError as exc:
            return _call_error(call, "function.unknown", str(exc), context)
        if descriptor.execution != "deterministic":
            return _call_error(
                call,
                "function.execution_blocked",
                f"Function `{descriptor.id}` is `{descriptor.execution}` and is not enabled.",
                context,
                details={"execution": descriptor.execution},
            )
        blocked = _blocked_capabilities(descriptor, context)
        if blocked:
            return _call_error(
                call,
                "function.capability_blocked",
                f"Function `{descriptor.id}` requires blocked capabilities {blocked}.",
                context,
                details={"capabilities": blocked},
            )
        try:
            args = [_resolve_value(arg, context) for arg in call.args]
            kwargs = {key: _resolve_value(value, context) for key, value in call.kwargs.items()}
            if call.body is not None:
                kwargs.setdefault("body", _resolve_value(call.body, context))
            _validate_arguments(descriptor, args, kwargs)
            if descriptor.id == "data.get":
                # `data.get` needs the processing context, which ordinary
                # implementations never receive, so it is resolved here.
                # Both parameters may arrive positionally or by keyword;
                # validation above guarantees that `key` is present.
                key = args[0] if args else kwargs["key"]
                fallback = args[1] if len(args) > 1 else kwargs.get("default", "")
                output = context.variables.get(str(key), fallback)
            else:
                if descriptor.implementation is None:
                    raise DocumentFunctionError(
                        f"Document function `{descriptor.id}` has no implementation"
                    )
                output = descriptor.implementation(*args, **kwargs)
        except Exception as exc:
            # Boundary handler: any failure becomes a diagnostic, not a crash.
            return _call_error(call, "function.evaluation_failed", str(exc), context)
        provenance = [
            ProcessingProvenance(
                operation=f"document_function.{descriptor.id}",
                source_path=str(context.source_path) if context.source_path else None,
                metadata={
                    "function": descriptor.id,
                    "execution": descriptor.execution,
                    "output_type": descriptor.output_type,
                },
            )
        ]
        trace = [
            ProcessingTrace(
                event="document_function.executed",
                metadata={"function": descriptor.id, "line": call.line},
            )
        ]
        return DocumentFunctionRun(call=call, output=output, provenance=provenance, trace=trace)
def default_document_function_registry() -> DocumentFunctionRegistry:
    """Return built-in deterministic document functions."""
    # Short alias that keeps the parameter tables below readable.
    param = DocumentFunctionParameter

    text_functions = [
        _descriptor(
            "text.upper",
            "Uppercase text.",
            _text_upper,
            [param("value")],
            examples=['{{mkt:text.upper "draft"}}'],
        ),
        _descriptor(
            "text.lower",
            "Lowercase text.",
            _text_lower,
            [param("value")],
            examples=['{{mkt:text.lower "DRAFT"}}'],
        ),
        _descriptor(
            "text.title",
            "Title-case text.",
            _text_title,
            [param("value")],
            examples=['{{mkt:text.title "release notes"}}'],
        ),
        _descriptor(
            "text.trim",
            "Trim surrounding whitespace.",
            _text_trim,
            [param("value")],
            examples=['{{mkt:text.trim " ok "}}'],
        ),
        _descriptor(
            "text.replace",
            "Replace text deterministically.",
            _text_replace,
            [param("value"), param("old"), param("new")],
            examples=['{{mkt:text.replace "draft" draft final}}'],
        ),
        _descriptor(
            "text.join",
            "Join text values.",
            _text_join,
            [param("items", variadic=True), param("sep", required=False, default="")],
            examples=['{{mkt:text.join "A" "B" sep=", "}}'],
        ),
    ]
    markdown_functions = [
        _descriptor(
            "md.heading",
            "Create a Markdown heading.",
            _md_heading,
            [
                param("text", required=False),
                param("level", kind="integer", required=False, default=2),
                param("body", required=False),
            ],
            examples=['{{mkt:md.heading text="Decision" level=2}}'],
        ),
        _descriptor(
            "md.bold",
            "Create Markdown bold text.",
            _md_bold,
            [param("text")],
            examples=['{{mkt:md.bold "Important"}}'],
        ),
        _descriptor(
            "md.link",
            "Create a Markdown link.",
            _md_link,
            [param("text"), param("url")],
            examples=['{{mkt:md.link "OpenAI" "https://openai.com"}}'],
        ),
        _descriptor(
            "md.codeblock",
            "Create a fenced Markdown code block.",
            _md_codeblock,
            [param("body", required=False), param("lang", required=False, default="")],
            examples=["```mkt-function md.codeblock lang=python\nprint('hi')\n```"],
        ),
    ]
    data_functions = [
        _descriptor(
            "data.get",
            "Read a value from processing context variables.",
            _data_get,
            [param("key"), param("default", required=False, default="")],
            examples=["{{mkt:data.get title}}"],
        ),
    ]
    # Registration order matches the original listing: text, markdown, data.
    return DocumentFunctionRegistry([*text_functions, *markdown_functions, *data_functions])
def parse_document_function_calls(text: str) -> list[DocumentFunctionCall]:
    """Parse inline and fenced document function calls.

    All inline calls are returned first, followed by all fenced calls; fences
    whose info string does not start with a function keyword are ignored.
    """
    fence_keywords = {"mkt-function", "markitect-function", "function"}
    parsed: list[DocumentFunctionCall] = [
        _parse_call_expression(
            found.group("body"),
            raw=found.group(0),
            inline=True,
            line=_line_for_offset(text, found.start()),
        )
        for found in INLINE_CALL_RE.finditer(text)
    ]
    for found in FENCE_CALL_RE.finditer(text):
        info_tokens = found.group("info").strip().split(None, 1)
        is_function_fence = bool(info_tokens) and info_tokens[0] in fence_keywords
        if not is_function_fence:
            continue
        # Everything after the keyword on the fence line is the call expression.
        expression = info_tokens[1] if len(info_tokens) > 1 else ""
        fenced_call = _parse_call_expression(
            expression,
            raw=found.group(0),
            inline=False,
            line=_line_for_offset(text, found.start()),
            body=found.group("body"),
        )
        parsed.append(fenced_call)
    return parsed
def render_document_functions(
    text: str,
    *,
    registry: DocumentFunctionRegistry | None = None,
    context: ProcessingContext | None = None,
) -> DocumentFunctionEvaluationResult:
    """Expand deterministic document functions in Markdown content.

    Inline ``{{mkt:...}}`` calls are expanded first; function fences are then
    expanded on the intermediate text. A call that fails evaluation is left
    untouched in the output and its diagnostics are collected on the result.

    Args:
        text: Markdown source.
        registry: Functions to use; defaults to the built-in registry.
        context: Processing context; defaults to an empty ``ProcessingContext``.

    Returns:
        The rendered content with every run, diagnostic, provenance and trace
        event gathered along the way.

    Raises:
        DocumentFunctionError: if a call expression cannot be parsed.
    """
    registry = registry or default_document_function_registry()
    context = context or ProcessingContext()
    runs: list[DocumentFunctionRun] = []
    diagnostics: list[Diagnostic] = []
    provenance: list[ProcessingProvenance] = []
    trace: list[ProcessingTrace] = []
    fence_keywords = {"mkt-function", "markitect-function", "function"}

    # Bookkeeping that lets fence offsets (which refer to the text *after*
    # inline expansion) be mapped back to line numbers of the original text.
    # Each entry is (offset in the expanded text just past a replacement,
    # number of newlines that replacement added or removed).
    newline_shifts: list[tuple[int, int]] = []
    length_delta = 0

    def evaluate(call: DocumentFunctionCall) -> DocumentFunctionRun:
        run = registry.evaluate_call(call, context=context)
        runs.append(run)
        diagnostics.extend(run.diagnostics)
        provenance.extend(run.provenance)
        trace.extend(run.trace)
        return run

    def replace_inline(match: re.Match[str]) -> str:
        nonlocal length_delta
        original = match.group(0)
        call = _parse_call_expression(
            match.group("body"),
            raw=original,
            inline=True,
            line=_line_for_offset(text, match.start()),
        )
        run = evaluate(call)
        replacement = _format_function_output(run.output, inline=True) if run.valid else original
        length_delta += len(replacement) - len(original)
        newline_delta = replacement.count("\n") - original.count("\n")
        if newline_delta:
            newline_shifts.append((match.end() + length_delta, newline_delta))
        return replacement

    content = INLINE_CALL_RE.sub(replace_inline, text)

    def original_line(offset: int) -> int:
        # Line in the expanded text, corrected by the newlines that inline
        # replacements located before this offset added or removed.
        shift = sum(delta for end, delta in newline_shifts if end <= offset)
        return _line_for_offset(content, offset) - shift

    def replace_fence(match: re.Match[str]) -> str:
        original = match.group(0)
        tokens = match.group("info").strip().split(None, 1)
        if not tokens or tokens[0] not in fence_keywords:
            return original
        call = _parse_call_expression(
            tokens[1] if len(tokens) > 1 else "",
            raw=original,
            inline=False,
            line=original_line(match.start()),
            body=match.group("body"),
        )
        run = evaluate(call)
        if not run.valid:
            return original
        return _format_function_output(run.output, inline=False)

    content = FENCE_CALL_RE.sub(replace_fence, content)
    trace.append(ProcessingTrace(event="document_function.rendered", metadata={"calls": len(runs)}))
    return DocumentFunctionEvaluationResult(
        content=content,
        calls=runs,
        diagnostics=diagnostics,
        provenance=provenance,
        trace=trace,
    )
def validate_document_functions(
    text: str,
    *,
    registry: DocumentFunctionRegistry | None = None,
    allowed: list[str] | None = None,
    forbidden: list[str] | None = None,
) -> DocumentFunctionEvaluationResult:
    """Validate function calls without rendering the document.

    Every function referenced by a call, including each stage of its
    pipeline, is checked against the allow list, the forbid list and the
    registry. Nothing is executed and ``content`` is returned unchanged.

    Args:
        text: Markdown source.
        registry: Functions to validate against; defaults to the built-ins.
        allowed: When non-empty, only these function ids are accepted.
        forbidden: Function ids that are rejected.

    Raises:
        DocumentFunctionError: if a call expression cannot be parsed.
    """
    registry = registry or default_document_function_registry()
    allowed_set = set(allowed or [])
    forbidden_set = set(forbidden or [])
    diagnostics: list[Diagnostic] = []
    runs: list[DocumentFunctionRun] = []
    for call in parse_document_function_calls(text):
        # Pipeline stages execute at render time too, so they must pass the
        # same policy checks as the leading call.
        for stage in (call, *call.pipeline):
            function_id = stage.function_id
            if allowed_set and function_id not in allowed_set:
                diagnostics.append(
                    _diagnostic(stage, "function.not_allowed", f"Function `{function_id}` is not allowed.")
                )
            if function_id in forbidden_set:
                diagnostics.append(
                    _diagnostic(stage, "function.forbidden", f"Function `{function_id}` is forbidden.")
                )
            try:
                descriptor = registry.get(function_id)
            except DocumentFunctionError as exc:
                diagnostics.append(_diagnostic(stage, "function.unknown", str(exc)))
                continue
            if descriptor.execution != "deterministic":
                diagnostics.append(
                    _diagnostic(
                        stage,
                        "function.unstable",
                        f"Function `{function_id}` is `{descriptor.execution}` and cannot run in deterministic contexts.",
                    )
                )
        runs.append(DocumentFunctionRun(call=call))
    return DocumentFunctionEvaluationResult(content=text, calls=runs, diagnostics=diagnostics)
def _parse_call_expression(
    expression: str,
    *,
    raw: str,
    inline: bool,
    line: int | None,
    body: str | None = None,
) -> DocumentFunctionCall:
    """Parse a call expression, including any ``|``-separated pipeline stages.

    A ``|`` inside single or double quotes, or preceded by a backslash, is
    part of an argument and does not start a new stage. Empty stages are
    ignored. Only the first stage receives ``body``.

    Raises:
        DocumentFunctionError: if the expression contains no call or a stage
            has invalid syntax.
    """
    pipeline_parts = [part.strip() for part in _split_pipeline_stages(expression) if part.strip()]
    if not pipeline_parts:
        raise DocumentFunctionError("Document function call is empty.")
    first = _parse_single_call(pipeline_parts[0], raw=raw, inline=inline, line=line, body=body)
    pipeline = [
        _parse_single_call(part, raw=part, inline=inline, line=line)
        for part in pipeline_parts[1:]
    ]
    return DocumentFunctionCall(
        function_id=first.function_id,
        args=first.args,
        kwargs=first.kwargs,
        body=first.body,
        raw=raw,
        inline=inline,
        line=line,
        pipeline=pipeline,
    )


def _split_pipeline_stages(expression: str) -> list[str]:
    """Split ``expression`` on unquoted, unescaped ``|`` characters.

    Quoting follows the rules the stages are later tokenised with: single
    quotes are literal, and a backslash escapes the next character everywhere
    except inside single quotes. The text of each stage is returned unchanged.
    """
    stages: list[str] = []
    current: list[str] = []
    quote: str | None = None
    escaped = False
    for char in expression:
        if escaped:
            current.append(char)
            escaped = False
        elif char == "\\" and quote != "'":
            current.append(char)
            escaped = True
        elif quote is not None:
            current.append(char)
            if char == quote:
                quote = None
        elif char in "'\"":
            current.append(char)
            quote = char
        elif char == "|":
            stages.append("".join(current))
            current = []
        else:
            current.append(char)
    stages.append("".join(current))
    return stages
def _parse_single_call(
    expression: str,
    *,
    raw: str,
    inline: bool,
    line: int | None,
    body: str | None = None,
) -> DocumentFunctionCall:
    """Parse one ``function_id arg ... key=value ...`` expression.

    Tokens are split with shell-like quoting. A token is a keyword argument
    only when the text before its first ``=`` is identifier-like (dashes are
    accepted and normalised to underscores); any other token, such as a URL
    containing ``=``, is a positional argument.

    Raises:
        DocumentFunctionError: if the quoting is invalid or the expression is
            empty.
    """
    try:
        parts = shlex.split(expression)
    except ValueError as exc:
        raise DocumentFunctionError(f"Invalid function syntax: {exc}") from exc
    if not parts:
        raise DocumentFunctionError("Document function call is empty.")
    function_id = parts[0]
    args: list[Any] = []
    kwargs: dict[str, Any] = {}
    for token in parts[1:]:
        key, separator, value = token.partition("=")
        keyword = key.replace("-", "_")
        # An empty or non-identifier key means the `=` belongs to the value
        # itself, so the whole token is kept as a positional argument.
        if separator and keyword.isidentifier():
            kwargs[keyword] = _parse_literal(value)
        else:
            args.append(_parse_literal(token))
    return DocumentFunctionCall(
        function_id=function_id,
        args=args,
        kwargs=kwargs,
        body=body,
        raw=raw,
        inline=inline,
        line=line,
    )
def _descriptor(
    function_id: str,
    summary: str,
    implementation: FunctionImplementation,
    parameters: list[DocumentFunctionParameter],
    *,
    output_type: str = "markdown",
    examples: list[str] | None = None,
) -> DocumentFunctionDescriptor:
    """Build a descriptor carrying the capability and safety profile shared by the built-ins."""
    shared_capabilities = [
        ProcessingCapability(id="document_function", kind="execute"),
        ProcessingCapability(id="deterministic", kind="execution"),
    ]
    # Built-ins declare no network, filesystem or assisted-generation needs.
    safety_profile = {"network": False, "filesystem": False, "assisted_generation": False}
    example_list = examples if examples else []
    return DocumentFunctionDescriptor(
        id=function_id,
        summary=summary,
        parameters=parameters,
        output_type=output_type,
        capabilities=shared_capabilities,
        safety=safety_profile,
        examples=example_list,
        implementation=implementation,
    )
def _validate_arguments(
descriptor: DocumentFunctionDescriptor,
args: list[Any],
kwargs: dict[str, Any],
) -> None:
required = [parameter for parameter in descriptor.parameters if parameter.required and not parameter.variadic]
positional = [parameter for parameter in descriptor.parameters if not parameter.variadic]
variadic = next((parameter for parameter in descriptor.parameters if parameter.variadic), None)
if len(args) > len(positional) and variadic is None:
raise DocumentFunctionError(f"Function `{descriptor.id}` received too many positional arguments.")
for index, parameter in enumerate(required):
if index < len(args) or parameter.name in kwargs:
continue
raise DocumentFunctionError(f"Function `{descriptor.id}` requires `{parameter.name}`.")
def _blocked_capabilities(
descriptor: DocumentFunctionDescriptor,
context: ProcessingContext,
) -> list[str]:
blocked = []
policy = context.policy or {}
blocked_ids = set(policy.get("blocked_capabilities") or [])
for capability in descriptor.capabilities:
if capability.id in blocked_ids:
blocked.append(capability.id)
if descriptor.safety.get("network") and policy.get("network") is False:
blocked.append("network")
if descriptor.safety.get("filesystem") and policy.get("filesystem") is False:
blocked.append("filesystem")
if descriptor.safety.get("assisted_generation") and policy.get("assisted_generation") is False:
blocked.append("assisted_generation")
return sorted(set(blocked))
def _resolve_value(value: Any, context: ProcessingContext) -> Any:
if isinstance(value, str):
if value.startswith("${") and value.endswith("}"):
key = value[2:-1].strip()
return context.variables.get(key, "")
return value
def _format_function_output(value: Any, *, inline: bool) -> str:
if isinstance(value, str):
return value
if isinstance(value, list):
return ", ".join(str(item) for item in value) if inline else "\n".join(str(item) for item in value)
if isinstance(value, dict):
return json.dumps(value, sort_keys=True, ensure_ascii=False)
return "" if value is None else str(value)
def _parse_literal(value: str) -> Any:
lowered = value.lower()
if lowered == "true":
return True
if lowered == "false":
return False
if lowered in {"null", "none"}:
return None
try:
return int(value)
except ValueError:
pass
return value
def _call_error(
    call: DocumentFunctionCall,
    code: str,
    message: str,
    context: ProcessingContext,
    details: dict[str, Any] | None = None,
) -> DocumentFunctionRun:
    """Build a failed run holding a single error diagnostic for ``call``.

    A source location is attached only when the context has a source path or
    the call has a line; ``details`` defaults to the function id.
    """
    location = None
    if context.source_path or call.line:
        path = str(context.source_path) if context.source_path else None
        location = SourceLocation(path=path, line=call.line)
    failure = Diagnostic(
        severity="error",
        code=code,
        message=message,
        source=location,
        details=details or {"function": call.function_id},
    )
    return DocumentFunctionRun(call=call, diagnostics=[failure])
def _diagnostic(
    call: DocumentFunctionCall,
    code: str,
    message: str,
) -> Diagnostic:
    """Build an error diagnostic for ``call``, located by line when one is known."""
    location = None
    if call.line:
        location = SourceLocation(line=call.line)
    return Diagnostic(
        severity="error",
        code=code,
        message=message,
        source=location,
        details={"function": call.function_id},
    )
def _line_for_offset(text: str, offset: int) -> int:
return text.count("\n", 0, offset) + 1
def _text_upper(value: Any) -> str:
return str(value).upper()
def _text_lower(value: Any) -> str:
return str(value).lower()
def _text_title(value: Any) -> str:
return str(value).title()
def _text_trim(value: Any) -> str:
return str(value).strip()
def _text_replace(value: Any, old: Any, new: Any) -> str:
return str(value).replace(str(old), str(new))
def _text_join(*items: Any, sep: str = "") -> str:
return str(sep).join(str(item) for item in items)
def _md_heading(text: Any = None, *, level: int = 2, body: Any = None) -> str:
heading = str(text if text is not None else body if body is not None else "").strip()
depth = max(1, min(6, int(level)))
return f"{'#' * depth} {heading}"
def _md_bold(text: Any) -> str:
return f"**{text}**"
def _md_link(text: Any, url: Any) -> str:
return f"[{text}]({url})"
def _md_codeblock(body: Any = "", *, lang: str = "") -> str:
info = str(lang).strip()
return f"```{info}\n{body}\n```"
def _data_get(key: Any, default: Any = "", *, body: Any = None) -> Any:
return body if body is not None else default if str(key).startswith("$") else key
class _FunctionOutputReady(Exception):
    """Internal signal carrying an already-computed function output."""

    def __init__(self, output: Any) -> None:
        # The carried value is read back from `.output` by the handler.
        self.output = output
def _drop_empty(data: dict[str, Any]) -> dict[str, Any]:
return {
key: value
for key, value in data.items()
if value not in (None, [], {}, "")
}

View File

@@ -18,6 +18,7 @@ def builtin_extension_registry() -> ExtensionRegistry:
_runtime_form_state_descriptor(),
_runtime_assessment_descriptor(),
_local_label_policy_descriptor(),
_document_function_descriptor(),
]:
registry.register(descriptor)
return registry
@@ -233,3 +234,34 @@ def _local_label_policy_descriptor() -> ExtensionDescriptor:
]
},
)
def _document_function_descriptor() -> ExtensionDescriptor:
    """Return the extension descriptor advertising the document function layer."""
    # (capability id, capability kind) pairs, in declaration order.
    capability_specs = [
        ("document_function", "execute"),
        ("deterministic", "execution"),
        ("diagnostics", "emit"),
        ("provenance", "emit"),
    ]
    capabilities = [
        ProcessingCapability(id=capability_id, kind=capability_kind)
        for capability_id, capability_kind in capability_specs
    ]
    safety = {
        "network": False,
        "filesystem": False,
        "assisted_generation": False,
        "external_process": False,
    }
    cli_commands = ["mkt function list", "mkt function check", "mkt function render"]
    metadata = {
        "execution": "deterministic-only",
        "external_policy_services_required": False,
    }
    return ExtensionDescriptor(
        id="document.function",
        kind="document-function",
        summary="Markdown-native deterministic document function registry and evaluator.",
        capabilities=capabilities,
        safety=safety,
        input_contract="Markdown with {{mkt:function ...}} or mkt-function fences",
        output_contract="DocumentFunctionEvaluationResult",
        diagnostics_namespace="document_function",
        provenance_prefix="document_function",
        cli={"commands": cli_commands},
        docs=["docs/document-functions.md"],
        examples=["examples/functions/basic-functions.md"],
        metadata=metadata,
    )