Backend fabric extension

This commit is contained in:
2026-05-04 02:43:32 +02:00
parent 33aee0d162
commit 3f08a27a24
11 changed files with 1080 additions and 8 deletions

View File

@@ -32,6 +32,28 @@ from markitect_tool.cache import (
save_cache,
scan_markdown_files,
)
from markitect_tool.backend import (
BACKEND_CAPABILITIES,
DEFAULT_BACKEND_PATHS,
AccessPolicyGateway,
BackendCapabilityCheck,
BackendManifest,
BackendRegistry,
BackendRegistryError,
ContextPackageRegistry,
DependencyEdge,
DocumentSnapshot,
IndexBackend,
ProcessorResultStore,
ProvenanceEnvelope,
QueryAdapter,
SnapshotBackend,
SnapshotIdentity,
capability_check,
load_backend_manifest,
load_backend_registry,
snapshot_identity_for_file,
)
from markitect_tool.content_class import (
ClassCompositionResult,
ContentClass,
@@ -162,6 +184,26 @@ __all__ = [
"load_cache",
"save_cache",
"scan_markdown_files",
"BACKEND_CAPABILITIES",
"DEFAULT_BACKEND_PATHS",
"AccessPolicyGateway",
"BackendCapabilityCheck",
"BackendManifest",
"BackendRegistry",
"BackendRegistryError",
"ContextPackageRegistry",
"DependencyEdge",
"DocumentSnapshot",
"IndexBackend",
"ProcessorResultStore",
"ProvenanceEnvelope",
"QueryAdapter",
"SnapshotBackend",
"SnapshotIdentity",
"capability_check",
"load_backend_manifest",
"load_backend_registry",
"snapshot_identity_for_file",
"ClassCompositionResult",
"ContentClass",
"ContentClassRegistry",

View File

@@ -0,0 +1,49 @@
"""Optional backend fabric for snapshots, indexes, policy, and provenance."""
from markitect_tool.backend.engine import (
BACKEND_CAPABILITIES,
DEFAULT_BACKEND_PATHS,
BackendCapabilityCheck,
BackendManifest,
BackendRegistry,
BackendRegistryError,
DependencyEdge,
DocumentSnapshot,
ProvenanceEnvelope,
SnapshotIdentity,
capability_check,
load_backend_manifest,
load_backend_registry,
snapshot_identity_for_file,
)
from markitect_tool.backend.interfaces import (
AccessPolicyGateway,
ContextPackageRegistry,
IndexBackend,
ProcessorResultStore,
QueryAdapter,
SnapshotBackend,
)
# Public API of the backend package; every name listed here is imported above.
__all__ = [
    # Re-exported from markitect_tool.backend.engine.
    "BACKEND_CAPABILITIES",
    "DEFAULT_BACKEND_PATHS",
    "BackendCapabilityCheck",
    "BackendManifest",
    "BackendRegistry",
    "BackendRegistryError",
    "DependencyEdge",
    "DocumentSnapshot",
    "ProvenanceEnvelope",
    "SnapshotIdentity",
    "capability_check",
    "load_backend_manifest",
    "load_backend_registry",
    "snapshot_identity_for_file",
    # Re-exported from markitect_tool.backend.interfaces.
    "AccessPolicyGateway",
    "ContextPackageRegistry",
    "IndexBackend",
    "ProcessorResultStore",
    "QueryAdapter",
    "SnapshotBackend",
]

View File

@@ -0,0 +1,359 @@
"""Backend manifests, registry, snapshot identity, and provenance models."""
from __future__ import annotations
import hashlib
import json
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
import yaml
# Capability names known to the backend fabric. Manifest capabilities outside
# this set are still accepted by the loader, which records them under
# metadata["unknown_capabilities"].
BACKEND_CAPABILITIES = {
    "snapshots",
    "ast",
    "json",
    "jsonpath",
    "fts",
    "sql",
    "vector",
    "hybrid",
    "context_packages",
    "policy",
    "policy_pushdown",
    "provenance",
    "reference_graph",
    "processor_results",
    "source_maps",
}
# Locations searched by load_backend_registry() when the caller gives no paths.
DEFAULT_BACKEND_PATHS = (".markitect/backends", ".markitect/backend.yaml")
# Default parser name and version used as SnapshotIdentity field defaults.
PARSER_ID = "markdown-it-py/commonmark"
PARSER_VERSION = "markitect-tool:1"
class BackendRegistryError(ValueError):
    """Signal a failure while loading manifests or using the backend registry.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    also handle registry failures.
    """
@dataclass(frozen=True)
class BackendCapabilityCheck:
    """Outcome of comparing a backend's capabilities with a required set.

    Attributes:
        backend_id: Identifier of the backend that was checked.
        required: Capabilities the caller asked for.
        supported: Capabilities the backend declares.
        missing: Required capabilities the backend does not declare.
    """

    backend_id: str
    required: list[str]
    supported: list[str]
    missing: list[str]

    @property
    def compatible(self) -> bool:
        """Return True when nothing is listed in ``missing``."""
        return False if self.missing else True

    def to_dict(self) -> dict[str, Any]:
        """Serialise the check, including the derived ``compatible`` flag."""
        return dict(
            backend_id=self.backend_id,
            compatible=self.compatible,
            required=self.required,
            supported=self.supported,
            missing=self.missing,
        )
@dataclass(frozen=True)
class BackendManifest:
    """Declarative description of one optional backend.

    Only ``id`` is mandatory; every other field has a default so a manifest
    can be as small as a single identifier.
    """

    id: str
    kind: str = "cache-backend"
    name: str | None = None
    version: str = "1"
    capabilities: list[str] = field(default_factory=list)
    storage: dict[str, Any] = field(default_factory=dict)
    policy: dict[str, Any] = field(default_factory=dict)
    description: str | None = None
    manifest_path: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def check(self, required: list[str]) -> BackendCapabilityCheck:
        """Compare this manifest's capabilities with ``required``."""
        return capability_check(self, required)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the manifest, leaving out ``None`` and empty containers."""
        ordered_fields = (
            "id",
            "kind",
            "name",
            "version",
            "capabilities",
            "storage",
            "policy",
            "description",
            "manifest_path",
            "metadata",
        )
        omitted = (None, {}, [])
        serialised: dict[str, Any] = {}
        for attribute in ordered_fields:
            current = getattr(self, attribute)
            if current in omitted:
                continue
            serialised[attribute] = current
        return serialised
@dataclass(frozen=True)
class SnapshotIdentity:
    """Content-addressed identity for a parsed document snapshot.

    Attributes:
        source_path: Path of the source document, as given by the caller.
        content_hash: Hash of the document content.
        parser: Parser identifier; defaults to ``PARSER_ID``.
        parser_version: Parser version; defaults to ``PARSER_VERSION``.
        parse_options_hash: Hash of the parse options mapping.
        contract_hash: Optional contract hash; omitted from serialisation
            when ``None``.
    """

    source_path: str
    content_hash: str
    parser: str = PARSER_ID
    parser_version: str = PARSER_VERSION
    # Hash of an empty options mapping, i.e. of the JSON text "{}". This is
    # what ``_hash_mapping({})`` yields, so an identity constructed directly
    # with no parse options gets the same snapshot id as one produced by
    # ``snapshot_identity_for_file`` for the same file. The previous literal
    # was the SHA-256 of empty input, which made the two paths disagree.
    parse_options_hash: str = "sha256:" + hashlib.sha256(b"{}").hexdigest()
    contract_hash: str | None = None

    @property
    def snapshot_id(self) -> str:
        """Return ``"snapshot:"`` plus the SHA-256 of the serialised identity.

        The identity is serialised with sorted keys so the id does not depend
        on field order.
        """
        payload = json.dumps(self.to_dict(), sort_keys=True, ensure_ascii=False)
        return "snapshot:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def to_dict(self) -> dict[str, Any]:
        """Serialise the identity, leaving out fields that are ``None``."""
        data = {
            "source_path": self.source_path,
            "content_hash": self.content_hash,
            "parser": self.parser,
            "parser_version": self.parser_version,
            "parse_options_hash": self.parse_options_hash,
            "contract_hash": self.contract_hash,
        }
        return {key: value for key, value in data.items() if value is not None}
@dataclass(frozen=True)
class DependencyEdge:
    """Directed dependency from a snapshot or unit to an addressable target.

    Attributes:
        source_id: Identifier of the depending snapshot or unit.
        target: Address of the thing depended upon.
        kind: Label describing the dependency.
        target_snapshot_id: Snapshot id of the target, when one is recorded.
        metadata: Free-form extra data attached to the edge.
    """

    source_id: str
    target: str
    kind: str
    target_snapshot_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the edge, leaving out ``None`` and empty-dict values."""
        candidates = (
            ("source_id", self.source_id),
            ("target", self.target),
            ("kind", self.kind),
            ("target_snapshot_id", self.target_snapshot_id),
            ("metadata", self.metadata),
        )
        omitted = (None, {})
        return {name: item for name, item in candidates if item not in omitted}
@dataclass(frozen=True)
class ProvenanceEnvelope:
    """Provenance metadata shared by objects derived through a backend.

    Only ``operation`` is required; every other field is optional and is
    dropped from the serialised form when unset or empty.
    """

    operation: str
    snapshot_id: str | None = None
    source_path: str | None = None
    content_hash: str | None = None
    dependencies: list[DependencyEdge] = field(default_factory=list)
    backend_id: str | None = None
    policy_decision_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the envelope, leaving out ``None`` and empty containers."""
        serialised_edges = [dependency.to_dict() for dependency in self.dependencies]
        candidates = (
            ("operation", self.operation),
            ("snapshot_id", self.snapshot_id),
            ("source_path", self.source_path),
            ("content_hash", self.content_hash),
            ("dependencies", serialised_edges),
            ("backend_id", self.backend_id),
            ("policy_decision_id", self.policy_decision_id),
            ("metadata", self.metadata),
        )
        omitted = (None, [], {})
        return {name: item for name, item in candidates if item not in omitted}
@dataclass(frozen=True)
class DocumentSnapshot:
    """Parsed document snapshot that an optional backend may persist.

    Attributes:
        identity: Identity from which the snapshot id is derived.
        document: Parsed document as a mapping.
        units: Unit mappings belonging to the document.
        dependencies: Dependency edges recorded for the snapshot.
        provenance: Optional provenance envelope.
    """

    identity: SnapshotIdentity
    document: dict[str, Any]
    units: list[dict[str, Any]] = field(default_factory=list)
    dependencies: list[DependencyEdge] = field(default_factory=list)
    provenance: ProvenanceEnvelope | None = None

    @property
    def snapshot_id(self) -> str:
        """Return the snapshot id of the underlying identity."""
        return self.identity.snapshot_id

    def to_dict(self) -> dict[str, Any]:
        """Serialise the snapshot, leaving out ``None`` and empty containers."""
        if self.provenance:
            provenance_data = self.provenance.to_dict()
        else:
            provenance_data = None
        candidates = (
            ("snapshot_id", self.snapshot_id),
            ("identity", self.identity.to_dict()),
            ("document", self.document),
            ("units", self.units),
            ("dependencies", [dependency.to_dict() for dependency in self.dependencies]),
            ("provenance", provenance_data),
        )
        omitted = (None, [], {})
        return {name: item for name, item in candidates if item not in omitted}
class BackendRegistry:
    """Registry of backend manifests keyed by backend id."""

    def __init__(self, manifests: list[BackendManifest] | None = None) -> None:
        """Create the registry and register any initial ``manifests``."""
        self._manifests: dict[str, BackendManifest] = {}
        if manifests:
            for entry in manifests:
                self.register(entry)

    def register(self, manifest: BackendManifest) -> None:
        """Add ``manifest``; raise ``BackendRegistryError`` on a duplicate id."""
        backend_id = manifest.id
        if backend_id in self._manifests:
            raise BackendRegistryError(f"Duplicate backend id `{manifest.id}`")
        self._manifests[backend_id] = manifest

    def list(self) -> list[BackendManifest]:
        """Return all manifests ordered by backend id."""
        ordered_ids = sorted(self._manifests)
        return [self._manifests[backend_id] for backend_id in ordered_ids]

    def get(self, backend_id: str) -> BackendManifest:
        """Return the manifest for ``backend_id``.

        Raises:
            BackendRegistryError: If no such backend is registered.
        """
        try:
            found = self._manifests[backend_id]
        except KeyError as exc:
            raise BackendRegistryError(f"Unknown backend `{backend_id}`") from exc
        else:
            return found

    def find_by_capability(self, capability: str) -> list[BackendManifest]:
        """Return manifests, ordered by id, that declare ``capability``."""
        matches = []
        for candidate in self.list():
            if capability in candidate.capabilities:
                matches.append(candidate)
        return matches

    def to_dict(self) -> dict[str, Any]:
        """Serialise the registry as a count plus the ordered manifests."""
        serialised = [entry.to_dict() for entry in self.list()]
        return {"count": len(self._manifests), "backends": serialised}
def capability_check(
    manifest: BackendManifest,
    required: list[str],
) -> BackendCapabilityCheck:
    """Compare a manifest's capabilities against the ``required`` ones.

    Names on both sides are normalised and de-duplicated before comparison,
    and every list in the result is sorted.
    """
    wanted = {_normalize_capability(name) for name in required}
    declared = {_normalize_capability(name) for name in manifest.capabilities}
    return BackendCapabilityCheck(
        backend_id=manifest.id,
        required=sorted(wanted),
        supported=sorted(declared),
        missing=sorted(wanted - declared),
    )
def snapshot_identity_for_file(
    path: str | Path,
    *,
    parse_options: dict[str, Any] | None = None,
    contract_hash: str | None = None,
) -> SnapshotIdentity:
    """Build a content-addressed snapshot identity for a file.

    Args:
        path: File whose bytes are hashed with SHA-256.
        parse_options: Options folded into the identity via their hash;
            ``None`` is treated as an empty mapping.
        contract_hash: Optional contract hash stored on the identity.

    Returns:
        A ``SnapshotIdentity`` whose ``source_path`` is ``str(Path(path))``.
    """
    source = Path(path)
    file_digest = hashlib.sha256(source.read_bytes()).hexdigest()
    effective_options = parse_options if parse_options else {}
    return SnapshotIdentity(
        source_path=str(source),
        content_hash=f"sha256:{file_digest}",
        parse_options_hash=_hash_mapping(effective_options),
        contract_hash=contract_hash,
    )
def load_backend_manifest(path: str | Path) -> BackendManifest:
    """Load one backend manifest from YAML or Markdown fenced YAML.

    The file is read as UTF-8; its suffix decides whether it is parsed as
    plain YAML or searched for a fenced manifest block.
    """
    source = Path(path)
    mapping = _extract_manifest_mapping(source.read_text(encoding="utf-8"), source)
    return _manifest_from_mapping(mapping, source)
def load_backend_registry(paths: list[str | Path] | None = None) -> BackendRegistry:
    """Load backend manifests from files or directories without importing backends.

    When ``paths`` is ``None`` or empty, ``DEFAULT_BACKEND_PATHS`` is searched
    instead. Manifests are registered one at a time in path order.
    """
    search_roots = paths if paths else list(DEFAULT_BACKEND_PATHS)
    loaded = BackendRegistry()
    for manifest_file in _iter_manifest_paths(search_roots):
        manifest = load_backend_manifest(manifest_file)
        loaded.register(manifest)
    return loaded
def _extract_manifest_mapping(text: str, manifest_path: Path) -> dict[str, Any]:
    """Parse manifest text into the mapping that describes the backend.

    Files ending in ``.yaml``/``.yml`` are parsed whole. Any other file is
    treated as Markdown and must contain a fenced block whose info string
    mentions ``markitect-backend``; only that block's body is parsed. If the
    parsed mapping has a ``backend`` key holding a mapping, that inner mapping
    is returned.

    Raises:
        BackendRegistryError: If the fenced block is missing, the YAML is
            malformed, or the parsed document is not a mapping.
    """
    if manifest_path.suffix.lower() in {".yaml", ".yml"}:
        yaml_source = text
    else:
        match = re.search(
            r"```[^\n`]*markitect-backend[^\n`]*\n(?P<body>.*?)\n```",
            text,
            flags=re.DOTALL,
        )
        if not match:
            raise BackendRegistryError(
                f"Markdown backend manifest lacks a markitect-backend fenced block: {manifest_path}"
            )
        yaml_source = match.group("body")
    try:
        data = yaml.safe_load(yaml_source) or {}
    except yaml.YAMLError as exc:
        # Surface parse failures as registry errors so callers that only
        # handle BackendRegistryError (e.g. the CLI) report them cleanly.
        raise BackendRegistryError(
            f"Backend manifest is not valid YAML: {manifest_path}: {exc}"
        ) from exc
    if not isinstance(data, dict):
        raise BackendRegistryError(f"Backend manifest must be a mapping: {manifest_path}")
    if isinstance(data.get("backend"), dict):
        data = data["backend"]
    return data
def _coerce_manifest_mapping(
    data: dict[str, Any],
    key: str,
    manifest_path: Path,
) -> dict[str, Any]:
    """Return a copy of ``data[key]`` as a dict; a missing or falsy value gives ``{}``."""
    try:
        return dict(data.get(key) or {})
    except (TypeError, ValueError) as exc:
        raise BackendRegistryError(
            f"Backend {key} must be a mapping: {manifest_path}"
        ) from exc


def _manifest_from_mapping(data: dict[str, Any], manifest_path: Path) -> BackendManifest:
    """Build a ``BackendManifest`` from a parsed manifest mapping.

    Capability names are normalised; names outside ``BACKEND_CAPABILITIES``
    are kept and also listed under ``metadata["unknown_capabilities"]``.
    Keys that are present but null are treated like missing keys.

    Raises:
        BackendRegistryError: If ``id`` is missing or blank, ``capabilities``
            is not a list, or ``storage``/``policy``/``metadata`` cannot be
            read as a mapping.
    """
    # A YAML "id:" with no value loads as None; str(None) would otherwise
    # register a backend literally called "None".
    raw_id = data.get("id")
    backend_id = "" if raw_id is None else str(raw_id).strip()
    if not backend_id:
        raise BackendRegistryError(f"Backend manifest requires an id: {manifest_path}")

    raw_capabilities = data.get("capabilities")
    if raw_capabilities is None:
        raw_capabilities = []
    if not isinstance(raw_capabilities, list):
        raise BackendRegistryError(f"Backend capabilities must be a list: {manifest_path}")
    capabilities = [_normalize_capability(str(value)) for value in raw_capabilities]

    unknown = sorted(set(capabilities) - BACKEND_CAPABILITIES)
    metadata = _coerce_manifest_mapping(data, "metadata", manifest_path)
    if unknown:
        metadata["unknown_capabilities"] = unknown

    raw_kind = data.get("kind")
    raw_version = data.get("version")
    return BackendManifest(
        id=backend_id,
        kind="cache-backend" if raw_kind is None else str(raw_kind),
        name=str(data["name"]) if data.get("name") is not None else None,
        version="1" if raw_version is None else str(raw_version),
        capabilities=capabilities,
        storage=_coerce_manifest_mapping(data, "storage", manifest_path),
        policy=_coerce_manifest_mapping(data, "policy", manifest_path),
        description=str(data["description"]) if data.get("description") is not None else None,
        manifest_path=str(manifest_path),
        metadata=metadata,
    )
def _iter_manifest_paths(paths: list[str | Path]) -> list[Path]:
    """Expand files and directories into a sorted, de-duplicated manifest list.

    Paths that do not exist are skipped. A file counts as a manifest when its
    suffix, compared case-insensitively, is ``.md``, ``.markdown``, ``.yaml``
    or ``.yml``. Directories are searched recursively.
    """
    manifest_suffixes = {".md", ".markdown", ".yaml", ".yml"}

    def is_manifest_file(candidate: Path) -> bool:
        return candidate.is_file() and candidate.suffix.lower() in manifest_suffixes

    discovered: set[Path] = set()
    for entry in paths:
        location = Path(entry)
        if not location.exists():
            continue
        if is_manifest_file(location):
            discovered.add(location)
        elif location.is_dir():
            discovered.update(
                child for child in location.rglob("*") if is_manifest_file(child)
            )
    return sorted(discovered)
def _normalize_capability(value: str) -> str:
    """Return ``value`` trimmed, lower-cased, with hyphens turned into underscores."""
    trimmed = value.strip()
    lowered = trimmed.lower()
    return lowered.replace("-", "_")
def _hash_mapping(mapping: dict[str, Any]) -> str:
    """Return ``"sha256:"`` plus the SHA-256 of ``mapping`` as sorted-key JSON."""
    canonical = json.dumps(mapping, sort_keys=True, ensure_ascii=False)
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return f"sha256:{digest.hexdigest()}"

View File

@@ -0,0 +1,155 @@
"""Protocol interfaces for optional Markitect backends."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from markitect_tool.backend.engine import (
BackendCapabilityCheck,
DependencyEdge,
DocumentSnapshot,
ProvenanceEnvelope,
)
@runtime_checkable
class SnapshotBackend(Protocol):
    """Structural interface for a durable parsed-document snapshot backend.

    Any object exposing ``backend_id`` and the methods below satisfies the
    protocol; no inheritance is required.
    """

    backend_id: str

    def capabilities(self) -> BackendCapabilityCheck:
        """Report the snapshot capabilities this backend supports."""
        ...

    def put_document(
        self,
        source_path: str | Path,
        content: str,
        parse_options: dict[str, Any] | None = None,
    ) -> str:
        """Store a parsed snapshot of ``content`` and return its snapshot id."""
        ...

    def get_snapshot(self, snapshot_id: str) -> DocumentSnapshot:
        """Fetch a snapshot that was stored earlier."""
        ...

    def resolve_source(self, source_path: str | Path) -> str | None:
        """Look up the latest snapshot id recorded for ``source_path``."""
        ...

    def diff_snapshot(self, old_id: str, new_id: str) -> dict[str, Any]:
        """Produce a backend-specific diff between two snapshots."""
        ...
@runtime_checkable
class IndexBackend(Protocol):
    """Structural interface for a backend that derives indexes from snapshots."""

    backend_id: str

    def capabilities(self) -> BackendCapabilityCheck:
        """Report the index capabilities this backend supports."""
        ...

    def build(self, snapshot_ids: list[str], options: dict[str, Any] | None = None) -> dict[str, Any]:
        """Create derived indexes for the given snapshots."""
        ...

    def refresh(self, changed_snapshots: list[str]) -> dict[str, Any]:
        """Update derived indexes for snapshots that changed."""
        ...

    def query(self, request: dict[str, Any]) -> dict[str, Any]:
        """Execute a backend query and return a common result envelope."""
        ...

    def explain(self, request: dict[str, Any]) -> dict[str, Any]:
        """Describe the plan the backend would use for a query."""
        ...
@runtime_checkable
class QueryAdapter(Protocol):
    """Structural interface that maps stable Markitect requests onto a backend."""

    name: str

    def supports(self, selector_or_query: str, target: str | None = None) -> bool:
        """Tell whether this adapter is able to execute the given query."""
        ...

    def execute(self, request: dict[str, Any]) -> dict[str, Any]:
        """Run a query and return results in the common shape."""
        ...

    def explain(self, request: dict[str, Any]) -> dict[str, Any]:
        """Describe how the given query would be executed."""
        ...
@runtime_checkable
class ContextPackageRegistry(Protocol):
    """Structural interface for a registry of agent-ready context packages."""

    registry_id: str

    def create_package(
        self,
        query_or_manifest: dict[str, Any],
        budget: dict[str, Any] | None = None,
        policy: dict[str, Any] | None = None,
    ) -> str:
        """Build a context package and return the new package id."""
        ...

    def activate(self, package_id: str, thread_or_workspace: str) -> str:
        """Activate a package and return the resulting activation id."""
        ...

    def deactivate(self, activation_id: str) -> None:
        """Deactivate a currently active context package."""
        ...

    def refresh(self, package_id: str) -> str:
        """Refresh a package and return the id of the new package."""
        ...

    def explain(self, package_id: str) -> dict[str, Any]:
        """Return provenance, budget, and retrieval details for a package."""
        ...
@runtime_checkable
class AccessPolicyGateway(Protocol):
    """Structural interface for authorising and filtering backend results."""

    gateway_id: str

    def authorize(
        self,
        subject: str,
        action: str,
        object_id: str,
        context: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Decide whether ``subject`` may perform ``action`` on one object."""
        ...

    def filter_results(
        self,
        subject: str,
        action: str,
        results: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Filter ``results`` and return the policy decisions that were made."""
        ...

    def explain_decision(self, decision_id: str) -> dict[str, Any]:
        """Explain a single policy decision."""
        ...
@runtime_checkable
class ProcessorResultStore(Protocol):
    """Structural interface for an optional store of processor outputs.

    Covers outputs of both deterministic and assisted processors.
    """

    store_id: str

    def put_result(
        self,
        processor_name: str,
        input_hash: str,
        result: dict[str, Any],
        provenance: ProvenanceEnvelope,
    ) -> str:
        """Store a processor result and return the id assigned to it."""
        ...

    def get_result(self, result_id: str) -> dict[str, Any]:
        """Fetch a stored processor result."""
        ...

    def dependencies(self, result_id: str) -> list[DependencyEdge]:
        """List the dependency edges of a processor result."""
        ...

View File

@@ -16,6 +16,11 @@ from markitect_tool.cache import (
load_cache,
save_cache,
)
from markitect_tool.backend import (
BackendRegistryError,
load_backend_registry,
snapshot_identity_for_file,
)
from markitect_tool.content_class import (
ContentClassResolutionError,
load_content_class_file,
@@ -458,6 +463,124 @@ def process(file: Path, root: Path, output_format: str) -> None:
raise click.exceptions.Exit(0 if result.valid else 1)
# Command group that hosts the `list`, `inspect`, and `snapshot-id`
# subcommands registered below via @backend.command(...).
@main.group()
def backend() -> None:
    """Inspect optional backend manifests and snapshot identities."""
@backend.command("list")
@click.option(
    "--path",
    "paths",
    multiple=True,
    type=click.Path(path_type=Path),
    help="Backend manifest file or directory. Defaults to .markitect/backends and .markitect/backend.yaml.",
)
@click.option("--capability", help="Only show backends that declare this capability.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def backend_list(paths: tuple[Path, ...], capability: str | None, output_format: str) -> None:
    """List registered optional backend manifests."""
    # An empty --path tuple becomes None so the loader uses its default paths.
    search_paths = list(paths) or None
    try:
        registry = load_backend_registry(search_paths)
    except BackendRegistryError as exc:
        raise click.ClickException(str(exc)) from exc

    if capability:
        # Accept hyphenated or mixed-case spellings of the capability name.
        wanted = capability.replace("-", "_").lower()
        manifests = registry.find_by_capability(wanted)
    else:
        manifests = registry.list()

    serialised = [manifest.to_dict() for manifest in manifests]
    _emit_backend_list({"count": len(manifests), "backends": serialised}, output_format)
@backend.command("inspect")
@click.argument("backend_id")
@click.option(
    "--path",
    "paths",
    multiple=True,
    type=click.Path(path_type=Path),
    help="Backend manifest file or directory. Defaults to .markitect/backends and .markitect/backend.yaml.",
)
@click.option(
    "--require",
    "required_capabilities",
    multiple=True,
    help="Required capability to check. May be repeated.",
)
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def backend_inspect(
    backend_id: str,
    paths: tuple[Path, ...],
    required_capabilities: tuple[str, ...],
    output_format: str,
) -> None:
    """Inspect one backend manifest and optional compatibility check."""
    # An empty --path tuple becomes None so the loader uses its default paths.
    search_paths = list(paths) or None
    try:
        manifest = load_backend_registry(search_paths).get(backend_id)
    except BackendRegistryError as exc:
        raise click.ClickException(str(exc)) from exc

    data = manifest.to_dict()
    if required_capabilities:
        # Attach the compatibility report only when --require was given.
        compatibility = manifest.check(list(required_capabilities))
        data["capability_check"] = compatibility.to_dict()
    _emit_backend_manifest(data, output_format)
@backend.command("snapshot-id")
@click.argument("file", type=click.Path(exists=True, dir_okay=False, path_type=Path))
@click.option(
    "--parse-option",
    "parse_options",
    multiple=True,
    metavar="KEY=VALUE",
    help="Parse option included in the snapshot identity hash.",
)
@click.option("--contract-hash", help="Optional contract hash included in the snapshot identity.")
@click.option(
    "--format",
    "output_format",
    type=click.Choice(["json", "yaml", "text"], case_sensitive=False),
    default="text",
    show_default=True,
)
def backend_snapshot_id(
    file: Path,
    parse_options: tuple[str, ...],
    contract_hash: str | None,
    output_format: str,
) -> None:
    """Compute a read-only content-addressed snapshot identity for a file."""
    try:
        option_mapping = _parse_key_value_options(parse_options)
        identity = snapshot_identity_for_file(
            file,
            parse_options=option_mapping,
            contract_hash=contract_hash,
        )
    except ValueError as exc:
        raise click.ClickException(str(exc)) from exc

    # Emit the identity fields together with the derived snapshot id.
    data = {**identity.to_dict(), "snapshot_id": identity.snapshot_id}
    _emit_snapshot_identity(data, output_format)
@main.group("class")
def class_group() -> None:
"""Resolve deterministic content classes."""
@@ -1070,6 +1193,51 @@ def _emit_processor_run(data: dict, output_format: str) -> None:
click.echo(f" [{diagnostic['severity']}] {diagnostic['code']}: {diagnostic['message']}")
def _emit_backend_list(data: dict, output_format: str) -> None:
    """Print a backend listing as JSON, YAML, or one text line per backend."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    # Text output: a count header, then "- <id> [<capabilities>]" per backend.
    click.echo(f"backends: {data['count']}")
    for entry in data["backends"]:
        declared = ", ".join(entry.get("capabilities", []))
        click.echo(f"- {entry['id']} [{declared}]")
def _emit_backend_manifest(data: dict, output_format: str) -> None:
    """Print one backend manifest as JSON, YAML, or a short text summary."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    # Text output: id first, then optional and fixed fields in a set order.
    click.echo(data["id"])
    if data.get("name"):
        click.echo(f"name: {data['name']}")
    click.echo(f"kind: {data.get('kind', 'cache-backend')}")
    click.echo(f"capabilities: {', '.join(data.get('capabilities', []))}")
    for section in ("storage", "policy"):
        if data.get(section):
            click.echo(f"{section}: {data[section]}")

    check = data.get("capability_check")
    if not check:
        return
    verdict = "compatible" if check["compatible"] else "incompatible"
    click.echo(verdict)
    if check.get("missing"):
        click.echo(f"missing: {', '.join(check['missing'])}")
def _emit_snapshot_identity(data: dict, output_format: str) -> None:
    """Print a snapshot identity as JSON, YAML, or three text lines."""
    if output_format == "json":
        click.echo(json.dumps(data, indent=2, ensure_ascii=False))
        return
    if output_format == "yaml":
        click.echo(yaml.safe_dump(data, sort_keys=False))
        return

    # Text output: the snapshot id, then the content hash and parser details.
    text_lines = (
        data["snapshot_id"],
        f"content_hash: {data['content_hash']}",
        f"parser: {data['parser']} {data['parser_version']}",
    )
    for line in text_lines:
        click.echo(line)
def _emit_content_class_result(data: dict, output_format: str) -> None:
if output_format == "json":
click.echo(json.dumps(data, indent=2, ensure_ascii=False))