From 876f97c2888faf57bbb0c1a96ce598aa6220922c Mon Sep 17 00:00:00 2001 From: tegwick Date: Fri, 15 May 2026 09:15:28 +0200 Subject: [PATCH] feat(memory): add permission aware retrieval --- docs/memory-graph-runtime.md | 4 + src/kontextual_engine/__init__.py | 6 + src/kontextual_engine/services/__init__.py | 11 +- .../services/memory_service.py | 305 +++++++++++++++++- tests/test_memory_graph_runtime.py | 109 +++++++ ...NT-WP-0017-agentic-memory-graph-runtime.md | 8 +- 6 files changed, 438 insertions(+), 5 deletions(-) diff --git a/docs/memory-graph-runtime.md b/docs/memory-graph-runtime.md index 6e8b15e..8fcbd2b 100644 --- a/docs/memory-graph-runtime.md +++ b/docs/memory-graph-runtime.md @@ -22,6 +22,10 @@ with `markitect-tool`. tests and future service wiring. - `MemoryRuntimeService.import_markitect_graph()` persists an imported graph and can attach an audit event when an `OperationContext` is supplied. +- `MemoryRuntimeService.query_memory()` retrieves graph nodes through a scope + policy check plus per-node policy checks, returns source-grounded context + items, preserves safe denied diagnostics, and emits an audit event in the + result envelope. ## Boundary diff --git a/src/kontextual_engine/__init__.py b/src/kontextual_engine/__init__.py index 1b37e96..ac418ac 100644 --- a/src/kontextual_engine/__init__.py +++ b/src/kontextual_engine/__init__.py @@ -136,6 +136,9 @@ from .services import ( ContextEntityQueryResult, LexicalIndexRefreshResult, MemoryGraphImportSummary, + MemoryQueryRequest, + MemoryRetrievalItem, + MemoryRetrievalResult, MemoryRuntimeService, RelationshipChangeResult, RelationshipQueryItem, @@ -261,6 +264,9 @@ __all__ = [ "MemoryGraphRepository", "MemoryNodeRecord", "MemoryProfileRecord", + "MemoryQueryRequest", + "MemoryRetrievalItem", + "MemoryRetrievalResult", "MemoryRuntimeService", "MemorySourceSpan", "NormalizedDocument", diff --git a/src/kontextual_engine/services/__init__.py b/src/kontextual_engine/services/__init__.py index 2a837d4..1910678 100644 --- a/src/kontextual_engine/services/__init__.py +++ b/src/kontextual_engine/services/__init__.py @@ -7,7 +7,13 @@ from .asset_service import ( ) from .content_service import RepresentationContentResult, RepresentationContentStream, RepresentationContentService from .ingestion_service import AssetIngestionResult, AssetIngestionService -from .memory_service import MemoryGraphImportSummary, MemoryRuntimeService +from .memory_service import ( + MemoryGraphImportSummary, + MemoryQueryRequest, + MemoryRetrievalItem, + MemoryRetrievalResult, + MemoryRuntimeService, +) from .retrieval_service import ( AssetQueryItem, AssetQueryRequest, @@ -55,6 +61,9 @@ __all__ = [ "ContextEntityQueryResult", "LexicalIndexRefreshResult", "MemoryGraphImportSummary", + "MemoryQueryRequest", + "MemoryRetrievalItem", + "MemoryRetrievalResult", "MemoryRuntimeService", "RelationshipChangeResult", "RepresentationContentResult", diff --git a/src/kontextual_engine/services/memory_service.py b/src/kontextual_engine/services/memory_service.py index 9371a2d..dec4228 100644 --- a/src/kontextual_engine/services/memory_service.py +++ b/src/kontextual_engine/services/memory_service.py @@ -9,10 +9,13 @@ from kontextual_engine.core import ( AuditEvent, AuditOutcome, MemoryGraphImportResult, + MemoryEdgeRecord, + MemoryNodeRecord, OperationContext, + PolicyDecision, ) -from kontextual_engine.errors import ValidationError -from kontextual_engine.ports import MemoryGraphRepository +from kontextual_engine.errors import Diagnostic, ValidationError +from kontextual_engine.ports import AllowAllPolicyGateway, MemoryGraphRepository, PolicyGateway @dataclass(frozen=True) @@ -41,9 +44,118 @@ class MemoryGraphImportSummary: return data +@dataclass(frozen=True) +class MemoryQueryRequest: + graph_id: str | None = None + node_ids: tuple[str, ...] = () + kinds: tuple[str, ...] = () + text_contains: str | None = None + include_edges: bool = True + limit: int = 50 + offset: int = 0 + + def __post_init__(self) -> None: + object.__setattr__(self, "node_ids", tuple(self.node_ids)) + object.__setattr__(self, "kinds", tuple(self.kinds)) + + def to_dict(self) -> dict[str, Any]: + return { + "graph_id": self.graph_id, + "node_ids": list(self.node_ids), + "kinds": list(self.kinds), + "text_contains": self.text_contains, + "include_edges": self.include_edges, + "limit": self.limit, + "offset": self.offset, + } + + +@dataclass(frozen=True) +class MemoryRetrievalItem: + node: MemoryNodeRecord + edges: tuple[MemoryEdgeRecord, ...] = () + policy_decision: PolicyDecision | None = None + + def __post_init__(self) -> None: + object.__setattr__(self, "edges", tuple(self.edges)) + + def to_context_item(self) -> dict[str, Any]: + return { + "node_id": self.node.node_id, + "contract_node_id": self.node.contract_node_id, + "kind": self.node.kind, + "text": self.node.text, + "source_spans": [span.to_dict() for span in self.node.source_spans], + "provenance": [dict(item) for item in self.node.provenance], + "metadata": { + "graph_id": self.node.graph_id, + "contract_graph_id": self.node.contract_graph_id, + "edges": [edge.to_dict() for edge in self.edges], + }, + } + + def to_dict(self) -> dict[str, Any]: + return { + "node": self.node.to_dict(), + "edges": [edge.to_dict() for edge in self.edges], + "context_item": self.to_context_item(), + "policy_decision": self.policy_decision.to_dict() if self.policy_decision else None, + } + + +@dataclass(frozen=True) +class MemoryRetrievalResult: + request: MemoryQueryRequest + correlation_id: str + total: int + items: tuple[MemoryRetrievalItem, ...] = () + diagnostics: tuple[Diagnostic, ...] = () + audit_event: AuditEvent | None = None + metadata: dict[str, Any] = field(default_factory=dict) + success: bool = True + + def __post_init__(self) -> None: + object.__setattr__(self, "items", tuple(self.items)) + object.__setattr__(self, "diagnostics", tuple(self.diagnostics)) + + @property + def result_count(self) -> int: + return len(self.items) + + @property + def next_offset(self) -> int | None: + next_offset = self.request.offset + self.result_count + return next_offset if next_offset < self.total else None + + @property + def context_items(self) -> tuple[dict[str, Any], ...]: + return tuple(item.to_context_item() for item in self.items) + + def to_dict(self) -> dict[str, Any]: + return { + "query": self.request.to_dict(), + "correlation_id": self.correlation_id, + "success": self.success, + "total": self.total, + "result_count": self.result_count, + "next_offset": self.next_offset, + "metadata": dict(self.metadata), + "results": [item.to_dict() for item in self.items], + "context_items": list(self.context_items), + "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics], + "audit_event": self.audit_event.to_dict() if self.audit_event else None, + } + + class MemoryRuntimeService: - def __init__(self, repository: MemoryGraphRepository) -> None: + def __init__( + self, + repository: MemoryGraphRepository, + *, + policy_gateway: PolicyGateway | None = None, + ) -> None: self.repository = repository + self.policy_gateway = policy_gateway or AllowAllPolicyGateway() def import_markitect_graph( self, @@ -98,3 +210,190 @@ class MemoryRuntimeService: "intent": imported.intent, }, ) + + def query_memory( + self, + request: MemoryQueryRequest, + context: OperationContext, + ) -> MemoryRetrievalResult: + diagnostics = _validate_query(request) + if diagnostics: + return MemoryRetrievalResult( + request=request, + correlation_id=context.correlation_id, + total=0, + diagnostics=tuple(diagnostics), + success=False, + ) + + scope_resource = f"memory-graph:{request.graph_id or '*'}" + scope_decision = self._authorize( + context, + "memory.query", + scope_resource, + resource_metadata={"query": request.to_dict()}, + ) + if not scope_decision.allowed: + audit_event = AuditEvent.from_context( + "memory.query", + scope_resource, + AuditOutcome.DENIED, + context, + policy_decision=scope_decision, + details={"query": request.to_dict()}, + ) + return MemoryRetrievalResult( + request=request, + correlation_id=context.correlation_id, + total=0, + diagnostics=(_permission_denied_diagnostic(scope_decision),), + audit_event=audit_event, + success=False, + metadata={"policy_enforced": True, "permission_filtered_count": 0}, + ) + + nodes = self.repository.list_memory_nodes(graph_id=request.graph_id) + if request.node_ids: + wanted = set(request.node_ids) + nodes = [node for node in nodes if node.node_id in wanted or node.contract_node_id in wanted] + if request.kinds: + wanted_kinds = set(request.kinds) + nodes = [node for node in nodes if node.kind in wanted_kinds] + if request.text_contains: + needle = request.text_contains.casefold() + nodes = [node for node in nodes if needle in node.text.casefold()] + + denied_count = 0 + item_diagnostics: list[Diagnostic] = [] + allowed_nodes: list[tuple[MemoryNodeRecord, PolicyDecision]] = [] + for node in nodes: + decision = self._authorize( + context, + "memory.node.retrieve", + node.resource_id, + resource_metadata=_node_policy_metadata(node), + ) + if decision.allowed: + allowed_nodes.append((node, decision)) + else: + denied_count += 1 + item_diagnostics.append(_permission_denied_diagnostic(decision, node=node)) + + allowed_node_ids = {str(node.node_id) for node, _ in allowed_nodes} + items: list[MemoryRetrievalItem] = [] + for node, decision in allowed_nodes: + edges: tuple[MemoryEdgeRecord, ...] = () + if request.include_edges: + edges = tuple( + edge + for edge in self.repository.list_memory_edges(graph_id=node.graph_id) + if ( + edge.source_node_id == node.node_id + or edge.target_node_id == node.node_id + ) + and edge.source_node_id in allowed_node_ids + and edge.target_node_id in allowed_node_ids + ) + items.append(MemoryRetrievalItem(node=node, edges=edges, policy_decision=decision)) + + total = len(items) + page = tuple(items[request.offset : request.offset + request.limit]) + outcome = AuditOutcome.PARTIAL if denied_count else AuditOutcome.SUCCESS + audit_event = AuditEvent.from_context( + "memory.query", + scope_resource, + outcome, + context, + policy_decision=scope_decision, + details={ + "query": request.to_dict(), + "matched_count": len(nodes), + "permission_filtered_count": denied_count, + "result_count": len(page), + }, + ) + return MemoryRetrievalResult( + request=request, + correlation_id=context.correlation_id, + total=total, + items=page, + diagnostics=tuple(item_diagnostics), + audit_event=audit_event, + metadata={ + "zero_result": total == 0, + "policy_enforced": True, + "permission_filtered_count": denied_count, + "context_assembly": "memory-node-source-spans", + }, + ) + + def _authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict[str, Any] | None = None, + ) -> PolicyDecision: + try: + return self.policy_gateway.authorize( + context, + action, + resource, + resource_metadata=resource_metadata, + ) + except Exception as exc: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason=str(exc) or "Memory policy gateway failed", + context={ + "gateway_error": type(exc).__name__, + "resource_metadata": resource_metadata or {}, + }, + ) + + +def _validate_query(request: MemoryQueryRequest) -> list[Diagnostic]: + diagnostics: list[Diagnostic] = [] + if request.limit < 1: + diagnostics.append(Diagnostic("error", "memory.query.limit_invalid", "limit must be at least 1")) + if request.offset < 0: + diagnostics.append(Diagnostic("error", "memory.query.offset_invalid", "offset must not be negative")) + return diagnostics + + +def _node_policy_metadata(node: MemoryNodeRecord) -> dict[str, Any]: + return { + "node_id": node.node_id, + "contract_node_id": node.contract_node_id, + "graph_id": node.graph_id, + "contract_graph_id": node.contract_graph_id, + "kind": node.kind, + "namespace": dict(node.namespace), + "policy": dict(node.policy), + "metadata": dict(node.metadata), + "source_spans": [span.to_dict() for span in node.source_spans], + } + + +def _permission_denied_diagnostic( + decision: PolicyDecision, + *, + node: MemoryNodeRecord | None = None, +) -> Diagnostic: + details: dict[str, Any] = { + "policy_decision": decision.to_dict(), + "resource": decision.resource, + } + if node: + details["node_id"] = node.node_id + details["contract_node_id"] = node.contract_node_id + details["kind"] = node.kind + return Diagnostic( + "warning", + "memory.permission_denied", + decision.reason or "Memory retrieval denied by policy.", + details=details, + ) diff --git a/tests/test_memory_graph_runtime.py b/tests/test_memory_graph_runtime.py index cd16ad3..96f27c7 100644 --- a/tests/test_memory_graph_runtime.py +++ b/tests/test_memory_graph_runtime.py @@ -6,8 +6,10 @@ from kontextual_engine import ( DuplicateResourceError, InMemoryMemoryGraphRepository, MemoryGraphImportResult, + MemoryQueryRequest, MemoryRuntimeService, OperationContext, + PolicyDecision, ValidationError, ) @@ -71,6 +73,67 @@ def test_memory_runtime_service_imports_contracts_and_reports_audit_context() -> ) +def test_memory_query_returns_source_grounded_context_items_and_edges() -> None: + repo = InMemoryMemoryGraphRepository() + service = MemoryRuntimeService(repo) + summary = service.import_markitect_graph(_graph_contract()) + + result = service.query_memory( + MemoryQueryRequest(graph_id=summary.graph_id), + operation_context(), + ) + decision = next(item for item in result.items if item.node.contract_node_id == "decision.contract-boundary") + + assert result.success is True + assert result.total == 2 + assert decision.to_context_item()["source_spans"][0]["path"] == "workplans/MKTT-WP-0016.md" + assert decision.to_context_item()["metadata"]["edges"][0]["contract_edge_id"] == "edge.boundary-support" + assert result.metadata["policy_enforced"] is True + assert result.audit_event is not None + assert result.audit_event.operation == "memory.query" + + +def test_memory_query_filters_denied_nodes_without_leaking_denied_text() -> None: + repo = InMemoryMemoryGraphRepository() + service = MemoryRuntimeService(repo, policy_gateway=DenyInternalMemoryPolicy()) + graph = _graph_contract() + graph["nodes"].append( + { + "id": "claim.internal-secret", + "kind": "claim", + "text": "secret memory text must not leak", + "policy": {"labels": ["internal"]}, + } + ) + service.import_markitect_graph(graph) + + result = service.query_memory(MemoryQueryRequest(text_contains="memory"), operation_context()) + + assert result.success is True + assert result.metadata["permission_filtered_count"] == 1 + assert all(item.node.contract_node_id != "claim.internal-secret" for item in result.items) + assert "secret memory text must not leak" not in str(result.to_dict()) + assert result.diagnostics[0].code == "memory.permission_denied" + assert result.diagnostics[0].details["contract_node_id"] == "claim.internal-secret" + assert result.audit_event is not None + assert result.audit_event.outcome.value == "partial" + + +def test_memory_query_scope_policy_fail_closed_returns_empty_result() -> None: + repo = InMemoryMemoryGraphRepository() + service = MemoryRuntimeService(repo, policy_gateway=BrokenMemoryPolicy()) + service.import_markitect_graph(_graph_contract()) + + result = service.query_memory(MemoryQueryRequest(), operation_context()) + + assert result.success is False + assert result.items == () + assert result.total == 0 + assert result.diagnostics[0].details["policy_decision"]["effect"] == "fail_closed" + assert result.audit_event is not None + assert result.audit_event.outcome.value == "denied" + + def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None: repo = InMemoryMemoryGraphRepository() service = MemoryRuntimeService(repo) @@ -83,6 +146,52 @@ def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None: assert "unknown node" in str(exc.value) +def operation_context() -> OperationContext: + actor = Actor.create(ActorType.HUMAN, actor_id="user-memory", display_name="Memory Tester") + return OperationContext.create(actor, correlation_id="corr-memory") + + +class DenyInternalMemoryPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict | None = None, + ) -> PolicyDecision: + resource_metadata = resource_metadata or {} + labels = resource_metadata.get("policy", {}).get("labels", ()) + if action == "memory.node.retrieve" and "internal" in labels: + return PolicyDecision.fail_closed( + context.actor.id, + action, + resource, + reason="internal memory denied in test policy", + context={"resource_metadata": resource_metadata}, + ) + return PolicyDecision.allow( + context.actor.id, + action, + resource, + context={"resource_metadata": resource_metadata}, + ) + + +class BrokenMemoryPolicy: + def authorize( + self, + context: OperationContext, + action: str, + resource: str, + *, + resource_metadata: dict | None = None, + ) -> PolicyDecision: + if action == "memory.query": + raise RuntimeError("memory policy context unavailable") + return PolicyDecision.allow(context.actor.id, action, resource) + + def _profile_contract() -> dict: return { "schema_version": "markitect.memory.profile.v1", diff --git a/workplans/KONT-WP-0017-agentic-memory-graph-runtime.md b/workplans/KONT-WP-0017-agentic-memory-graph-runtime.md index 680bc5d..7514cfe 100644 --- a/workplans/KONT-WP-0017-agentic-memory-graph-runtime.md +++ b/workplans/KONT-WP-0017-agentic-memory-graph-runtime.md @@ -80,6 +80,12 @@ Primary files: - `src/kontextual_engine/services/memory_service.py` - `docs/memory-graph-runtime.md` +The permission-aware retrieval slice is also implemented in +`MemoryRuntimeService.query_memory()`. It performs a scope policy check, +per-node policy checks, returns source-grounded context items, preserves denied +diagnostics without leaking denied node text, and emits an audit event in the +result envelope. + ## P17.1 - Import and map Markitect memory contracts ```task @@ -130,7 +136,7 @@ tests. ```task id: KONT-WP-0017-T003 -status: todo +status: done priority: high state_hub_task_id: "711b945e-19e6-4548-8ddb-6c2c364148ca" ```