Files
markitect-tool/src/markitect_tool/cli/main.py

2252 lines
74 KiB
Python

"""`mkt` command entry point."""
from __future__ import annotations
import json
from pathlib import Path
import click
import yaml
from markitect_tool.cache import (
build_cache,
cache_path_for,
detect_changes,
fingerprint_file,
load_cache,
save_cache,
)
from markitect_tool.backend import (
BackendRegistryError,
LocalSnapshotStore,
load_backend_registry,
load_snapshot_state_file,
local_index_path_for,
plan_snapshot_refresh,
snapshot_identity_for_file,
)
from markitect_tool.content_class import (
ContentClassResolutionError,
load_content_class_file,
)
from markitect_tool.core import Document, parse_markdown_file
from markitect_tool.contract import (
ContractLoaderError,
check_markdown_file,
collect_metrics,
load_contract_file,
validate_contract,
)
from markitect_tool.explode import (
ExplodeError,
explode_markdown_file,
implode_markdown_directory,
)
from markitect_tool.generation import (
GenerationPlanError,
generate_stub_from_contract,
load_data_file,
load_generation_plan_file,
run_generation_plan,
)
from markitect_tool.literate import tangle_markdown, weave_markdown, write_tangle_files
from markitect_tool.ops import IncludeError, compose_files, resolve_includes, transform_markdown
from markitect_tool.processor import ProcessorContext, run_fenced_processors
from markitect_tool.policy import (
EnterprisePolicyError,
FlexAuthResourceManifest,
LocalLabelPolicyGateway,
load_enterprise_policy_subject,
)
from markitect_tool.query import (
InvalidQueryError,
extract_document,
extract_document_jsonpath,
query_document,
query_document_jsonpath,
)
from markitect_tool.reference import (
ReferenceContext,
ReferenceResolutionError,
load_namespaces,
resolve_reference,
)
from markitect_tool.runtime import evaluate_form_state, load_runtime_context_file
from markitect_tool.schema import load_schema_file, validate_markdown_file, validate_schema
from markitect_tool.template import (
MissingTemplateVariable,
TemplateError,
analyze_template,
render_template,
)
from markitect_tool.workflow import WorkflowError, WorkflowRunner, load_workflow_file
# Root of the `mkt` CLI. Commands and sub-groups in this module attach themselves
# through `@main.command()` / `@main.group()`; the function body is empty on
# purpose. `click.version_option()` contributes the --version flag.
# NOTE: the docstring is what click shows as the top-level help text.
@click.group()
@click.version_option()
def main() -> None:
    """Markdown-native toolkit for structured knowledge artifacts."""
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "tree"], case_sensitive=False),
    default="json",
    show_default=True,
)
def parse(file: Path, output_format: str) -> None:
    """Parse a Markdown file into a structured representation."""
    # The docstring above is the command's --help text, so maintainer notes
    # live in comments.
    #
    # file:          existing regular file (click rejects missing paths and directories).
    # output_format: "json" (default), "yaml" or "tree".
    # Output goes to stdout via click.echo; nothing is returned.
    document = parse_markdown_file(file)
    data = document.to_dict()

    if output_format == "yaml":
        # allow_unicode=True keeps non-ASCII text readable instead of emitting
        # escape sequences, matching the ensure_ascii=False choice made for the
        # JSON branch below.
        click.echo(yaml.safe_dump(data, sort_keys=False, allow_unicode=True))
    elif output_format == "tree":
        # Outline view: one line per heading, '#' repeated to the heading level.
        for heading in document.headings:
            click.echo(f"{'#' * heading.level} {heading.text}")
    else:
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
# Sub-group `mkt ast`. It carries no logic of its own; subcommands register on
# it with `@ast.command(...)`.
@main.group()
def ast() -> None:
    """Inspect parsed Markdown ASTs and parser summaries."""
@ast.command("show")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "tree"], case_sensitive=False),
    default="json",
    show_default=True,
)
def ast_show(file: Path, output_format: str) -> None:
    """Show a parsed Markdown AST without requiring a cache."""
    # The docstring above is the command's --help text, so maintainer notes
    # live in comments.
    #
    # file:          existing regular file (click rejects missing paths and directories).
    # output_format: "json" (default), "yaml" or "tree".
    # The file is parsed directly with parse_markdown_file and the result is
    # printed to stdout; nothing is returned.
    document = parse_markdown_file(file)
    data = document.to_dict()

    if output_format == "yaml":
        # allow_unicode=True keeps non-ASCII text readable instead of emitting
        # escape sequences, matching the ensure_ascii=False choice made for the
        # JSON branch below.
        click.echo(yaml.safe_dump(data, sort_keys=False, allow_unicode=True))
    elif output_format == "tree":
        # Outline view: one line per heading, '#' repeated to the heading level.
        for heading in document.headings:
            click.echo(f"{'#' * heading.level} {heading.text}")
    else:
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
@ast.command("stats")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def ast_stats(file: Path, output_format: str) -> None:
    """Summarize parsed Markdown AST shape and token distribution."""
    # Pipeline: parse the file, convert the document to its dict form, let
    # _ast_stats build the summary (it also receives the path as a string),
    # then hand the summary to _emit_ast_stats with the requested format.
    parsed = parse_markdown_file(file)
    ast_dict = parsed.to_dict()
    summary = _ast_stats(ast_dict, str(file))
    _emit_ast_stats(summary, output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def metrics(file: Path, output_format: str) -> None:
    """Report practical size and complexity metrics for a Markdown file."""
    # collect_metrics() works on the parsed document; its dict form is extended
    # with the source path before being passed to _emit_metrics.
    parsed = parse_markdown_file(file)
    measured = collect_metrics(parsed).to_dict()
    report = measured | {"document_path": str(file)}
    _emit_metrics(report, output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("selector")
@click.option(
    "--engine",
    type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
    default="selector", show_default=True,
    help="Query engine to use.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="json", show_default=True,
)
def query(file: Path, selector: str, engine: str, output_format: str) -> None:
    """Query structured Markdown content with a small selector."""
    # file:     Markdown file to parse (must exist).
    # selector: expression interpreted by the engine chosen with --engine.
    # An InvalidQueryError from either engine is re-raised as a ClickException
    # carrying the same message.
    parsed = parse_markdown_file(file)

    # Both engines take (document, selector); choose the callable up front.
    run_query = query_document_jsonpath if engine == "jsonpath" else query_document
    try:
        found = run_query(parsed, selector)
    except InvalidQueryError as exc:
        raise click.ClickException(str(exc)) from exc

    payload = {
        "selector": selector,
        "engine": engine,
        "document_path": str(file),
        "count": len(found),
        "matches": [hit.to_dict() for hit in found],
    }
    _emit_query(payload, output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("selector")
@click.option(
    "--engine",
    type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
    default="selector", show_default=True,
    help="Query engine to use.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["text", "json", "yaml"], case_sensitive=False),
    default="text", show_default=True,
)
def extract(file: Path, selector: str, engine: str, output_format: str) -> None:
    """Extract text or Markdown content from structured Markdown."""
    # Same shape as the `query` command, but it calls the extract_* functions
    # and reports their return value under "items" as-is.
    # An InvalidQueryError is re-raised as a ClickException with the same message.
    parsed = parse_markdown_file(file)

    # Both engines take (document, selector); choose the callable up front.
    run_extract = extract_document_jsonpath if engine == "jsonpath" else extract_document
    try:
        extracted = run_extract(parsed, selector)
    except InvalidQueryError as exc:
        raise click.ClickException(str(exc)) from exc

    payload = {
        "selector": selector,
        "engine": engine,
        "document_path": str(file),
        "count": len(extracted),
        "items": extracted,
    }
    _emit_extract(payload, output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option("--strip-frontmatter", is_flag=True, help="Remove YAML frontmatter.")
@click.option(
    "--set", "set_values",
    multiple=True,
    metavar="KEY=VALUE",
    help="Set a frontmatter value. Dot paths create nested mappings.",
)
@click.option(
    "--heading-delta",
    type=int, default=0, show_default=True,
    help="Shift ATX heading levels, clamped to 1..6.",
)
@click.option("--extract", "extract_selector", help="Replace content with selector output.")
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write transformed Markdown to a file.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown", show_default=True,
)
def transform(
    file: Path,
    strip_frontmatter: bool,
    set_values: tuple[str, ...],
    heading_delta: int,
    extract_selector: str | None,
    output: Path | None,
    output_format: str,
) -> None:
    """Apply deterministic transforms to a Markdown file."""
    # set_values holds the raw KEY=VALUE strings from repeated --set options;
    # _parse_key_value_options turns them into the frontmatter update mapping.
    #
    # Option parsing, the file read and the transform itself all sit inside the
    # try block: a ValueError from any of them (or an InvalidQueryError) is
    # reported as a ClickException instead of a traceback.
    try:
        updates = _parse_key_value_options(set_values)
        markdown_text = file.read_text(encoding="utf-8")
        result = transform_markdown(
            markdown_text,
            strip_frontmatter=strip_frontmatter,
            set_frontmatter=updates,
            heading_delta=heading_delta,
            extract_selector=extract_selector,
            source_path=str(file),
        )
    except (InvalidQueryError, ValueError) as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_markdown_result(result.to_dict(), output_format, output)
@main.command()
@click.argument(
    "files",
    nargs=-1, required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option("--title", help="Add a top-level title before composed files.")
@click.option(
    "--heading-delta",
    type=int, default=0, show_default=True,
    help="Shift heading levels in each input before composing.",
)
@click.option(
    "--include-frontmatter",
    is_flag=True,
    help="Keep each input file's frontmatter in the composed body.",
)
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write composed Markdown to a file.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown", show_default=True,
)
def compose(
    files: tuple[Path, ...],
    title: str | None,
    heading_delta: int,
    include_frontmatter: bool,
    output: Path | None,
    output_format: str,
) -> None:
    """Compose multiple Markdown files into one document."""
    # click delivers the variadic FILES argument as a tuple (at least one entry
    # because required=True); compose_files is given a list copy of it.
    # No exception translation happens here.
    input_files = list(files)
    composed = compose_files(
        input_files,
        title=title,
        heading_delta=heading_delta,
        include_frontmatter=include_frontmatter,
    )
    _emit_markdown_result(composed.to_dict(), output_format, output)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--base-dir",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    help="Directory includes must stay within. Defaults to the input file directory.",
)
@click.option(
    "--max-depth",
    type=int, default=10, show_default=True,
    help="Maximum recursive include depth.",
)
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write resolved Markdown to a file.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown", show_default=True,
)
def include(
    file: Path,
    base_dir: Path | None,
    max_depth: int,
    output: Path | None,
    output_format: str,
) -> None:
    """Resolve Markdown include markers in a document."""
    # When --base-dir is not given, the directory containing FILE is used.
    # An IncludeError from resolve_includes becomes a ClickException.
    try:
        text = file.read_text(encoding="utf-8")
        # A Path is always truthy, so this matches the `base_dir or file.parent` idiom.
        effective_base = base_dir if base_dir is not None else file.parent
        resolved = resolve_includes(
            text,
            base_dir=effective_base,
            current_path=file,
            max_depth=max_depth,
        )
    except IncludeError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_markdown_result(resolved.to_dict(), output_format, output)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output-dir",
    required=True,
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory to write exploded Markdown files and manifest into.",
)
@click.option(
    "--variant",
    type=click.Choice(["flat", "hierarchical"], case_sensitive=False),
    default="flat", show_default=True,
)
@click.option("--force", is_flag=True, help="Allow writing into a non-empty output directory.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def explode(
    file: Path,
    output_dir: Path,
    variant: str,
    force: bool,
    output_format: str,
) -> None:
    """Explode a Markdown file into reversible section files."""
    # The --force flag is forwarded to explode_markdown_file as `overwrite`.
    # An ExplodeError is surfaced as a ClickException with the same message.
    try:
        exploded = explode_markdown_file(
            file,
            output_dir,
            variant=variant,
            overwrite=force,
        )
    except ExplodeError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_explode_result(exploded.to_dict(), output_format)
