source adapter framework

This commit is contained in:
2026-05-14 22:05:34 +02:00
parent f8f20c7c32
commit eb34c0d4fb
17 changed files with 1924 additions and 15 deletions

View File

@@ -0,0 +1,380 @@
import importlib
import json
from pathlib import Path
from click.testing import CliRunner
import markitect_tool as api
from markitect_tool.diagnostics import Diagnostic
from markitect_tool.extension import OptionalDependency, builtin_extension_registry
from markitect_tool.source import (
NORMALIZED_SOURCE_SCHEMA_VERSION,
NormalizationQuality,
NormalizedMarkdownDocument,
NormalizedMarkdownSegment,
SourceAdapterDescriptor,
SourceAdapterMatch,
SourceAdapterMatchRequest,
SourceAdapterRegistry,
SourceAsset,
SourceInspectRequest,
SourceInspectResult,
SourceMetadata,
SourceProvenance,
SourceReadRequest,
SourceReadResult,
discover_source_adapters,
inspect_source,
normalization_cache_key,
normalize_source,
)
# Fixture file handed to the fake adapter; the path is relative, so it is
# resolved against the working directory of the test run.
SAMPLE_SOURCE = Path("examples/source-adapters/sample.fake")

# Full markdown the fake adapter reports for its normalized document.
NORMALIZED_MARKDOWN = "".join(
    (
        "# Fake Source\n\n",
        "A small normalized segment.\n\n",
        "## Second Segment\n\n",
        "Another deterministic segment.",
    )
)
class FakeSourceAdapter:
    """Deterministic source adapter used as a test double.

    It reports a match for assets whose extension is ``.fake`` and always
    returns the same metadata and the same two-segment normalized document.
    """

    def __init__(self, descriptor: SourceAdapterDescriptor, *, confidence: int = 80) -> None:
        """Keep the descriptor to report and the confidence used in matches."""
        self.descriptor = descriptor
        self.confidence = confidence

    def can_read(self, request: SourceAdapterMatchRequest) -> SourceAdapterMatch:
        """Return a match result that is positive only for ``.fake`` assets."""
        has_fake_extension = request.asset.extension == ".fake"
        return SourceAdapterMatch(
            adapter_id=self.descriptor.id,
            matched=has_fake_extension,
            confidence=self.confidence,
            reason="extension",
        )

    def inspect(self, request: SourceInspectRequest) -> SourceInspectResult:
        """Return fixed metadata and a lossless quality report for the asset."""
        adapter_info = {
            "id": self.descriptor.id,
            "version": self.descriptor.version,
            "options": request.options,
        }
        lossless = NormalizationQuality(lossiness="none", confidence=1.0)
        return SourceInspectResult(
            asset=request.asset,
            adapter=adapter_info,
            metadata=_source_metadata(),
            capabilities=["read"],
            quality=lossless,
        )

    def read(self, request: SourceReadRequest) -> SourceReadResult:
        """Return the canned two-segment document for the requested asset."""
        asset = request.asset

        def build_segment(segment_id, order, heading, heading_level, markdown, anchor):
            # Each segment carries one provenance entry whose section is the
            # segment heading and whose anchor is the segment's only anchor.
            return NormalizedMarkdownSegment(
                segment_id=segment_id,
                order=order,
                heading=heading,
                heading_level=heading_level,
                markdown=markdown,
                anchors=[anchor],
                provenance=[
                    SourceProvenance(
                        source_uri=asset.uri,
                        source_path=asset.path,
                        anchor=anchor,
                        section=heading,
                    )
                ],
            )

        # Document-level provenance is the only entry that records the digest.
        document_provenance = [
            SourceProvenance(
                source_uri=asset.uri,
                source_path=asset.path,
                digest=asset.digest,
            )
        ]
        segments = [
            build_segment(
                "seg-0001",
                0,
                "Fake Source",
                1,
                "# Fake Source\n\nA small normalized segment.",
                "fake-source",
            ),
            build_segment(
                "seg-0002",
                1,
                "Second Segment",
                2,
                "## Second Segment\n\nAnother deterministic segment.",
                "second-segment",
            ),
        ]
        cache_key = normalization_cache_key(
            asset=asset,
            adapter_id=self.descriptor.id,
            adapter_version=self.descriptor.version,
            options=request.options,
        )
        adapter_info = {
            "id": self.descriptor.id,
            "version": self.descriptor.version,
            "options": request.options,
        }
        document = NormalizedMarkdownDocument(
            document_id=f"{self.descriptor.id}:fake-source-001",
            asset=asset,
            metadata=_source_metadata(),
            markdown=NORMALIZED_MARKDOWN,
            segments=segments,
            quality=NormalizationQuality(lossiness="none", confidence=1.0, skipped_items=0, warnings=0),
            provenance=document_provenance,
            adapter=adapter_info,
            cache_key=cache_key,
        )
        return SourceReadResult(document=document)
