Files
phase-memory/tests/test_adapters.py

45 lines
1.6 KiB
Python

from phase_memory.adapters import (
AllowAllPolicyGateway,
InMemoryMemoryEventLog,
InMemoryMemoryGraphStore,
NoopContextPackageCompiler,
RecordingAuditSink,
)
from phase_memory.models import MemoryEdge, MemoryEvent, MemoryNode, ProfileIntent
from phase_memory.ports import graph_from_store
def test_in_memory_store_and_event_log_are_deterministic() -> None:
    """Round-trip records through the in-memory graph store and event log.

    Saves one profile, one node, and one edge, then checks that each read
    path (profile lookup, node listing, source-filtered edge listing, and
    the graph built by ``graph_from_store``) hands back exactly what was
    saved. Finally appends one event and checks that it is returned both by
    ``append`` and by a kind-filtered listing.
    """
    graph_store = InMemoryMemoryGraphStore()
    saved_profile = ProfileIntent(profile_id="profile")
    saved_node = MemoryNode("node.a", "decision", "Boundary decision")
    # The edge's last two arguments both reference "node.a" (presumably
    # source and target -- confirm against MemoryEdge), so one node suffices.
    saved_edge = MemoryEdge("edge.a", "governs", "node.a", "node.a")

    graph_store.save_profile(saved_profile)
    graph_store.save_node(saved_node)
    graph_store.save_edge(saved_edge)

    fetched_profile = graph_store.get_profile("profile")
    assert fetched_profile == saved_profile

    all_nodes = graph_store.list_nodes()
    assert all_nodes == [saved_node]

    edges_from_node = graph_store.list_edges(source="node.a")
    assert edges_from_node == [saved_edge]

    # The graph view exposes its nodes as a tuple, unlike the list returned
    # by list_nodes().
    graph = graph_from_store(graph_store)
    assert graph.nodes == (saved_node,)

    event_log = InMemoryMemoryEventLog()
    recorded_event = MemoryEvent("event.a", "recorded")

    appended = event_log.append(recorded_event)
    assert appended == recorded_event

    matching_events = event_log.list_events(kind="recorded")
    assert matching_events == [recorded_event]
def test_local_policy_audit_and_compiler_adapters() -> None:
    """Exercise the local policy, audit, and context-compiler adapters.

    Authorizes a read, records an audit entry carrying the decision's
    ``allowed`` flag, and compiles a one-node selection. All three adapter
    calls run before any assertion, after which the test checks that the
    decision allows the action, the receipt is flagged as recorded, and the
    package id is ``"package:selection.a"``.
    """
    gateway = AllowAllPolicyGateway()
    sink = RecordingAuditSink()
    package_compiler = NoopContextPackageCompiler()

    decision = gateway.authorize(action="read", resource="memory-node:node.a")

    audit_entry = {"operation": "read", "allowed": decision.allowed}
    receipt = sink.record(audit_entry)

    selection = {
        "id": "selection.a",
        "nodes": ["node.a"],
        "events": [],
    }
    package = package_compiler.compile_selection(selection)

    assert decision.allowed
    # Identity check: the receipt must carry the literal True, not merely a
    # truthy value.
    assert receipt["recorded"] is True
    assert package["package_id"] == "package:selection.a"