Files
phase-memory/tests/test_external_adapter_packs.py

67 lines
2.6 KiB
Python

import json
from pathlib import Path
from phase_memory.external_adapters import fake_external_adapter_pack, fake_external_runtime_config
from phase_memory.service import (
assert_audit_sink_conformance,
assert_context_compiler_conformance,
assert_event_log_conformance,
assert_graph_store_conformance,
assert_policy_gateway_conformance,
assert_runtime_registry_conformance,
assert_semantic_index_conformance,
health_report,
resolve_runtime_adapters,
runtime_from_config,
)
# Fixture files live in a "fixtures" directory that sits next to this module.
FIXTURES = Path(__file__).parent.joinpath("fixtures")
def _load(name: str):
    """Return the parsed JSON content of the fixture file called *name*.

    The file is looked up inside ``FIXTURES``, read as UTF-8 text and decoded
    with ``json.loads``. Nothing is caught here, so a missing file or
    malformed JSON surfaces as the underlying exception.
    """
    fixture_path = FIXTURES / name
    return json.loads(fixture_path.read_text(encoding="utf-8"))
def test_fake_external_adapter_pack_satisfies_public_conformance_helpers() -> None:
    """Run each public conformance helper against the fake external adapter pack.

    Every adapter in the pack is handed to the conformance helper that matches
    its key; the helpers are expected to raise on a non-conforming adapter.
    """
    pack = fake_external_adapter_pack()
    adapters = pack.adapters

    # One conformance helper per adapter slot, looked up by its key in the pack.
    assert_graph_store_conformance(adapters["graph_store"])
    assert_event_log_conformance(adapters["event_log"])
    assert_context_compiler_conformance(adapters["package_compiler"])
    assert_policy_gateway_conformance(adapters["policy_gateway"])
    assert_audit_sink_conformance(adapters["audit_sink"])
    assert_semantic_index_conformance(adapters["semantic_index"])
    assert_runtime_registry_conformance(adapters["runtime_registry"])

    # The serialized pack identifies the package compiler by this name.
    assert pack.to_dict()["adapters"]["package_compiler"] == "FakeMarkitectPackageCompiler"
def test_external_runtime_config_resolves_supplied_fake_pack() -> None:
    """Resolve the fake external config against the fake pack and exercise it.

    The test resolves an adapter bundle and builds a runtime from the same
    config and adapters, imports the ``memory-graph.json`` fixture, compiles a
    one-node selection, round-trips the resulting envelope through the
    bundle's runtime registry, and finally checks the health report.
    """
    config = fake_external_runtime_config()
    pack = fake_external_adapter_pack()
    bundle = resolve_runtime_adapters(config, external_adapters=pack.adapters)
    runtime = runtime_from_config(config, external_adapters=pack.adapters)

    # Resolution must produce no error diagnostics, and the only diagnostic
    # code reported is the declaration of the external adapters.
    error_diagnostics = [
        diagnostic for diagnostic in bundle.diagnostics if diagnostic.severity == "error"
    ]
    assert not error_diagnostics
    assert {diagnostic.code for diagnostic in bundle.diagnostics} == {"external_adapter_declared"}

    runtime.import_graph(_load("memory-graph.json"), source_ref="fake-external")
    envelope = runtime.compile_package(
        {
            "schema_version": "markitect.memory.selection.v1",
            "id": "selection.external",
            "nodes": ["decision.boundary"],
            "events": [],
        }
    )

    # Publish the compiled envelope, then fetch it back via the returned reference.
    registry_receipt = bundle.runtime_registry.publish_runtime_envelope(envelope)
    fetched = bundle.runtime_registry.fetch_runtime_envelope(registry_receipt["reference"])
    report = health_report(runtime, config=config)

    assert envelope["valid"] is True
    assert envelope["data"]["package_response"]["package_ref"].startswith("fake-markitect:")
    assert fetched["operation"] == "package.compile"
    assert report["ok"] is True
    assert report["adapters"]["package_compiler"] == "FakeMarkitectPackageCompiler"