"""Tests exercising external adapter packs against the runtime service helpers."""

import json
from pathlib import Path

from phase_memory.external_adapters import (
    ADAPTER_PACK_MANIFEST_SCHEMA,
    ExternalAdapterPack,
    adapter_pack_manifest,
    fake_external_adapter_pack,
    fake_external_runtime_config,
    live_shaped_adapter_pack,
    validate_adapter_pack_manifest,
)
from phase_memory.service import (
    assert_audit_sink_conformance,
    assert_context_compiler_conformance,
    assert_event_log_conformance,
    assert_graph_store_conformance,
    assert_policy_gateway_conformance,
    assert_runtime_registry_conformance,
    assert_semantic_index_conformance,
    health_report,
    resolve_runtime_adapters,
    runtime_from_config,
)

FIXTURES = Path(__file__).parent / "fixtures"


def _load(name: str):
    """Parse and return the JSON fixture called *name* from the fixtures directory."""
    fixture_path = FIXTURES / name
    raw_text = fixture_path.read_text(encoding="utf-8")
    return json.loads(raw_text)


def _selection(selection_id):
    """Build the selection payload passed to ``compile_package`` with the given id."""
    return {
        "schema_version": "markitect.memory.selection.v1",
        "id": selection_id,
        "nodes": ["decision.boundary"],
        "events": [],
    }


def test_fake_external_adapter_pack_satisfies_public_conformance_helpers() -> None:
    """Each adapter in the fake pack passes its matching conformance helper."""
    fake_pack = fake_external_adapter_pack()
    by_role = fake_pack.adapters

    # Checked in this order; each helper receives the adapter stored under the key.
    conformance_checks = (
        (assert_graph_store_conformance, "graph_store"),
        (assert_event_log_conformance, "event_log"),
        (assert_context_compiler_conformance, "package_compiler"),
        (assert_policy_gateway_conformance, "policy_gateway"),
        (assert_audit_sink_conformance, "audit_sink"),
        (assert_semantic_index_conformance, "semantic_index"),
        (assert_runtime_registry_conformance, "runtime_registry"),
    )
    for check, role in conformance_checks:
        check(by_role[role])

    described_adapters = fake_pack.to_dict()["adapters"]
    assert described_adapters["package_compiler"] == "FakeMarkitectPackageCompiler"


def test_fake_external_adapter_pack_manifest_declares_compatibility() -> None:
    """The fake pack's manifest carries the schema version and validates cleanly."""
    fake_pack = fake_external_adapter_pack()
    manifest = adapter_pack_manifest(fake_pack)
    findings = validate_adapter_pack_manifest(fake_pack)

    adapter_entries = manifest["adapters"]
    compiler_entry = adapter_entries["package_compiler"]
    audit_entry = adapter_entries["audit_sink"]

    assert manifest["schema_version"] == ADAPTER_PACK_MANIFEST_SCHEMA
    assert compiler_entry["required_conformance"] == "assert_context_compiler_conformance"
    assert audit_entry["required_capabilities"] == ["telemetry.audit.fake"]
    assert findings == ()


def test_adapter_pack_manifest_reports_missing_capabilities() -> None:
    """Removing one capability from the pack yields a single matching diagnostic."""
    complete = fake_external_adapter_pack()
    remaining_capabilities = tuple(
        item for item in complete.capabilities if item != "telemetry.audit.fake"
    )
    incomplete = ExternalAdapterPack(
        name=complete.name,
        adapters=complete.adapters,
        capabilities=remaining_capabilities,
        ownership_boundaries=complete.ownership_boundaries,
        required_conformance=complete.required_conformance,
        capability_requirements=complete.capability_requirements,
        metadata=complete.metadata,
    )

    findings = validate_adapter_pack_manifest(incomplete)
    reported_codes = [finding.code for finding in findings]

    assert reported_codes == ["missing_adapter_capability"]
    assert findings[0].metadata["capability"] == "telemetry.audit.fake"


def test_external_runtime_config_resolves_supplied_fake_pack() -> None:
    """A runtime built from the fake config and fake pack compiles and reports healthy."""
    config = fake_external_runtime_config()
    fake_pack = fake_external_adapter_pack()
    bundle = resolve_runtime_adapters(config, external_adapters=fake_pack.adapters)
    runtime = runtime_from_config(config, external_adapters=fake_pack.adapters)

    error_findings = [
        finding for finding in bundle.diagnostics if finding.severity == "error"
    ]
    assert not error_findings
    seen_codes = {finding.code for finding in bundle.diagnostics}
    assert seen_codes == {"external_adapter_declared"}

    runtime.import_graph(_load("memory-graph.json"), source_ref="fake-external")
    envelope = runtime.compile_package(_selection("selection.external"))

    # Round-trip the compiled envelope through the bundle's runtime registry.
    registry = bundle.runtime_registry
    receipt = registry.publish_runtime_envelope(envelope)
    fetched = registry.fetch_runtime_envelope(receipt["reference"])
    report = health_report(runtime, config=config)

    package_ref = envelope["data"]["package_response"]["package_ref"]
    assert envelope["valid"] is True
    assert package_ref.startswith("fake-markitect:")
    assert fetched["operation"] == "package.compile"
    assert report["ok"] is True
    assert report["adapters"]["package_compiler"] == "FakeMarkitectPackageCompiler"


def test_live_shaped_adapter_pack_uses_same_manifest_and_conformance_contract() -> None:
    """The live-shaped pack validates and runs through the same runtime calls."""
    config = fake_external_runtime_config()
    live_pack = live_shaped_adapter_pack()
    manifest = adapter_pack_manifest(live_pack)
    bundle = resolve_runtime_adapters(config, external_adapters=live_pack.adapters)
    runtime = runtime_from_config(config, external_adapters=live_pack.adapters)

    assert validate_adapter_pack_manifest(live_pack) == ()
    assert manifest["metadata"]["network_required"] is False
    compiler_entry = manifest["adapters"]["package_compiler"]
    assert compiler_entry["required_capabilities"] == [
        "markitect.package.compile.live-shaped"
    ]
    error_findings = [
        finding for finding in bundle.diagnostics if finding.severity == "error"
    ]
    assert not error_findings

    envelope = runtime.compile_package(_selection("selection.live-shaped"))

    # Round-trip the compiled envelope through the registry, then export audit events.
    registry = bundle.runtime_registry
    receipt = registry.publish_runtime_envelope(envelope)
    fetched = registry.fetch_runtime_envelope(receipt["reference"])
    export = runtime.export_audit_events({"operation": "package.compile"})

    package_ref = envelope["data"]["package_response"]["package_ref"]
    assert package_ref.startswith("markitect-live-shaped:")
    assert fetched["operation"] == "package.compile"
    assert export["batch"]["retention"]["mode"] == "live_shaped_telemetry"