# Generated from coulomb/repo-seed (141 lines, 4.8 KiB, Python).
from kontextual_engine import (
|
|
Actor,
|
|
ActorType,
|
|
AuditEvent,
|
|
AuditOutcome,
|
|
MemoryAuditPublisher,
|
|
MemoryRuntimeAdapterCapability,
|
|
MemoryRuntimeRegistry,
|
|
MemorySemanticIndex,
|
|
OperationContext,
|
|
)
|
|
|
|
|
|
def test_memory_runtime_adapter_capability_serializes_dependency_boundary() -> None:
    """Check that a capability descriptor serializes via ``to_dict()``.

    Every constructor field is expected back under the same key; the
    tuple-valued fields (``supported_operations`` and
    ``required_dependencies``) are expected to come back as lists.
    """
    capability = MemoryRuntimeAdapterCapability(
        adapter_name="local-semantic-index",
        adapter_kind="semantic_index",
        supported_operations=("upsert_memory_nodes", "query_memory_nodes"),
        required_dependencies=("numpy",),
        deterministic_local=True,
        metadata={"ownership": "optional-adapter"},
    )

    assert capability.to_dict() == {
        "adapter_name": "local-semantic-index",
        "adapter_kind": "semantic_index",
        # Tuples passed to the constructor are compared against lists here.
        "supported_operations": ["upsert_memory_nodes", "query_memory_nodes"],
        "required_dependencies": ["numpy"],
        "deterministic_local": True,
        "metadata": {"ownership": "optional-adapter"},
    }
|
|
|
|
|
|
def test_optional_memory_adapter_ports_exchange_serializable_payloads() -> None:
    """Drive the three adapter ports through local fakes and check payloads.

    The fakes defined later in this module stand in for the semantic index,
    the audit publisher and the runtime registry. Each call is expected to
    return plain ``dict``/``list`` data that can be compared directly.
    """
    # The annotations name the port types the fakes are meant to satisfy.
    # NOTE(review): presumably these are structural (Protocol) types, since
    # the fakes do not inherit from them -- confirm in kontextual_engine.
    semantic_index: MemorySemanticIndex = FakeSemanticIndex()
    audit_publisher: MemoryAuditPublisher = CapturingAuditPublisher()
    registry: MemoryRuntimeRegistry = InMemoryRuntimeRegistry()

    context = OperationContext.create(
        Actor.create(ActorType.AI_AGENT, actor_id="agent-adapter-test"),
        correlation_id="corr-adapter-boundary",
    )
    audit_event = AuditEvent.from_context(
        "memory.runtime.export",
        "memory-graph:memgraph-test",
        AuditOutcome.SUCCESS,
        context,
        details={"operation_id": "memexport-test"},
    )

    upsert_result = semantic_index.upsert_memory_nodes([])
    query_result = semantic_index.query_memory_nodes(
        graph_id="memgraph-test",
        query="contract boundary",
    )
    audit_result = audit_publisher.publish_memory_audit_event(audit_event)
    registry_result = registry.publish_runtime_envelope(
        {
            "schema_version": "kontextual.memory.runtime-export.v1",
            "graph_id": "memgraph-test",
            "nodes": [],
            "audit_events": [audit_event.to_dict()],
        }
    )
    # Fetch by the reference handed back from publish to prove the round trip.
    fetched = registry.fetch_runtime_envelope(registry_result["reference"])

    assert semantic_index.capabilities().adapter_kind == "semantic_index"
    assert upsert_result == {"upserted": 0}
    assert query_result[0]["graph_id"] == "memgraph-test"
    assert audit_result["event_id"] == audit_event.event_id
    assert registry_result["reference"] == "memory-registry://memgraph-test"
    assert fetched["audit_events"][0]["operation"] == "memory.runtime.export"