@main.command()
@click.argument("directory", type=click.Path(exists=True, file_okay=False, path_type=Path))
@click.option(
    "--manifest", "manifest_path",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Manifest path. Defaults to markitect-explode.yaml in the input directory.",
)
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write imploded Markdown to a file.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown", show_default=True,
)
def implode(
    directory: Path,
    manifest_path: Path | None,
    output: Path | None,
    output_format: str,
) -> None:
    """Implode a Markdown directory created by `mkt explode`."""
    # manifest_path is passed through unchanged (None when --manifest is
    # omitted). An ExplodeError is surfaced as a ClickException.
    try:
        imploded = implode_markdown_directory(directory, manifest_path=manifest_path)
    except ExplodeError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_markdown_result(imploded.to_dict(), output_format, output)
# Sub-group exposed on the command line as `mkt ref`. The Python name is
# `ref_group`; the CLI name comes from the decorator argument. No logic here.
@main.group("ref")
def ref_group() -> None:
    """Resolve namespaced Markdown content references."""
@ref_group.command("resolve")
@click.argument("context_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("reference")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root that relative paths and namespaces must stay within.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def ref_resolve(context_file: Path, reference: str, root: Path, output_format: str) -> None:
    """Resolve a content reference using a Markdown document as context."""
    # The context document is parsed first; the ReferenceContext is derived
    # from it together with the root and the document's own path.
    # Only resolve_reference is guarded: a ReferenceResolutionError raised there
    # becomes a ClickException.
    parsed_context = parse_markdown_file(context_file)
    reference_context = ReferenceContext.from_document(
        parsed_context,
        root=root,
        current_path=context_file,
    )

    try:
        resolved = resolve_reference(reference, context=reference_context)
    except ReferenceResolutionError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_reference_result(resolved.to_dict(), output_format)
@main.command("process")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative processor references.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def process(file: Path, root: Path, output_format: str) -> None:
    """Run deterministic fenced-block processors in a Markdown file."""
    # The file is parsed to obtain its frontmatter, from which load_namespaces
    # builds the namespaces for the processor context. The processors are then
    # given the raw file text, not the parsed document.
    parsed = parse_markdown_file(file)
    namespaces = load_namespaces(parsed.frontmatter)
    processor_context = ProcessorContext(
        root=root,
        current_path=file,
        namespaces=namespaces,
    )

    raw_text = file.read_text(encoding="utf-8")
    outcome = run_fenced_processors(raw_text, context=processor_context, source_path=file)
    _emit_processor_run(outcome.to_dict(), output_format)

    # The command always terminates through click's Exit: status 0 when the run
    # reports valid, 1 otherwise.
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
# Sub-group `mkt backend`. It carries no logic of its own; subcommands register
# on it with `@backend.command(...)`.
@main.group()
def backend() -> None:
    """Inspect optional backend manifests and snapshot identities."""
@backend.command("list")
@click.option(
    "--path", "paths",
    multiple=True,
    type=click.Path(path_type=Path),
    help="Backend manifest file or directory. Defaults to .markitect/backends and .markitect/backend.yaml.",
)
@click.option("--capability", help="Only show backends that declare this capability.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def backend_list(paths: tuple[Path, ...], capability: str | None, output_format: str) -> None:
    """List registered optional backend manifests."""
    # With no --path options the registry loader receives None rather than an
    # empty list.
    manifest_paths = list(paths) if paths else None
    try:
        registry = load_backend_registry(manifest_paths)
    except BackendRegistryError as exc:
        raise click.ClickException(str(exc)) from exc

    if capability:
        # Normalise the user's spelling before lookup: dashes become
        # underscores and the name is lower-cased.
        wanted = capability.replace("-", "_").lower()
        manifests = registry.find_by_capability(wanted)
    else:
        manifests = registry.list()

    payload = {
        "count": len(manifests),
        "backends": [entry.to_dict() for entry in manifests],
    }
    _emit_backend_list(payload, output_format)
@backend.command("inspect")
@click.argument("backend_id")
@click.option(
    "--path", "paths",
    multiple=True,
    type=click.Path(path_type=Path),
    help="Backend manifest file or directory. Defaults to .markitect/backends and .markitect/backend.yaml.",
)
@click.option(
    "--require", "required_capabilities",
    multiple=True,
    help="Required capability to check. May be repeated.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def backend_inspect(
    backend_id: str,
    paths: tuple[Path, ...],
    required_capabilities: tuple[str, ...],
    output_format: str,
) -> None:
    """Inspect one backend manifest and optional compatibility check."""
    # Loading the registry and looking up the backend share one handler: a
    # BackendRegistryError from either step becomes a ClickException.
    manifest_paths = list(paths) if paths else None
    try:
        registry = load_backend_registry(manifest_paths)
        manifest = registry.get(backend_id)
    except BackendRegistryError as exc:
        raise click.ClickException(str(exc)) from exc

    details = manifest.to_dict()
    if required_capabilities:
        # NOTE(review): --require values are forwarded verbatim, whereas
        # `backend list --capability` normalises dashes and case first.
        # Confirm manifest.check() handles both spellings.
        capability_report = manifest.check(list(required_capabilities))
        details["capability_check"] = capability_report.to_dict()
    _emit_backend_manifest(details, output_format)
@backend.command("snapshot-id")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--parse-option", "parse_options",
    multiple=True,
    metavar="KEY=VALUE",
    help="Parse option included in the snapshot identity hash.",
)
@click.option("--contract-hash", help="Optional contract hash included in the snapshot identity.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def backend_snapshot_id(
    file: Path,
    parse_options: tuple[str, ...],
    contract_hash: str | None,
    output_format: str,
) -> None:
    """Compute a read-only content-addressed snapshot identity for a file."""
    # parse_options holds raw KEY=VALUE strings; they are converted inside the
    # try block so that a ValueError from the conversion or from the identity
    # computation is reported as a ClickException.
    try:
        option_map = _parse_key_value_options(parse_options)
        identity = snapshot_identity_for_file(
            file,
            parse_options=option_map,
            contract_hash=contract_hash,
        )
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc

    # The emitted mapping is the identity's dict form with "snapshot_id" set
    # from the attribute of the same name.
    payload = identity.to_dict() | {"snapshot_id": identity.snapshot_id}
    _emit_snapshot_identity(payload, output_format)
@backend.command("refresh-plan")
@click.argument("paths", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative source paths.",
)
@click.option(
    "--state", "state_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="YAML/JSON snapshot state file from a previous backend run.",
)
@click.option("--no-recursive", is_flag=True, help="Do not recurse into directories.")
@click.option(
    "--verify-hashes",
    is_flag=True,
    help="Hash metadata-changed files to avoid unnecessary parse/index work.",
)
@click.option(
    "--parse-option", "parse_options",
    multiple=True,
    metavar="KEY=VALUE",
    help="Parse option included in the identity comparison.",
)
@click.option("--contract-hash", help="Optional contract hash included in identity comparison.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def backend_refresh_plan(
    paths: tuple[Path, ...],
    root: Path,
    state_file: Path | None,
    no_recursive: bool,
    verify_hashes: bool,
    parse_options: tuple[str, ...],
    contract_hash: str | None,
    output_format: str,
) -> None:
    """Plan cheap-first snapshot and index refresh work."""
    # Without --state the plan is computed against an empty previous state.
    # ValueError and TypeError from state loading, option parsing or planning
    # are reported as ClickException.
    try:
        if state_file:
            previous_state = load_snapshot_state_file(state_file)
        else:
            previous_state = []
        plan = plan_snapshot_refresh(
            list(paths),
            previous=previous_state,
            root=root,
            recursive=not no_recursive,  # CLI flag is the negation of the API keyword
            parse_options=_parse_key_value_options(parse_options),
            contract_hash=contract_hash,
            verify_hashes=verify_hashes,
        )
    except (ValueError, TypeError) as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_refresh_plan(plan.to_dict(), output_format)

    # Exit status doubles as a dirtiness signal: 1 when plan.dirty is truthy,
    # 0 otherwise.
    exit_code = 1 if plan.dirty else 0
    raise click.exceptions.Exit(exit_code)
# Sub-group `mkt policy`. It carries no logic of its own; subcommands register
# on it with `@policy.command(...)`.
@main.group()
def policy() -> None:
    """Check local access policy decisions."""
@policy.command("check")
@click.argument("subject")
@click.argument("action")
@click.argument("object_id")
@click.option(
    "--policy", "policy_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Local label policy file.",
)
@click.option("--label", "labels", multiple=True, help="Object policy label. May be repeated.")
@click.option("--path", "object_path", help="Object path for path ACL and path-label rules.")
@click.option("--trust-zone", help="Object trust zone.")
@click.option(
    "--policy-mode",
    type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
    help="Override policy mode for this check.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def policy_check(
    subject: str,
    action: str,
    object_id: str,
    policy_file: Path | None,
    labels: tuple[str, ...],
    object_path: str | None,
    trust_zone: str | None,
    policy_mode: str | None,
    output_format: str,
) -> None:
    """Authorize one subject/action/object tuple with local label policy."""
    # Object attributes supplied on the command line, in the shape passed to
    # gateway.authorize() as its context.
    object_context = {
        "object": {
            "labels": list(labels),
            "path": object_path,
            "trust_zone": trust_zone,
        }
    }

    try:
        # Fall back to a default-constructed gateway when the loader returns a
        # falsy value.
        gateway = _load_policy_gateway(policy_file, policy_mode)
        if not gateway:
            gateway = LocalLabelPolicyGateway()
        decision = gateway.authorize(subject, action, object_id, context=object_context)
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_policy_result({"decision": decision}, output_format)

    # Exit status: 0 when the decision's "allowed" entry is truthy, 1 otherwise.
    exit_code = 0 if decision.get("allowed") else 1
    raise click.exceptions.Exit(exit_code)
@policy.command("subject")
@click.argument("claims_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--policy-map", "policy_map_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Enterprise policy map file.",
)
@click.option("--group", "groups", multiple=True, help="Additional resolved group. May be repeated.")
@click.option(
    "--environment",
    type=click.Choice(["development", "test", "production"], case_sensitive=False),
    help="Validation environment for issuer safety checks.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def policy_subject(
    claims_file: Path,
    policy_map_file: Path,
    groups: tuple[str, ...],
    environment: str | None,
    output_format: str,
) -> None:
    """Map enterprise identity claims into a Markitect policy subject."""
    # Repeated --group values are forwarded as `extra_groups`.
    # An EnterprisePolicyError is surfaced as a ClickException.
    additional_groups = list(groups)
    try:
        mapped = load_enterprise_policy_subject(
            claims_file,
            policy_map_file,
            extra_groups=additional_groups,
            environment=environment,
        )
    except EnterprisePolicyError as exc:
        raise click.ClickException(str(exc)) from exc

    payload = {"subject": mapped.to_dict()}
    _emit_subject_result(payload, output_format)
@policy.command("resource-manifest")
@click.argument("manifest_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def policy_resource_manifest(manifest_file: Path, output_format: str) -> None:
    """Inspect a Markitect flex-auth resource registration manifest."""
    # Load the manifest through its from_file constructor. An
    # EnterprisePolicyError raised while loading becomes a ClickException.
    try:
        loaded = FlexAuthResourceManifest.from_file(manifest_file)
    except EnterprisePolicyError as exc:
        raise click.ClickException(str(exc)) from exc

    payload = {"manifest": loaded.to_dict()}
    _emit_resource_manifest_result(payload, output_format)
# Sub-group exposed on the command line as `mkt class`. The function is named
# class_group because `class` is a reserved word in Python. No logic here.
@main.group("class")
def class_group() -> None:
    """Resolve deterministic content classes."""
@class_group.command("resolve")
@click.argument("class_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.argument("class_name")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def class_resolve(class_file: Path, class_name: str, output_format: str) -> None:
    """Resolve content class inheritance and merged slots."""
    # Loading the class file and composing the class share one handler: a
    # ContentClassResolutionError from either step becomes a ClickException.
    try:
        class_registry = load_content_class_file(class_file)
        composed = class_registry.compose(class_name)
    except ContentClassResolutionError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_content_class_result(composed.to_dict(), output_format)

    # Exit status: 0 when the composed result reports valid, 1 otherwise.
    exit_code = 0 if composed.valid else 1
    raise click.exceptions.Exit(exit_code)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output-dir",
    type=click.Path(file_okay=False, path_type=Path),
    help="Write tangled files under this directory. Omit for dry JSON/YAML/text output.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def tangle(file: Path, output_dir: Path | None, output_format: str) -> None:
    """Tangle named Markdown code chunks into target files."""
    source_text = file.read_text(encoding="utf-8")
    tangled = tangle_markdown(source_text, source_path=file)
    report = tangled.to_dict()

    # Files are written only when an output directory was requested AND the
    # tangle result is valid; in that case the return value of
    # write_tangle_files is reported under "written_files".
    if output_dir:
        if tangled.valid:
            report["written_files"] = write_tangle_files(tangled, output_dir)

    _emit_tangle_result(report, output_format)

    # Exit status: 0 when the tangle result reports valid, 1 otherwise.
    exit_code = 0 if tangled.valid else 1
    raise click.exceptions.Exit(exit_code)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write woven Markdown to a file.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown", show_default=True,
)
def weave(file: Path, output: Path | None, output_format: str) -> None:
    """Weave Markdown documentation with a deterministic chunk index."""
    # Read the source as UTF-8, weave it, and hand the dict form to
    # _emit_markdown_result with the format and the optional output path.
    source_text = file.read_text(encoding="utf-8")
    woven = weave_markdown(source_text, source_path=file)
    _emit_markdown_result(woven.to_dict(), output_format, output)
# Sub-group `mkt cache`. It carries no logic of its own; subcommands register
# on it with `@cache.command(...)`.
@main.group()
def cache() -> None:
    """Fingerprint Markdown files and detect changed inputs."""
@cache.command("init")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for the default local index path.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def cache_init(root: Path, index_path: Path | None, output_format: str) -> None:
    """Initialize the local SQLite snapshot/index store."""
    # local_index_path_for() picks the effective index location from the root
    # and the optional explicit --index-path.
    effective_index = local_index_path_for(root, index_path)
    snapshot_store = LocalSnapshotStore(effective_index)
    snapshot_store.initialize()

    # "schema_version" is reported as the literal "1"; "sources" is the number
    # of entries returned by the store's load_state().
    summary = {
        "index_path": str(effective_index),
        "schema_version": "1",
        "sources": len(snapshot_store.load_state()),
    }
    _emit_local_index_data(summary, output_format)
@cache.command("fingerprint")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative cache paths.",
)
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="json", show_default=True,
)
def cache_fingerprint(file: Path, root: Path, output_format: str) -> None:
    """Fingerprint one Markdown file."""
    # Compute a single fingerprint entry for FILE (with --root forwarded) and
    # emit its dict form. Unlike most sibling cache commands, the default
    # output format here is JSON.
    fingerprint = fingerprint_file(file, root=root)
    _emit_cache_data(fingerprint.to_dict(), output_format)