def _source_metadata() -> SourceMetadata:
    """Build the fixed metadata record the fake adapter reports."""
    metadata = SourceMetadata(
        title="Fake Source",
        creators=["Markitect Fixture"],
        language="en",
        identifiers={"fixture": "fake-source-001"},
    )
    return metadata
def _fake_descriptor(adapter_id: str = "source.fake", *, confidence: int = 80) -> SourceAdapterDescriptor:
    """Build a descriptor whose factory yields a ``FakeSourceAdapter``.

    Args:
        adapter_id: Identifier given to the descriptor.
        confidence: Confidence passed to every adapter the factory creates.

    Returns:
        The descriptor; adapters made by its factory hold a reference back to it.
    """
    # Declared before the factory so the closure can refer to the descriptor
    # that is only created further down.
    descriptor = None

    def make_adapter() -> FakeSourceAdapter:
        assert descriptor is not None
        return FakeSourceAdapter(descriptor, confidence=confidence)

    safety_profile = {
        "reads_files": True,
        "writes_files": False,
        "network": False,
        "external_process": False,
    }
    descriptor = SourceAdapterDescriptor(
        id=adapter_id,
        version="1",
        name="Fake Source Adapter",
        summary="Contract-test adapter for plain fixture sources.",
        operations=["read"],
        media_types=["text/x.markitect-fake"],
        extensions=[".fake"],
        factory=make_adapter,
        safety=safety_profile,
    )
    return descriptor
def test_normalized_document_serialization_round_trips():
    """A normalized document is unchanged by a ``to_dict``/``from_dict`` round trip."""
    fake_registry = SourceAdapterRegistry([_fake_descriptor()])
    outcome = normalize_source(SAMPLE_SOURCE, registry=fake_registry)

    assert outcome.is_valid
    assert outcome.document is not None

    payload = outcome.document.to_dict()
    rebuilt = NormalizedMarkdownDocument.from_dict(payload)
    rebuilt_payload = rebuilt.to_dict()

    assert rebuilt_payload == payload
    assert payload["schema_version"] == NORMALIZED_SOURCE_SCHEMA_VERSION
    assert payload["markdown"] == NORMALIZED_MARKDOWN
    first_segment = payload["segments"][0]
    assert first_segment["segment_id"] == "seg-0001"
def test_normalization_cache_key_is_deterministic():
    """Identical inputs produce identical, correctly prefixed cache keys."""
    asset = SourceAsset(uri="sample.fake", path="sample.fake", digest="sha256:abc")

    # A fresh options dict is built for each call, so equality cannot come
    # from both calls sharing one object.
    first, second = [
        normalization_cache_key(
            asset=asset,
            adapter_id="source.fake",
            adapter_version="1",
            options={"skip_boilerplate": True},
        )
        for _ in range(2)
    ]

    assert first == second
    assert first.startswith("source-normalize:sha256:")
def test_source_registry_selects_fake_adapter_and_reports_unsupported():
    """The registry picks the fake adapter for ``.fake`` and flags other formats."""
    registry = SourceAdapterRegistry([_fake_descriptor()])

    # Supported asset: a descriptor and an adapter come back, with no diagnostics.
    sample_asset = SourceAsset.from_path(SAMPLE_SOURCE)
    chosen, instance, problems = registry.select(sample_asset)
    assert chosen is not None
    assert chosen.id == "source.fake"
    assert instance is not None
    assert problems == []

    # Unsupported asset: nothing is selected and the first diagnostic says why.
    binary_asset = SourceAsset(uri="example.bin", extension=".bin")
    chosen, instance, problems = registry.select(binary_asset)
    assert chosen is None
    assert instance is None
    assert problems[0].code == "source.unsupported_format"
