feat(memory): add permission aware retrieval

This commit is contained in:
2026-05-15 09:15:28 +02:00
parent d196be378c
commit 876f97c288
6 changed files with 438 additions and 5 deletions

View File

@@ -6,8 +6,10 @@ from kontextual_engine import (
DuplicateResourceError,
InMemoryMemoryGraphRepository,
MemoryGraphImportResult,
MemoryQueryRequest,
MemoryRuntimeService,
OperationContext,
PolicyDecision,
ValidationError,
)
@@ -71,6 +73,67 @@ def test_memory_runtime_service_imports_contracts_and_reports_audit_context() ->
)
def test_memory_query_returns_source_grounded_context_items_and_edges() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
result = service.query_memory(
MemoryQueryRequest(graph_id=summary.graph_id),
operation_context(),
)
decision = next(item for item in result.items if item.node.contract_node_id == "decision.contract-boundary")
assert result.success is True
assert result.total == 2
assert decision.to_context_item()["source_spans"][0]["path"] == "workplans/MKTT-WP-0016.md"
assert decision.to_context_item()["metadata"]["edges"][0]["contract_edge_id"] == "edge.boundary-support"
assert result.metadata["policy_enforced"] is True
assert result.audit_event is not None
assert result.audit_event.operation == "memory.query"
def test_memory_query_filters_denied_nodes_without_leaking_denied_text() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo, policy_gateway=DenyInternalMemoryPolicy())
graph = _graph_contract()
graph["nodes"].append(
{
"id": "claim.internal-secret",
"kind": "claim",
"text": "secret memory text must not leak",
"policy": {"labels": ["internal"]},
}
)
service.import_markitect_graph(graph)
result = service.query_memory(MemoryQueryRequest(text_contains="memory"), operation_context())
assert result.success is True
assert result.metadata["permission_filtered_count"] == 1
assert all(item.node.contract_node_id != "claim.internal-secret" for item in result.items)
assert "secret memory text must not leak" not in str(result.to_dict())
assert result.diagnostics[0].code == "memory.permission_denied"
assert result.diagnostics[0].details["contract_node_id"] == "claim.internal-secret"
assert result.audit_event is not None
assert result.audit_event.outcome.value == "partial"
def test_memory_query_scope_policy_fail_closed_returns_empty_result() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo, policy_gateway=BrokenMemoryPolicy())
service.import_markitect_graph(_graph_contract())
result = service.query_memory(MemoryQueryRequest(), operation_context())
assert result.success is False
assert result.items == ()
assert result.total == 0
assert result.diagnostics[0].details["policy_decision"]["effect"] == "fail_closed"
assert result.audit_event is not None
assert result.audit_event.outcome.value == "denied"
def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
@@ -83,6 +146,52 @@ def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None:
assert "unknown node" in str(exc.value)
def operation_context() -> OperationContext:
actor = Actor.create(ActorType.HUMAN, actor_id="user-memory", display_name="Memory Tester")
return OperationContext.create(actor, correlation_id="corr-memory")
class DenyInternalMemoryPolicy:
def authorize(
self,
context: OperationContext,
action: str,
resource: str,
*,
resource_metadata: dict | None = None,
) -> PolicyDecision:
resource_metadata = resource_metadata or {}
labels = resource_metadata.get("policy", {}).get("labels", ())
if action == "memory.node.retrieve" and "internal" in labels:
return PolicyDecision.fail_closed(
context.actor.id,
action,
resource,
reason="internal memory denied in test policy",
context={"resource_metadata": resource_metadata},
)
return PolicyDecision.allow(
context.actor.id,
action,
resource,
context={"resource_metadata": resource_metadata},
)
class BrokenMemoryPolicy:
def authorize(
self,
context: OperationContext,
action: str,
resource: str,
*,
resource_metadata: dict | None = None,
) -> PolicyDecision:
if action == "memory.query":
raise RuntimeError("memory policy context unavailable")
return PolicyDecision.allow(context.actor.id, action, resource)
def _profile_contract() -> dict:
return {
"schema_version": "markitect.memory.profile.v1",