# Files
# kontextual-engine/tests/test_memory_runtime_adapter_boundaries.py
#
# 141 lines
# 4.8 KiB
# Python

import copy

from kontextual_engine import (
    Actor,
    ActorType,
    AuditEvent,
    AuditOutcome,
    MemoryAuditPublisher,
    MemoryRuntimeAdapterCapability,
    MemoryRuntimeRegistry,
    MemorySemanticIndex,
    OperationContext,
)
def test_memory_runtime_adapter_capability_serializes_dependency_boundary() -> None:
    """Check the dictionary form of a fully populated adapter capability.

    The capability is constructed with tuple-valued ``supported_operations``
    and ``required_dependencies``; the expected ``to_dict()`` result carries
    those same values as lists and every other field unchanged.
    """
    operations = ("upsert_memory_nodes", "query_memory_nodes")
    dependencies = ("numpy",)

    serialized = MemoryRuntimeAdapterCapability(
        adapter_name="local-semantic-index",
        adapter_kind="semantic_index",
        supported_operations=operations,
        required_dependencies=dependencies,
        deterministic_local=True,
        metadata={"ownership": "optional-adapter"},
    ).to_dict()

    # The expectation is spelled out literally so it stays independent of
    # the objects handed to the constructor above.
    expected = {
        "adapter_name": "local-semantic-index",
        "adapter_kind": "semantic_index",
        "supported_operations": ["upsert_memory_nodes", "query_memory_nodes"],
        "required_dependencies": ["numpy"],
        "deterministic_local": True,
        "metadata": {"ownership": "optional-adapter"},
    }
    assert serialized == expected
def test_optional_memory_adapter_ports_exchange_serializable_payloads() -> None:
    """Drive the three fake adapters through their port methods.

    Each fake is bound to a variable annotated with the matching port type
    imported from ``kontextual_engine``. The test then upserts and queries
    through the semantic index, publishes one audit event, stores a runtime
    envelope that embeds that event's dictionary form, reads the envelope
    back by the returned reference, and checks the payload of every call.
    """
    # Arrange: one fake per port.
    index: MemorySemanticIndex = FakeSemanticIndex()
    publisher: MemoryAuditPublisher = CapturingAuditPublisher()
    store: MemoryRuntimeRegistry = InMemoryRuntimeRegistry()

    actor = Actor.create(ActorType.AI_AGENT, actor_id="agent-adapter-test")
    context = OperationContext.create(actor, correlation_id="corr-adapter-boundary")
    event = AuditEvent.from_context(
        "memory.runtime.export",
        "memory-graph:memgraph-test",
        AuditOutcome.SUCCESS,
        context,
        details={"operation_id": "memexport-test"},
    )

    # Act: exercise each port in turn.
    upserted = index.upsert_memory_nodes([])
    hits = index.query_memory_nodes(
        graph_id="memgraph-test",
        query="contract boundary",
    )
    published = publisher.publish_memory_audit_event(event)
    envelope = {
        "schema_version": "kontextual.memory.runtime-export.v1",
        "graph_id": "memgraph-test",
        "nodes": [],
        "audit_events": [event.to_dict()],
    }
    receipt = store.publish_runtime_envelope(envelope)
    round_tripped = store.fetch_runtime_envelope(receipt["reference"])

    # Assert: every payload has the expected shape and content.
    capability = index.capabilities()
    assert capability.adapter_kind == "semantic_index"
    assert upserted == {"upserted": 0}
    first_hit = hits[0]
    assert first_hit["graph_id"] == "memgraph-test"
    assert published["event_id"] == event.event_id
    assert receipt["reference"] == "memory-registry://memgraph-test"
    stored_event = round_tripped["audit_events"][0]
    assert stored_event["operation"] == "memory.runtime.export"