def test_source_registry_reports_missing_required_dependency():
    """Selecting an adapter with an unmet required dependency yields a diagnostic."""

    def make_adapter():
        return FakeSourceAdapter(_fake_descriptor("source.needs-missing"))

    missing_dependency = OptionalDependency(
        name="definitely_missing_markitect_source_adapter_dependency",
        package="missing-package",
        required=True,
    )
    descriptor = SourceAdapterDescriptor(
        id="source.needs-missing",
        version="1",
        name="Missing Dependency Adapter",
        operations=["read"],
        media_types=[],
        extensions=[".fake"],
        factory=make_adapter,
        optional_dependencies=[missing_dependency],
    )
    registry = SourceAdapterRegistry([descriptor])

    selection = registry.select(SourceAsset.from_path(SAMPLE_SOURCE))
    diagnostics = selection[2]

    first = diagnostics[0]
    assert first.code == "source.missing_dependency"
    assert "definitely_missing_markitect_source_adapter_dependency" in first.details["missing"]
def test_source_registry_breaks_ambiguous_matches_by_adapter_id():
    """Equal-confidence matches resolve to ``source.a`` with an ambiguity diagnostic."""
    # ``source.b`` is registered first, so registration order alone would not
    # explain ``source.a`` being chosen.
    candidates = [
        _fake_descriptor(adapter_id, confidence=80)
        for adapter_id in ("source.b", "source.a")
    ]
    registry = SourceAdapterRegistry(candidates)

    winner, _, diagnostics = registry.select(SourceAsset.from_path(SAMPLE_SOURCE))

    assert winner is not None
    assert winner.id == "source.a"
    codes = [entry.code for entry in diagnostics]
    assert codes == ["source.adapter_ambiguous"]
class FakeEntryPoint:
    """Entry-point stand-in exposing ``name`` and ``load()`` for discovery tests."""

    name = "fake"

    def load(self):
        """Return a freshly built fake adapter descriptor."""
        loaded = _fake_descriptor()
        return loaded
def test_discover_source_adapters_accepts_entry_point_descriptors():
    """Discovery registers the descriptor returned by an entry point's ``load()``."""
    entry_points = [FakeEntryPoint()]
    discovered = discover_source_adapters(entry_points)
    found = discovered.get("source.fake")
    assert found.name == "Fake Source Adapter"
def test_source_descriptor_maps_to_extension_descriptor():
    """A source descriptor converts to an extension descriptor with the expected fields."""
    source_descriptor = _fake_descriptor()
    extension = source_descriptor.to_extension_descriptor()

    assert extension.kind == "source-adapter"
    assert extension.input_contract == "SourceInspectRequest | SourceReadRequest"
    assert "mkt source normalize" in extension.cli["commands"]

    # Extra capabilities are allowed; only these four are required.
    required_ids = {
        "source",
        "markdown",
        "diagnostics",
        "provenance",
    }
    advertised_ids = {capability.id for capability in extension.capabilities}
    assert required_ids.issubset(advertised_ids)
def test_builtin_registry_exposes_source_adapter_framework():
    """The built-in extension registry lists the source adapter registry entry."""
    framework = builtin_extension_registry().get("source.adapter-registry")

    assert framework.kind == "source-adapter-registry"
    entry_point_group = framework.metadata["entry_point_group"]
    assert entry_point_group == "markitect_tool.source_adapters"
    cli_commands = framework.cli["commands"]
    assert "mkt source adapters" in cli_commands
def test_inspect_and_normalize_source_api_use_injected_registry():
    """``inspect_source`` and ``normalize_source`` work with an injected registry."""
    fake_registry = SourceAdapterRegistry([_fake_descriptor()])

    # Both calls run before any assertion is checked.
    inspection = inspect_source(SAMPLE_SOURCE, registry=fake_registry)
    normalization = normalize_source(SAMPLE_SOURCE, registry=fake_registry)

    assert inspection.is_valid
    assert inspection.metadata.title == "Fake Source"

    assert normalization.is_valid
    assert normalization.document is not None
    assert normalization.document.markdown == NORMALIZED_MARKDOWN