@cache.command("build")
@click.argument("paths", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative cache paths.",
)
@click.option(
    "--cache-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Cache manifest path. Defaults to .markitect/cache/manifest.json under root.",
)
@click.option("--no-recursive", is_flag=True, help="Do not recurse into directories.")
@click.option("--dry-run", is_flag=True, help="Report manifest without writing it.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def cache_build(
    paths: tuple[Path, ...],
    root: Path,
    cache_path: Path | None,
    no_recursive: bool,
    dry_run: bool,
    output_format: str,
) -> None:
    """Build or refresh a lightweight Markdown cache manifest."""
    recurse = not no_recursive  # CLI flag is the negation of the API keyword
    built = build_cache(list(paths), root=root, recursive=recurse)
    target = cache_path_for(root, cache_path)

    # The manifest is persisted unless --dry-run was given; the report below is
    # emitted either way and records which of the two happened.
    persisted = not dry_run
    if persisted:
        save_cache(built, target)

    report = built.to_dict() | {
        "cache_path": str(target),
        "written": persisted,
        "count": len(built.entries),
    }
    _emit_cache_data(report, output_format)
@cache.command("status")
@click.argument("paths", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative cache paths.",
)
@click.option(
    "--cache-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Cache manifest path. Defaults to .markitect/cache/manifest.json under root.",
)
@click.option("--no-recursive", is_flag=True, help="Do not recurse into directories.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def cache_status(
    paths: tuple[Path, ...],
    root: Path,
    cache_path: Path | None,
    no_recursive: bool,
    output_format: str,
) -> None:
    """Report changed, new, unchanged, and deleted Markdown files."""
    # Load the stored manifest from the effective cache path, then compare it
    # with the given paths via detect_changes.
    target = cache_path_for(root, cache_path)
    stored = load_cache(target)
    changes = detect_changes(
        stored,
        list(paths),
        root=root,
        recursive=not no_recursive,  # CLI flag is the negation of the API keyword
    )

    report = changes.to_dict() | {"cache_path": str(target)}
    _emit_cache_data(report, output_format)

    # Exit status doubles as a dirtiness signal: 1 when status.dirty is truthy,
    # 0 otherwise.
    exit_code = 1 if changes.dirty else 0
    raise click.exceptions.Exit(exit_code)