class FakeSemanticIndex:
    """Semantic index fake that answers from its arguments alone.

    Nothing is stored: an upsert only reports how many nodes it was given,
    and a query always yields one fixed hit that echoes the query arguments.
    """

    name = "fake-semantic-index"

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability record describing this fake."""
        operations = ("upsert_memory_nodes", "query_memory_nodes")
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="semantic_index",
            supported_operations=operations,
            deterministic_local=True,
        )

    def upsert_memory_nodes(self, nodes: list) -> dict:
        """Return ``{"upserted": <number of nodes received>}``."""
        count = len(nodes)
        return {"upserted": count}

    def query_memory_nodes(
        self,
        *,
        graph_id: str,
        query: str,
        limit: int = 10,
        filters: dict | None = None,
    ) -> list[dict]:
        """Return a single hit echoing the keyword arguments.

        The hit always has node id ``"memnode-test"`` and score ``1.0``.
        A missing or empty ``filters`` is reported as an empty dict.
        """
        effective_filters = filters if filters else {}
        hit = {
            "graph_id": graph_id,
            "node_id": "memnode-test",
            "score": 1.0,
            "query": query,
            "filters": effective_filters,
            "limit": limit,
        }
        return [hit]
class CapturingAuditPublisher:
    """Audit sink fake that records every event it is asked to publish.

    Published events are appended to ``self.events`` in call order so a test
    can inspect exactly what reached the sink.
    """

    name = "capturing-audit-publisher"

    def __init__(self) -> None:
        # Events handed to publish_memory_audit_event, oldest first.
        self.events: list[AuditEvent] = []

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability record describing this fake."""
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="audit_sink",
            supported_operations=("publish_memory_audit_event",),
            deterministic_local=True,
        )

    def publish_memory_audit_event(self, event: AuditEvent) -> dict:
        """Record ``event`` and return a receipt for it.

        Args:
            event: Object exposing ``event_id`` and ``operation`` attributes.

        Returns:
            A dict with ``published`` set to ``True`` plus the event's
            ``event_id`` and ``operation``.
        """
        self.events.append(event)
        return {
            "published": True,
            "event_id": event.event_id,
            "operation": event.operation,
        }
class InMemoryRuntimeRegistry:
    """Dictionary-backed runtime registry fake.

    Envelopes are deep-copied when stored and again when fetched, so neither
    the publisher nor a reader can change what the registry holds by mutating
    the dict (or any list or dict nested inside it) on their side.
    """

    name = "in-memory-runtime-registry"

    def __init__(self) -> None:
        # Stored envelopes keyed by their "memory-registry://<graph_id>" reference.
        self.envelopes: dict[str, dict] = {}

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability record describing this fake."""
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="runtime_registry",
            supported_operations=("publish_runtime_envelope", "fetch_runtime_envelope"),
            deterministic_local=True,
        )

    def publish_runtime_envelope(self, envelope: dict) -> dict:
        """Store a snapshot of ``envelope`` and return its reference.

        Publishing again for the same ``graph_id`` replaces the earlier
        snapshot, because the reference is derived from ``graph_id`` alone.

        Args:
            envelope: Payload to store; must contain a ``"graph_id"`` key.

        Returns:
            A dict with the ``reference`` to fetch the envelope by and the
            envelope's ``graph_id``.

        Raises:
            KeyError: If ``envelope`` has no ``"graph_id"`` key.
        """
        graph_id = envelope["graph_id"]
        reference = f"memory-registry://{graph_id}"
        # Deep copy: a shallow dict() would leave nested lists shared with
        # the caller, letting later caller-side edits leak into the store.
        self.envelopes[reference] = copy.deepcopy(envelope)
        return {"reference": reference, "graph_id": graph_id}

    def fetch_runtime_envelope(self, reference: str) -> dict:
        """Return an independent copy of the envelope stored under ``reference``.

        Raises:
            KeyError: If nothing was published under ``reference``.
        """
        return copy.deepcopy(self.envelopes[reference])