def test_source_cli_uses_registry_and_emits_json(monkeypatch):
    """``source adapters --format json`` lists the adapters of the patched registry."""
    cli_module = importlib.import_module("markitect_tool.cli.main")

    def fake_registry():
        return SourceAdapterRegistry([_fake_descriptor()])

    # Replace the CLI module's default registry with one holding only the fake adapter.
    monkeypatch.setattr(cli_module, "default_source_adapter_registry", fake_registry)

    runner = CliRunner()
    result = runner.invoke(cli_module.main, ["source", "adapters", "--format", "json"])

    assert result.exit_code == 0, result.output
    listing = json.loads(result.output)
    assert listing["count"] == 1
    assert listing["adapters"][0]["id"] == "source.fake"
def test_source_cli_inspect_and_normalize(monkeypatch):
    """``source inspect`` emits JSON metadata and ``source normalize`` emits the markdown."""
    cli_module = importlib.import_module("markitect_tool.cli.main")

    def fake_registry():
        return SourceAdapterRegistry([_fake_descriptor()])

    monkeypatch.setattr(cli_module, "default_source_adapter_registry", fake_registry)

    runner = CliRunner()
    sample_path = str(SAMPLE_SOURCE)
    # Both commands run before any assertion is checked.
    inspect_run = runner.invoke(
        cli_module.main,
        ["source", "inspect", sample_path, "--format", "json"],
    )
    normalize_run = runner.invoke(
        cli_module.main,
        ["source", "normalize", sample_path, "--format", "markdown"],
    )

    assert inspect_run.exit_code == 0, inspect_run.output
    inspect_payload = json.loads(inspect_run.output)
    assert inspect_payload["metadata"]["title"] == "Fake Source"

    assert normalize_run.exit_code == 0, normalize_run.output
    assert normalize_run.output == NORMALIZED_MARKDOWN
def test_source_cli_markdown_output_suppresses_invalid_partial(monkeypatch):
    """With an empty registry, ``source normalize`` prints nothing to stdout.

    The command is expected to exit with status 1, leave stdout empty, and
    report ``source.unsupported_format`` on stderr.
    """
    cli_module = importlib.import_module("markitect_tool.cli.main")
    monkeypatch.setattr(
        cli_module,
        "default_source_adapter_registry",
        lambda: SourceAdapterRegistry(),
    )
    try:
        # Click < 8.2 only captures stderr separately when asked to.
        runner = CliRunner(mix_stderr=False)
    except TypeError:
        # Click >= 8.2 removed ``mix_stderr``; the streams are always
        # captured separately there.
        runner = CliRunner()
    result = runner.invoke(
        cli_module.main,
        ["source", "normalize", str(SAMPLE_SOURCE), "--format", "markdown"],
    )
    assert result.exit_code == 1
    # ``stdout`` is stdout-only on every Click version, whereas ``output``
    # interleaves stderr from Click 8.2 onward.
    assert result.stdout == ""
    assert "source.unsupported_format" in result.stderr
def test_source_examples_are_valid_json_fixtures():
    """Each example fixture parses as JSON and is non-empty."""
    fixture_paths = [
        "examples/source-adapters/adapter-list.json",
        "examples/source-adapters/inspect-result.json",
        "examples/source-adapters/normalized-document.json",
    ]
    for fixture_path in fixture_paths:
        text = Path(fixture_path).read_text(encoding="utf-8")
        parsed = json.loads(text)
        assert parsed
def test_top_level_api_exports_source_contract():
    """The top-level package re-exports the source adapter names checked here."""
    exported_names = (
        "SourceAsset",
        "SourceAdapterDescriptor",
        "SourceAdapterRegistry",
        "default_source_adapter_registry",
        "normalize_source",
    )
    for exported_name in exported_names:
        # A missing name raises AttributeError; a falsy one fails the assert.
        assert getattr(api, exported_name)
    assert api.SOURCE_ADAPTER_ENTRY_POINT_GROUP == "markitect_tool.source_adapters"