@cache.command("index")
@click.argument("paths", nargs=-1, required=True, type=click.Path(exists=True, path_type=Path))
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."), show_default=True,
    help="Root used for relative index paths.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option("--no-recursive", is_flag=True, help="Do not recurse into directories.")
@click.option(
    "--no-verify-hashes",
    is_flag=True,
    help="Do not hash metadata-changed files before parsing.",
)
@click.option(
    "--parse-option", "parse_options",
    multiple=True,
    metavar="KEY=VALUE",
    help="Parse option included in the snapshot identity hash.",
)
@click.option("--contract-hash", help="Optional contract hash included in snapshot identity.")
@click.option(
    "--format", "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text", show_default=True,
)
def cache_index(
    paths: tuple[Path, ...],
    root: Path,
    index_path: Path | None,
    no_recursive: bool,
    no_verify_hashes: bool,
    parse_options: tuple[str, ...],
    contract_hash: str | None,
    output_format: str,
) -> None:
    """Build or refresh the local SQLite snapshot/index store."""
    # Store construction, option parsing and the build all sit inside the try
    # block, so a ValueError from any of them is reported as a ClickException.
    # Both --no-* flags are negated before being passed on as positive keywords.
    try:
        effective_index = local_index_path_for(root, index_path)
        snapshot_store = LocalSnapshotStore(effective_index)
        build_result = snapshot_store.build(
            list(paths),
            root=root,
            recursive=not no_recursive,
            parse_options=_parse_key_value_options(parse_options),
            contract_hash=contract_hash,
            verify_hashes=not no_verify_hashes,
        )
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc

    _emit_local_index_data(build_result.to_dict(), output_format)
@cache.command("query")
@click.argument("selector")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."),
    show_default=True,
    help="Root used for the default local index path.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option(
    "--path",
    "paths",
    multiple=True,
    help="Restrict query to one or more indexed relative paths.",
)
@click.option(
    "--policy",
    "policy_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Local label policy file used to filter results.",
)
@click.option("--subject", default="anonymous", help="Policy subject id.")
@click.option(
    "--policy-mode",
    type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
    help="Override policy mode for this query.",
)
@click.option(
    "--engine",
    type=click.Choice(["selector", "jsonpath"], case_sensitive=False),
    default="selector",
    show_default=True,
    help="Query engine to use.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="json",
    show_default=True,
)
def cache_query(
    selector: str,
    root: Path,
    index_path: Path | None,
    paths: tuple[str, ...],
    policy_file: Path | None,
    subject: str,
    policy_mode: str | None,
    engine: str,
    output_format: str,
) -> None:
    """Run a selector or JSONPath query over indexed document snapshots."""
    # The docstring above is the `--help` text, so implementation notes are
    # kept in comments.
    #
    # Flow: open the local index, pick the paths to query (explicit --path
    # values, else every path in the stored state), run the chosen engine on
    # each stored document, tag each match with its source path, optionally
    # filter through the policy gateway, then emit.
    run_query = query_document_jsonpath if engine == "jsonpath" else query_document
    all_matches = []
    try:
        # Store construction and state loading sit inside the try block so a
        # ValueError from the store becomes a one-line CLI error, the same
        # way `search` and `cache index` report it.
        resolved_index_path = local_index_path_for(root, index_path)
        store = LocalSnapshotStore(resolved_index_path)
        policy_gateway = _load_policy_gateway(policy_file, policy_mode)
        indexed_paths = sorted(paths or [state.path for state in store.load_state()])
        for indexed_path in indexed_paths:
            document = Document.from_dict(store.get_document(indexed_path))
            # Policy metadata is only looked up when a gateway will use it.
            policy_metadata = store.policy_metadata(indexed_path) if policy_gateway else {}
            for match in run_query(document, selector):
                item = match.to_dict()
                item["source_path"] = indexed_path
                if policy_metadata:
                    item["policy"] = policy_metadata
                all_matches.append(item)
    except (KeyError, ValueError, InvalidQueryError) as exc:
        raise click.ClickException(str(exc)) from exc
    policy_result = None
    if policy_gateway:
        policy_result = policy_gateway.filter_results(subject, "query", all_matches)
        # From here on only the gateway's filtered view is reported.
        all_matches = policy_result["results"]
    data = {
        "selector": selector,
        "engine": engine,
        "index_path": str(resolved_index_path),
        "count": len(all_matches),
        "matches": all_matches,
    }
    if policy_result:
        data["policy"] = policy_result.get("policy")
        data["policy_decisions"] = policy_result.get("decisions")
        data["diagnostics"] = policy_result.get("diagnostics")
    _emit_query(data, output_format)