|
|
|
|
|
|
class FakeSemanticIndex:
    """Test double for the semantic-index port.

    It stores nothing: every method returns a payload derived only from its
    arguments, so results are fully predictable in assertions.
    """

    # Reported as ``adapter_name`` by :meth:`capabilities`.
    name = "fake-semantic-index"

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability descriptor advertised by this fake."""
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="semantic_index",
            supported_operations=("upsert_memory_nodes", "query_memory_nodes"),
            deterministic_local=True,
        )

    def upsert_memory_nodes(self, nodes: list) -> dict:
        """Report how many nodes were passed in; nothing is stored.

        Args:
            nodes: The nodes to "upsert"; only their count is used.

        Returns:
            ``{"upserted": <number of nodes>}``.
        """
        return {"upserted": len(nodes)}

    def query_memory_nodes(
        self,
        *,
        graph_id: str,
        query: str,
        limit: int = 10,
        filters: dict | None = None,
    ) -> list[dict]:
        """Return a single canned hit that echoes the query arguments.

        Args:
            graph_id: Echoed back under ``"graph_id"``.
            query: Echoed back under ``"query"``.
            limit: Echoed back under ``"limit"``; it does not cap the result.
            filters: Echoed back under ``"filters"``; a falsy value (``None``
                or an empty dict) becomes ``{}``.

        Returns:
            A one-element list containing the hit dict, with a fixed
            ``node_id`` and ``score``.
        """
        return [
            {
                "graph_id": graph_id,
                "node_id": "memnode-test",
                "score": 1.0,
                "query": query,
                "filters": filters or {},
                "limit": limit,
            }
        ]
|
|
|
|
|
|
class CapturingAuditPublisher:
    """Test double for the audit-publisher port.

    Publishing does not persist the event; it only returns an acknowledgement
    built from the event's ``event_id`` and ``operation`` attributes.
    """

    # Reported as ``adapter_name`` by :meth:`capabilities`.
    name = "capturing-audit-publisher"

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability descriptor advertised by this fake."""
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="audit_sink",
            supported_operations=("publish_memory_audit_event",),
            deterministic_local=True,
        )

    def publish_memory_audit_event(self, event: AuditEvent) -> dict:
        """Acknowledge an audit event without storing it.

        Args:
            event: The event to publish; only ``event_id`` and ``operation``
                are read.

        Returns:
            A dict with ``"published"`` set to ``True`` plus the event's
            ``"event_id"`` and ``"operation"``.
        """
        return {
            "published": True,
            "event_id": event.event_id,
            "operation": event.operation,
        }
|
|
|
|
|
|
class InMemoryRuntimeRegistry:
    """Test double for the runtime-registry port, backed by a plain dict.

    Envelopes are keyed by a ``memory-registry://<graph_id>`` reference, so
    publishing a second envelope for the same graph replaces the first.
    """

    # Reported as ``adapter_name`` by :meth:`capabilities`.
    name = "in-memory-runtime-registry"

    def __init__(self) -> None:
        """Start with an empty reference-to-envelope store."""
        self.envelopes: dict[str, dict] = {}

    def capabilities(self) -> MemoryRuntimeAdapterCapability:
        """Return the capability descriptor advertised by this fake."""
        return MemoryRuntimeAdapterCapability(
            adapter_name=self.name,
            adapter_kind="runtime_registry",
            supported_operations=("publish_runtime_envelope", "fetch_runtime_envelope"),
            deterministic_local=True,
        )

    def publish_runtime_envelope(self, envelope: dict) -> dict:
        """Store an envelope and return the reference it was stored under.

        Args:
            envelope: The payload to store; must contain ``"graph_id"``.

        Returns:
            A dict with the generated ``"reference"`` and the ``"graph_id"``.

        Raises:
            KeyError: If ``envelope`` has no ``"graph_id"`` key.
        """
        reference = f"memory-registry://{envelope['graph_id']}"
        # Shallow copy: top-level keys are detached from the caller's dict,
        # but nested lists/dicts are still shared with it.
        self.envelopes[reference] = dict(envelope)
        return {"reference": reference, "graph_id": envelope["graph_id"]}

    def fetch_runtime_envelope(self, reference: str) -> dict:
        """Return the stored envelope for ``reference``.

        The stored dict itself is returned, not a copy.

        Raises:
            KeyError: If nothing was published under ``reference``.
        """
        return self.envelopes[reference]
|