@main.command()
@click.argument("text")
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, path_type=Path),
    default=Path("."),
    show_default=True,
    help="Root used for the default local index path.",
)
@click.option(
    "--index-path",
    type=click.Path(dir_okay=False, path_type=Path),
    help="SQLite index path. Defaults to .markitect/cache/index.sqlite3 under root.",
)
@click.option("--limit", type=int, default=20, show_default=True)
@click.option(
    "--policy",
    "policy_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Local label policy file used to filter results.",
)
@click.option("--subject", default="anonymous", help="Policy subject id.")
@click.option(
    "--policy-mode",
    type=click.Choice(["off", "audit", "enforce"], case_sensitive=False),
    help="Override policy mode for this search.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def search(
    text: str,
    root: Path,
    index_path: Path | None,
    limit: int,
    policy_file: Path | None,
    subject: str,
    policy_mode: str | None,
    output_format: str,
) -> None:
    """Search the local SQLite index with FTS5."""
    # With a policy gateway the store performs a policy-aware search and its
    # result also carries policy details; without one, the plain search
    # results are converted to dicts. ValueError from either path becomes a
    # one-line CLI error.
    policy_result = None
    try:
        resolved_index_path = local_index_path_for(root, index_path)
        store = LocalSnapshotStore(resolved_index_path)
        gateway = _load_policy_gateway(policy_file, policy_mode)
        if not gateway:
            matches = [hit.to_dict() for hit in store.search(text, limit=limit)]
        else:
            policy_result = store.search_with_policy(
                text,
                subject=subject,
                gateway=gateway,
                limit=limit,
            )
            matches = policy_result["results"]
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc
    payload = {
        "query": text,
        "index_path": str(resolved_index_path),
        "count": len(matches),
        "matches": matches,
    }
    if policy_result:
        # Keyword order fixes the key order of the emitted mapping.
        payload.update(
            policy=policy_result.get("policy"),
            policy_decisions=policy_result.get("decisions"),
            diagnostics=policy_result.get("diagnostics"),
        )
    _emit_search_results(payload, output_format)
# Click group with no behavior of its own: subcommands attach to it through
# `@workflow.command(...)`. The docstring is what `--help` displays.
@main.group()
def workflow() -> None:
    """Inspect, plan, and run declarative Markdown workflows."""
@workflow.command("inspect")
@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def workflow_inspect(workflow_file: Path, output_format: str) -> None:
    """Inspect a workflow definition without executing steps."""
    # WorkflowError from loading or inspecting is reported as a one-line CLI
    # error. Otherwise the inspection report, extended with the workflow
    # definition under "workflow", is printed and validity sets the exit code.
    try:
        plan = load_workflow_file(workflow_file)
        inspection = WorkflowRunner(plan).inspect()
    except WorkflowError as exc:
        raise click.ClickException(str(exc)) from exc
    report = {**inspection.to_dict(), "workflow": plan.to_dict()}
    _emit_workflow_result(report, output_format)
    exit_code = 0 if inspection.valid else 1
    raise click.exceptions.Exit(exit_code)
@workflow.command("plan")
@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output-dir",
    type=click.Path(file_okay=False, path_type=Path),
    help="Output root for path-safety checks. No files are written in plan mode.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def workflow_plan(workflow_file: Path, output_dir: Path | None, output_format: str) -> None:
    """Run a workflow in dry-run mode and report planned outputs."""
    # Same path as `workflow run`, except dry_run is always True here.
    try:
        plan = load_workflow_file(workflow_file)
        runner = WorkflowRunner(plan, output_dir=output_dir)
        outcome = runner.run(dry_run=True)
    except WorkflowError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_workflow_result(outcome.to_dict(), output_format)
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
@workflow.command("run")
@click.argument("workflow_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output-dir",
    type=click.Path(file_okay=False, path_type=Path),
    help="Output root for workflow outputs. Defaults to the workflow directory.",
)
@click.option("--dry-run", is_flag=True, help="Execute without writing output files.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def workflow_run(
    workflow_file: Path,
    output_dir: Path | None,
    dry_run: bool,
    output_format: str,
) -> None:
    """Run a deterministic Markdown workflow."""
    # WorkflowError is turned into a one-line CLI error; otherwise the run
    # report is printed and its validity becomes the exit code (0 or 1).
    try:
        plan = load_workflow_file(workflow_file)
        runner = WorkflowRunner(plan, output_dir=output_dir)
        outcome = runner.run(dry_run=dry_run)
    except WorkflowError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_workflow_result(outcome.to_dict(), output_format)
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
# Click group with no behavior of its own: subcommands attach to it through
# `@template.command(...)`. The docstring is what `--help` displays.
@main.group()
def template() -> None:
    """Render and inspect deterministic Markdown templates."""
@template.command("inspect")
@click.argument("template_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def template_inspect(template_file: Path, output_format: str) -> None:
    """Inspect variables required by a template."""
    # The template is read as UTF-8, analyzed, and the analysis is reported
    # together with the template path; the "valid" entry sets the exit code.
    template_text = template_file.read_text(encoding="utf-8")
    analysis = analyze_template(template_text).to_dict()
    data = {**analysis, "template_path": str(template_file)}
    _emit_template_analysis(data, output_format)
    exit_code = 0 if data["valid"] else 1
    raise click.exceptions.Exit(exit_code)
@template.command("render")
@click.argument("template_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--data",
    "data_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="JSON, YAML, or CSV data file. CSV must contain one record for render.",
)
@click.option(
    "--set",
    "set_values",
    multiple=True,
    metavar="KEY=VALUE",
    help="Set a template data value. Dot paths create nested mappings.",
)
@click.option("--lenient", is_flag=True, help="Keep unresolved placeholders instead of failing.")
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write rendered Markdown to a file.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown",
    show_default=True,
)
def template_render(
    template_file: Path,
    data_file: Path | None,
    set_values: tuple[str, ...],
    lenient: bool,
    output: Path | None,
    output_format: str,
) -> None:
    """Render a Markdown template with structured data."""
    # Data comes from the optional --data file, then --set values are merged
    # over it. --lenient switches strict rendering off.
    try:
        file_data = _load_template_data(data_file)
        overrides = _parse_key_value_options(set_values)
        merged_data = _deep_merge_cli(file_data, overrides)
        template_text = template_file.read_text(encoding="utf-8")
        rendered = render_template(template_text, merged_data, strict=not lenient)
    except (MissingTemplateVariable, TemplateError, ValueError, TypeError) as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_markdown_result(rendered.to_dict(), output_format, output)
# Click group with no behavior of its own: subcommands attach to it through
# `@generate.command(...)`. The docstring is what `--help` displays.
@main.group()
def generate() -> None:
    """Generate Markdown from contracts, rules, or external hooks."""
@generate.command("stub")
@click.option(
    "--contract",
    "contract_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Markdown document contract to generate from.",
)
@click.option(
    "--data",
    "data_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="Optional JSON/YAML data for frontmatter values.",
)
@click.option(
    "--set",
    "set_values",
    multiple=True,
    metavar="KEY=VALUE",
    help="Set generation data. Dot paths create nested mappings.",
)
@click.option("--include-optional", is_flag=True, help="Include optional contract sections.")
@click.option(
    "--output",
    type=click.Path(dir_okay=False, path_type=Path),
    help="Write generated Markdown to a file.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["markdown", "json", "yaml"], case_sensitive=False),
    default="markdown",
    show_default=True,
)
def generate_stub(
    contract_file: Path,
    data_file: Path | None,
    set_values: tuple[str, ...],
    include_optional: bool,
    output: Path | None,
    output_format: str,
) -> None:
    """Generate a Markdown stub from a document contract."""
    # Data comes from the optional --data file with --set values merged over
    # it; the contract is loaded only after the data has been assembled.
    try:
        file_data = _load_template_data(data_file)
        overrides = _parse_key_value_options(set_values)
        merged_data = _deep_merge_cli(file_data, overrides)
        contract_definition = load_contract_file(contract_file)
        stub = generate_stub_from_contract(
            contract_definition,
            data=merged_data,
            include_optional=include_optional,
        )
    except (ContractLoaderError, ValueError, TypeError) as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_markdown_result(stub.to_dict(), output_format, output)
@generate.command("rules")
@click.argument("rules_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--output-dir",
    type=click.Path(file_okay=False, path_type=Path),
    help="Directory used for relative output paths in the plan.",
)
@click.option("--dry-run", is_flag=True, help="Render without writing output files.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml"], case_sensitive=False),
    default="json",
    show_default=True,
)
def generate_rules(
    rules_file: Path,
    output_dir: Path | None,
    dry_run: bool,
    output_format: str,
) -> None:
    """Run a Markdown/YAML generation plan."""
    # The directory containing the rules file is passed as the plan's base
    # directory. Plan and template errors become one-line CLI errors.
    run_options = {
        "base_dir": rules_file.parent,
        "output_dir": output_dir,
        "dry_run": dry_run,
    }
    try:
        plan = load_generation_plan_file(rules_file)
        outcome = run_generation_plan(plan, **run_options)
    except (GenerationPlanError, TemplateError, MissingTemplateVariable) as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_jsonish(outcome.to_dict(), output_format)
@main.command()
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--schema",
    "schema_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def validate(file: Path, schema_file: Path, output_format: str) -> None:
    """Validate a Markdown file against a Markdown schema file."""
    # Print the validation report, then exit 0 when valid and 1 otherwise.
    outcome = validate_markdown_file(file, schema_file)
    _emit_result(outcome.to_dict(), output_format)
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
# Click group with no behavior of its own: subcommands attach to it through
# `@schema.command(...)`. The docstring is what `--help` displays.
@main.group()
def schema() -> None:
    """Work with Markdown schema files."""
@schema.command("validate")
@click.argument("schema_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def schema_validate(schema_file: Path, output_format: str) -> None:
    """Validate that a Markdown schema contains a well-formed JSON Schema."""
    # Only the `schema` attribute of the loaded file is validated; the report
    # gains a "schema_path" entry before it is printed.
    schema_definition = load_schema_file(schema_file).schema
    outcome = validate_schema(schema_definition)
    report = {**outcome.to_dict(), "schema_path": str(schema_file)}
    _emit_result(report, output_format)
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
# Click group with no behavior of its own: subcommands attach to it through
# `@contract.command(...)`. The docstring is what `--help` displays.
@main.group()
def contract() -> None:
    """Work with Markdown document contracts."""
@contract.command("validate")
@click.argument("contract_file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def contract_validate(contract_file: Path, output_format: str) -> None:
    """Validate that a Markdown contract file is well formed."""
    # A contract that cannot be loaded is reported as a one-line CLI error,
    # as `contract check`, `contract form-state` and `generate stub` already
    # do, instead of letting ContractLoaderError escape as a traceback.
    try:
        result = validate_contract(load_contract_file(contract_file))
    except ContractLoaderError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_diagnostic_result(result.to_dict(), output_format)
    # Exit status mirrors validity: 0 when valid, 1 otherwise.
    raise click.exceptions.Exit(0 if result.valid else 1)
@contract.command("check")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--contract",
    "contract_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
@click.option(
    "--context",
    "context_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="YAML or JSON runtime context used for field prefill and dynamic rules.",
)
def contract_check(
    file: Path,
    contract_file: Path,
    output_format: str,
    context_file: Path | None,
) -> None:
    """Check a Markdown file against a Markdown document contract."""
    # ContractLoaderError becomes a one-line CLI error; otherwise the
    # diagnostics are printed and validity decides the exit code.
    try:
        outcome = check_markdown_file(file, contract_file, context_path=context_file)
    except ContractLoaderError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_diagnostic_result(outcome.to_dict(), output_format)
    exit_code = 0 if outcome.valid else 1
    raise click.exceptions.Exit(exit_code)
@contract.command("form-state")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--contract",
    "contract_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
)
@click.option(
    "--context",
    "context_file",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help="YAML or JSON runtime context used for field prefill and dynamic rules.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def contract_form_state(
    file: Path,
    contract_file: Path,
    context_file: Path | None,
    output_format: str,
) -> None:
    """Evaluate UI-neutral form state for a document contract."""
    # The runtime context is optional: without --context, None is passed to
    # the evaluator. ContractLoaderError becomes a one-line CLI error.
    try:
        document = parse_markdown_file(file)
        contract_definition = load_contract_file(contract_file)
        context = None
        if context_file:
            context = load_runtime_context_file(context_file)
        state = evaluate_form_state(document, contract_definition, context)
    except ContractLoaderError as exc:
        raise click.ClickException(str(exc)) from exc
    _emit_form_state(state.to_dict(), output_format)
    exit_code = 0 if state.valid else 1
    raise click.exceptions.Exit(exit_code)
def _load_policy_gateway(
    policy_file: Path | None,
    policy_mode: str | None,
) -> LocalLabelPolicyGateway | None:
    """Build the policy gateway requested on the command line, if any.

    Returns ``None`` when neither a policy file nor a policy mode was given.
    With a policy file the gateway is loaded from it (the mode is passed as
    an override); with only a mode a gateway is constructed from the mode
    alone.

    Raises:
        click.ClickException: if gateway construction raises ``ValueError``.
    """
    nothing_requested = policy_file is None and policy_mode is None
    if nothing_requested:
        return None
    try:
        gateway = (
            LocalLabelPolicyGateway.from_file(policy_file, mode=policy_mode)
            if policy_file
            else LocalLabelPolicyGateway(mode=policy_mode)
        )
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc
    return gateway
def _emit_result(data: dict, output_format: str) -> None:
    """Print a validation result as JSON, YAML, or a short text summary.

    Text mode prints ``valid`` for a truthy ``data["valid"]``; otherwise it
    prints ``invalid`` followed by one ``- <path>: <message>`` line per entry
    of ``data["violations"]``.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    if data.get("valid"):
        click.echo("valid")
        return
    click.echo("invalid")
    for item in data.get("violations", []):
        click.echo(f"- {item['path']}: {item['message']}")
def _emit_diagnostic_result(data: dict, output_format: str) -> None:
    """Print a diagnostics-based result as JSON, YAML, or text.

    Text mode prints ``valid``/``invalid`` and then, for each diagnostic, a
    ``- [severity] code: message`` line with optional ``source`` and
    ``guidance`` lines underneath.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data.get("valid") else "invalid")
    for entry in data.get("diagnostics", []):
        headline = f"- [{entry['severity']}] {entry['code']}: {entry['message']}"
        click.echo(headline)
        origin = entry.get("source")
        if origin:
            # The line number is appended only when the source carries one.
            line_suffix = f":{origin['line']}" if origin.get("line") else ""
            click.echo(f" source: {origin.get('path', '<document>')}{line_suffix}")
        hint = entry.get("guidance")
        if hint:
            click.echo(f" guidance: {hint}")
def _emit_form_state(data: dict, output_format: str) -> None:
    """Print evaluated form state as JSON, YAML, or text.

    Text mode prints ``valid``/``invalid``, then one line per field in the
    form ``- <id>: <value> [<origin>] (<flags>)``, then the diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
    elif output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
    else:
        click.echo("valid" if data.get("valid") else "invalid")
        for field in data.get("fields", []):
            # A field that does not exist is shown as <missing> even if it
            # carries a value entry.
            value = field.get("value", "<missing>") if field.get("exists") else "<missing>"
            flags = []
            if field.get("required"):
                flags.append("required")
            # `is False` on purpose: an absent or None entry does not count
            # as hidden/disabled.
            if field.get("visible") is False:
                flags.append("hidden")
            if field.get("enabled") is False:
                flags.append("disabled")
            suffix = f" ({', '.join(flags)})" if flags else ""
            click.echo(f"- {field['id']}: {value} [{field.get('origin', 'unknown')}]{suffix}")
        # NOTE(review): diagnostics are read from the top-level payload, not
        # from each field, so this loop is taken to run once after the field
        # list - confirm that is the intended placement.
        for diagnostic in data.get("diagnostics", []):
            click.echo(
                f" [{diagnostic['severity']}] {diagnostic['code']}: "
                f"{diagnostic['message']}"
            )
def _emit_policy_result(data: dict, output_format: str) -> None:
    """Print a policy decision as JSON, YAML, or text.

    Text mode prints ``allowed``/``denied`` followed by the decision's
    ``effect``, ``decision_id`` and ``reason``.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    decision = data["decision"]
    verdict = "allowed" if decision.get("allowed") else "denied"
    click.echo(verdict)
    for label in ("effect", "decision_id", "reason"):
        click.echo(f"{label}: {decision.get(label)}")
def _emit_subject_result(data: dict, output_format: str) -> None:
    """Print a policy subject as JSON, YAML, or text.

    Text mode prints the subject id and then its roles, labels, trust zones
    and actions as comma-separated lists, using ``<none>`` for empty ones.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    subject = data["subject"]
    click.echo(f"subject: {subject.get('id')}")
    # (printed label, key in the subject mapping)
    listings = (
        ("roles", "roles"),
        ("labels", "allowed_labels"),
        ("trust_zones", "trust_zones"),
        ("actions", "allowed_actions"),
    )
    for label, key in listings:
        joined = ", ".join(subject.get(key, [])) or "<none>"
        click.echo(f"{label}: {joined}")
def _emit_resource_manifest_result(data: dict, output_format: str) -> None:
    """Print a resource manifest as JSON, YAML, or text.

    Text mode prints the manifest id, its system, the number of resources,
    and the comma-separated actions (``<none>`` when there are none).
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    manifest = data["manifest"]
    click.echo(f"manifest: {manifest.get('id')}")
    click.echo(f"system: {manifest.get('system')}")
    resource_count = len(manifest.get("resources", []))
    click.echo(f"resources: {resource_count}")
    action_names = manifest.get("actions", [])
    click.echo(f"actions: {', '.join(action_names) or '<none>'}")
def _emit_metrics(data: dict, output_format: str) -> None:
    """Print document metrics as JSON, YAML, or text.

    Text mode lists every entry of ``data["document"]`` and, when sections
    are present, one line per section with its word count, paragraph count
    and line number.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    document_metrics = data["document"]
    click.echo("document")
    for name, amount in document_metrics.items():
        click.echo(f"- {name}: {amount}")
    section_rows = data.get("sections", [])
    if not section_rows:
        return
    click.echo("sections")
    for row in section_rows:
        click.echo(
            f"- {row['heading']}: words={row['words']}, "
            f"paragraphs={row['paragraphs']}, line={row['line']}"
        )
def _emit_query(data: dict, output_format: str) -> None:
    """Print query matches as JSON, YAML, or text.

    Text mode prints the match count, an optional policy summary, one line
    per match (kind, path, optional line number) with the first line of the
    match text beneath it, and finally any top-level diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"{data['count']} match(es)")
    policy_info = data.get("policy")
    if policy_info:
        _emit_policy_summary(policy_info)
    for hit in data["matches"]:
        location = f":{hit['line']}" if hit.get("line") else ""
        click.echo(f"- {hit['kind']} {hit['path']}{location}")
        body = hit.get("text")
        if body:
            first_line = body.splitlines()[0]
            click.echo(f" {first_line}")
    for note in data.get("diagnostics", []):
        click.echo(f"! [{note['severity']}] {note['code']}: {note['message']}")
def _emit_extract(data: dict, output_format: str) -> None:
    """Print extracted items as JSON, YAML, or text.

    Text mode joins ``data["items"]`` with a blank line between items.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    separator = "\n\n"
    click.echo(separator.join(data["items"]))
def _emit_markdown_result(data: dict, output_format: str, output: Path | None) -> None:
    """Emit a rendered-Markdown result.

    For ``json`` and ``yaml`` the whole result mapping is printed and
    ``output`` is not consulted. Otherwise ``data["markdown"]`` is written to
    ``output`` as UTF-8 when a path is given, or echoed without an added
    trailing newline.
    """
    if output_format == "json":
        serialized = json.dumps(data, indent=2, ensure_ascii=False)
    elif output_format == "yaml":
        serialized = yaml.safe_dump(data, sort_keys=False)
    else:
        serialized = None
    if serialized is not None:
        click.echo(serialized)
        return
    markdown = data["markdown"]
    if not output:
        click.echo(markdown, nl=False)
        return
    output.write_text(markdown, encoding="utf-8")
def _emit_cache_data(data: dict, output_format: str) -> None:
    """Print cache command output as JSON, YAML, or text.

    Text mode handles two payload shapes: one that contains a ``dirty`` key
    (printed as ``dirty``/``clean`` plus the non-empty new/changed/deleted/
    unchanged lists) and one that does not (printed as cache path, entry
    count, and the ``written`` value when it is present).
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
    elif output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
    else:
        # The presence of the key, not its truthiness, selects the branch.
        if "dirty" in data:
            click.echo("dirty" if data["dirty"] else "clean")
            for key in ["new", "changed", "deleted", "unchanged"]:
                values = data.get(key, [])
                if values:
                    click.echo(f"{key}: {len(values)}")
                    for value in values:
                        click.echo(f"- {value}")
        else:
            click.echo(f"cache_path: {data.get('cache_path', '<none>')}")
            # Falls back to counting the entries when no explicit count exists.
            click.echo(f"count: {data.get('count', len(data.get('entries', [])))}")
            # NOTE(review): `written` is taken to belong to this branch only -
            # confirm against the payloads the cache commands produce.
            if data.get("written") is not None:
                click.echo(f"written: {data['written']}")
def _emit_ast_stats(data: dict, output_format: str) -> None:
    """Print AST statistics as JSON, YAML, or text.

    Text mode prints the document path, every entry of ``data["counts"]``,
    the maximum heading depth, and a ``token_types:`` listing when that
    mapping is not empty.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"document_path: {data['document_path']}")
    for label, amount in data["counts"].items():
        click.echo(f"{label}: {amount}")
    click.echo(f"max_heading_depth: {data['max_heading_depth']}")
    per_type = data["token_types"]
    if not per_type:
        return
    click.echo("token_types:")
    for name, amount in data["token_types"].items():
        click.echo(f"- {name}: {amount}")
def _emit_local_index_data(data: dict, output_format: str) -> None:
    """Print local index build/status output as JSON, YAML, or text.

    Text mode prints the index path, the optional schema version, source
    count and dirty/clean state, then the parsed, indexed, metadata_updated
    and deleted lists, each with its length (also when empty).
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"index_path: {data['index_path']}")
    schema_version = data.get("schema_version")
    if schema_version:
        click.echo(f"schema_version: {data['schema_version']}")
    if data.get("sources") is not None:
        click.echo(f"sources: {data['sources']}")
    if data.get("dirty") is not None:
        click.echo("dirty" if data["dirty"] else "clean")
    for category in ("parsed", "indexed", "metadata_updated", "deleted"):
        members = data.get(category, [])
        click.echo(f"{category}: {len(members)}")
        for member in members:
            click.echo(f"- {member}")
def _emit_search_results(data: dict, output_format: str) -> None:
    """Print search hits as JSON, YAML, or text.

    Text mode prints the match count, an optional policy summary, one line
    per hit (path, optional start line, unit kind and index, optional
    heading) with a whitespace-collapsed preview of at most 160 characters,
    and finally any diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"{data['count']} match(es)")
    policy_info = data.get("policy")
    if policy_info:
        _emit_policy_summary(policy_info)
    for hit in data["matches"]:
        span = f":{hit['line_start']}" if hit.get("line_start") else ""
        heading = f" [{hit['heading']}]" if hit.get("heading") else ""
        click.echo(f"- {hit['path']}{span} {hit['unit_kind']}#{hit['unit_index']}{heading}")
        # Collapse all runs of whitespace so the preview stays on one line.
        words = str(hit.get("text", "")).split()
        preview = " ".join(words)
        if preview:
            click.echo(f" {preview[:160]}")
    for note in data.get("diagnostics", []):
        click.echo(f"! [{note['severity']}] {note['code']}: {note['message']}")
def _emit_policy_summary(policy_data: dict) -> None:
    """Print a one-line policy summary.

    The line lists mode, subject, and the allowed/denied/redacted counts;
    missing counts are shown as 0.
    """
    parts = (
        f"mode={policy_data.get('mode')}",
        f"subject={policy_data.get('subject')}",
        f"allowed={policy_data.get('allowed', 0)}",
        f"denied={policy_data.get('denied', 0)}",
        f"redacted={policy_data.get('redacted', 0)}",
    )
    click.echo("policy: " + " ".join(parts))
def _emit_workflow_result(data: dict, output_format: str) -> None:
    """Print a workflow inspect/plan/run result as JSON, YAML, or text.

    Text mode prints validity (a missing ``valid`` key counts as valid), the
    workflow id, and, when present, the sources, steps and outputs, followed
    by any diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data.get("valid", True) else "invalid")
    # Prefer the flat id; fall back to the id of an embedded workflow mapping.
    workflow_id = data.get("workflow_id") or data.get("workflow", {}).get("id", "<unknown>")
    click.echo(f"workflow: {workflow_id}")
    if data.get("sources"):
        click.echo(f"sources: {len(data['sources'])}")
        for source_id, source_info in data["sources"].items():
            click.echo(f"- {source_id}: {source_info.get('count', 1)}")
    if data.get("steps"):
        click.echo(f"steps: {len(data['steps'])}")
        for step_id, step_info in data["steps"].items():
            click.echo(f"- {step_id}: {step_info.get('kind', '<unknown>')}")
    if data.get("outputs"):
        click.echo(f"outputs: {len(data['outputs'])}")
        for produced in data["outputs"]:
            status = "written" if produced.get("written") else "planned"
            target = produced.get("path") or "<memory>"
            click.echo(f"- {produced['id']}: {status} {target}")
    for note in data.get("diagnostics", []):
        click.echo(f"! [{note['severity']}] {note['code']}: {note['message']}")
def _emit_reference_result(data: dict, output_format: str) -> None:
    """Print resolved reference units as JSON, YAML, or text.

    Text mode prints the unit count, the target path, and one line per unit
    (kind, unit id, source path with optional start line), followed by the
    unit name when it has one.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"{data['count']} unit(s)")
    click.echo(f"target: {data['target_path']}")
    for unit in data["units"]:
        extent = unit.get("span", {})
        line_suffix = f":{extent['line_start']}" if extent.get("line_start") else ""
        click.echo(f"- {unit['kind']} {unit['unit_id']} {unit['source_path']}{line_suffix}")
        label = unit.get("name")
        if label:
            click.echo(f" {unit['name']}")
def _emit_explode_result(data: dict, output_format: str) -> None:
    """Print an explode result as JSON, YAML, or text.

    Text mode prints the manifest path, the manifest variant, the number of
    entries, and one ``- <kind> <file>`` line per entry.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    manifest = data["manifest"]
    click.echo(f"manifest: {data['manifest_path']}")
    click.echo(f"variant: {manifest['variant']}")
    entries = manifest["entries"]
    click.echo(f"entries: {len(entries)}")
    for item in manifest["entries"]:
        click.echo(f"- {item['kind']} {item['file']}")
def _emit_processor_run(data: dict, output_format: str) -> None:
    """Print a fenced-processor run as JSON, YAML, or text.

    Text mode prints validity and the processor count, then pairs each block
    with its result: a header line for the block, the first line of the
    result content if any, and the result's diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data["valid"] else "invalid")
    click.echo(f"processors: {data['count']}")
    # Non-strict zip: pairing stops at the shorter of the two lists.
    paired = zip(data["blocks"], data["results"], strict=False)
    for block, outcome in paired:
        line_suffix = f":{block['line_start']}" if block.get("line_start") else ""
        click.echo(f"- {block['processor']} {block['unit_id']}{line_suffix}")
        content = outcome.get("content")
        if content:
            click.echo(f" content: {content.splitlines()[0]}")
        for note in outcome.get("diagnostics", []):
            click.echo(f" [{note['severity']}] {note['code']}: {note['message']}")
def _emit_backend_list(data: dict, output_format: str) -> None:
    """Print the backend listing as JSON, YAML, or text.

    Text mode prints the backend count and one ``- <id> [<capabilities>]``
    line per backend.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(f"backends: {data['count']}")
    for backend in data["backends"]:
        capability_list = ", ".join(backend.get("capabilities", []))
        click.echo(f"- {backend['id']} [{capability_list}]")
def _emit_backend_manifest(data: dict, output_format: str) -> None:
    """Print one backend manifest as JSON, YAML, or text.

    Text mode prints the id, optional name, kind (``cache-backend`` when
    absent), capabilities, optional storage and policy entries, and, when a
    capability check is attached, its verdict and any missing capabilities.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo(data["id"])
    if data.get("name"):
        click.echo(f"name: {data['name']}")
    click.echo(f"kind: {data.get('kind', 'cache-backend')}")
    click.echo("capabilities: " + ", ".join(data.get("capabilities", [])))
    for label in ("storage", "policy"):
        if data.get(label):
            click.echo(f"{label}: {data[label]}")
    check = data.get("capability_check")
    if not check:
        return
    click.echo("compatible" if check["compatible"] else "incompatible")
    missing = check.get("missing")
    if missing:
        click.echo("missing: " + ", ".join(missing))
def _emit_snapshot_identity(data: dict, output_format: str) -> None:
    """Print a snapshot identity as JSON, YAML, or text.

    Text mode prints the snapshot id on its own line, then the content hash
    and the parser name with its version.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    text_lines = (
        data["snapshot_id"],
        f"content_hash: {data['content_hash']}",
        f"parser: {data['parser']} {data['parser_version']}",
    )
    for text_line in text_lines:
        click.echo(text_line)
def _emit_refresh_plan(data: dict, output_format: str) -> None:
    """Print a snapshot refresh plan as JSON, YAML, or text.

    Text mode prints ``dirty``/``clean``, a fixed set of counters from
    ``data["counts"]``, and one line per entry with its actions and reason,
    plus an ``invalidated_by`` line where applicable.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("dirty" if data["dirty"] else "clean")
    counts = data["counts"]
    counter_names = (
        "unchanged",
        "needs_hash",
        "needs_parse",
        "needs_index",
        "needs_metadata_update",
        "deleted",
        "invalidated",
    )
    for name in counter_names:
        click.echo(f"{name}: {counts[name]}")
    for entry in data["entries"]:
        action_text = ",".join(entry.get("actions", [])) or "none"
        click.echo(f"- {entry['path']}: {action_text} ({entry['reason']})")
        causes = entry.get("invalidated_by")
        if causes:
            click.echo(f" invalidated_by: {', '.join(causes)}")
def _emit_content_class_result(data: dict, output_format: str) -> None:
    """Print a content-class resolution as JSON, YAML, or text.

    Text mode prints validity, the linearization joined with `` -> ``, each
    slot with its value, and any diagnostics.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data["valid"] else "invalid")
    chain = " -> ".join(data["linearization"])
    click.echo("linearization: " + chain)
    for slot_name, slot_value in data.get("slots", {}).items():
        click.echo(f"- {slot_name}: {slot_value}")
    for note in data.get("diagnostics", []):
        click.echo(f"! [{note['severity']}] {note['code']}: {note['message']}")
def _emit_tangle_result(data: dict, output_format: str) -> None:
    """Print a tangle result as JSON, YAML, or text.

    Text mode prints validity, the file count, each file with its chunk ids,
    any diagnostics, and one ``written:`` line per written file.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data["valid"] else "invalid")
    click.echo(f"files: {len(data['files'])}")
    for tangled in data["files"]:
        chunk_list = ", ".join(tangled["chunk_ids"])
        click.echo(f"- {tangled['path']}: {chunk_list}")
    for note in data.get("diagnostics", []):
        click.echo(f"! [{note['severity']}] {note['code']}: {note['message']}")
    for written_path in data.get("written_files", []):
        click.echo(f"written: {written_path}")
def _emit_jsonish(data: dict, output_format: str) -> None:
    """Print ``data`` as YAML when requested, and as JSON for any other format."""
    if output_format == "yaml":
        serialized = yaml.safe_dump(data, sort_keys=False)
    else:
        serialized = json.dumps(data, indent=2, ensure_ascii=False)
    click.echo(serialized)
def _emit_template_analysis(data: dict, output_format: str) -> None:
    """Print a template analysis as JSON, YAML, or text.

    Text mode prints validity, the ``unique_variables`` value, one line per
    variable, and one ``! <error>`` line per syntax error.
    """
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return
    click.echo("valid" if data["valid"] else "invalid")
    click.echo(f"variables: {data['unique_variables']}")
    for name in data["variables"]:
        click.echo(f"- {name}")
    for problem in data["syntax_errors"]:
        click.echo(f"! {problem}")
def _parse_key_value_options(items: tuple[str, ...]) -> dict[str, object]:
values: dict[str, object] = {}
for item in items:
if "=" not in item:
raise ValueError(f"Expected KEY=VALUE, got `{item}`")
key, raw_value = item.split("=", 1)
key = key.strip()
if not key:
raise ValueError(f"Expected non-empty key in `{item}`")
_set_path(values, key.split("."), yaml.safe_load(raw_value))
return values
def _set_path(mapping: dict[str, object], path: list[str], value: object) -> None:
current = mapping
for part in path[:-1]:
next_value = current.setdefault(part, {})
if not isinstance(next_value, dict):
raise ValueError(f"Cannot set nested frontmatter path through scalar `{part}`")
current = next_value
current[path[-1]] = value
def _ast_stats(document: dict, document_path: str) -> dict:
token_types: dict[str, int] = {}
for token in document.get("tokens", []):
token_type = str(token.get("type", "unknown"))
token_types[token_type] = token_types.get(token_type, 0) + 1
headings = document.get("headings", [])
return {
"document_path": document_path,
"source_path": document.get("source_path"),
"counts": {
"frontmatter_keys": len(document.get("frontmatter", {})),
"headings": len(headings),
"sections": len(document.get("sections", [])),
"blocks": len(document.get("blocks", [])),
"tokens": len(document.get("tokens", [])),
},
"max_heading_depth": max(
[int(heading.get("level", 0)) for heading in headings] or [0]
),
"token_types": dict(sorted(token_types.items())),
}
def _load_template_data(data_file: Path | None) -> dict[str, object]:
if data_file is None:
return {}
data = load_data_file(data_file)
if isinstance(data, list):
if len(data) != 1:
raise ValueError("Template render expects exactly one CSV record")
data = data[0]
if not isinstance(data, dict):
raise ValueError("Template data must be a mapping")
return data
def _deep_merge_cli(left: dict[str, object], right: dict[str, object]) -> dict[str, object]:
merged = dict(left)
for key, value in right.items():
if isinstance(merged.get(key), dict) and isinstance(value, dict):
merged[key] = _deep_merge_cli(merged[key], value)
else:
merged[key] = value
return merged
# Invoke `main` (defined earlier in this module, outside this excerpt) only
# when the file is executed directly as a script, not when it is imported.
if __name__ == "__main__":
    main()