Files
kontextual-engine/tests/test_memory_graph_runtime.py

412 lines
16 KiB
Python

import pytest
from kontextual_engine import (
Actor,
ActorType,
DuplicateResourceError,
InMemoryMemoryGraphRepository,
LifecycleState,
MemoryCompactionRequest,
MemoryGraphImportResult,
MemoryNodeUpdateInstruction,
MemoryPackageExportRequest,
MemoryRefreshRequest,
MemoryRetentionRequest,
MemoryQueryRequest,
MemoryRuntimeService,
MemoryUpdateRequest,
OperationContext,
PolicyDecision,
ValidationError,
)
def test_markitect_memory_graph_import_preserves_contract_identity_and_spans() -> None:
imported = MemoryGraphImportResult.from_markitect_contract(
_graph_contract(),
profile_contract=_profile_contract(),
)
second_import = MemoryGraphImportResult.from_markitect_contract(_graph_contract())
assert imported.graph_id == second_import.graph_id
assert imported.contract_graph_id == "markitect-memory-decisions"
assert imported.profile is not None
assert imported.profile.contract_profile_id == "local-agent-memory"
assert imported.nodes[0].contract_node_id == "decision.contract-boundary"
assert imported.nodes[0].node_id == second_import.nodes[0].node_id
assert imported.nodes[0].source_spans[0].path == "workplans/MKTT-WP-0016.md"
assert imported.edges[0].source_node_id == imported.nodes[1].node_id
assert imported.events[0].package_refs == ("memory:package:example",)
def test_in_memory_memory_repository_persists_graph_and_append_only_events() -> None:
repo = InMemoryMemoryGraphRepository()
imported = MemoryGraphImportResult.from_markitect_contract(_graph_contract())
for node in imported.nodes:
repo.save_memory_node(node)
repo.save_memory_edge(imported.edges[0])
repo.append_memory_event(imported.events[0])
assert repo.list_memory_nodes(graph_id=imported.graph_id, kind="decision") == [imported.nodes[0]]
assert repo.list_memory_edges(source_node_id=imported.nodes[1].node_id) == [imported.edges[0]]
assert repo.list_memory_events(graph_id=imported.graph_id, kind="recorded") == [imported.events[0]]
with pytest.raises(DuplicateResourceError):
repo.append_memory_event(imported.events[0])
def test_memory_runtime_service_imports_contracts_and_reports_audit_context() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
actor = Actor.create(ActorType.AI_AGENT, actor_id="agent-codex", display_name="Codex")
context = OperationContext.create(actor, correlation_id="corr-memory")
summary = service.import_markitect_graph(
_graph_contract(),
profile_contract=_profile_contract(),
context=context,
)
assert summary.imported_nodes == 2
assert summary.imported_edges == 1
assert summary.appended_events == 1
assert summary.audit_event is not None
assert summary.audit_event.actor_id == "agent-codex"
assert summary.audit_event.correlation_id == "corr-memory"
assert repo.get_memory_profile(summary.profile_id).memory_kinds == (
"reasoning",
"knowledge",
"package",
)
def test_memory_query_returns_source_grounded_context_items_and_edges() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
result = service.query_memory(
MemoryQueryRequest(graph_id=summary.graph_id),
operation_context(),
)
decision = next(item for item in result.items if item.node.contract_node_id == "decision.contract-boundary")
assert result.success is True
assert result.total == 2
assert decision.to_context_item()["source_spans"][0]["path"] == "workplans/MKTT-WP-0016.md"
assert decision.to_context_item()["metadata"]["edges"][0]["contract_edge_id"] == "edge.boundary-support"
assert result.metadata["policy_enforced"] is True
assert result.audit_event is not None
assert result.audit_event.operation == "memory.query"
def test_memory_query_filters_denied_nodes_without_leaking_denied_text() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo, policy_gateway=DenyInternalMemoryPolicy())
graph = _graph_contract()
graph["nodes"].append(
{
"id": "claim.internal-secret",
"kind": "claim",
"text": "secret memory text must not leak",
"policy": {"labels": ["internal"]},
}
)
service.import_markitect_graph(graph)
result = service.query_memory(MemoryQueryRequest(text_contains="memory"), operation_context())
assert result.success is True
assert result.metadata["permission_filtered_count"] == 1
assert all(item.node.contract_node_id != "claim.internal-secret" for item in result.items)
assert "secret memory text must not leak" not in str(result.to_dict())
assert result.diagnostics[0].code == "memory.permission_denied"
assert result.diagnostics[0].details["contract_node_id"] == "claim.internal-secret"
assert result.audit_event is not None
assert result.audit_event.outcome.value == "partial"
def test_memory_query_scope_policy_fail_closed_returns_empty_result() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo, policy_gateway=BrokenMemoryPolicy())
service.import_markitect_graph(_graph_contract())
result = service.query_memory(MemoryQueryRequest(), operation_context())
assert result.success is False
assert result.items == ()
assert result.total == 0
assert result.diagnostics[0].details["policy_decision"]["effect"] == "fail_closed"
assert result.audit_event is not None
assert result.audit_event.outcome.value == "denied"
def test_memory_retention_marks_stale_refresh_clears_and_delete_requests() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
context = operation_context()
stale = service.apply_retention(
MemoryRetentionRequest(graph_id=summary.graph_id, stale_after_days=0),
context,
)
stale_node = repo.get_memory_node(stale.updated_nodes[0].node_id)
refreshed = service.refresh_memory(
MemoryRefreshRequest(graph_id=summary.graph_id, node_ids=(stale_node.contract_node_id,)),
context,
)
delete_requested = service.apply_retention(
MemoryRetentionRequest(graph_id=summary.graph_id, delete_after_days=0),
context,
)
assert stale.operation == "memory.retention.apply"
assert stale.appended_events[0].kind == "retention"
assert stale_node.metadata["review_state"] == "review_required"
assert stale_node.freshness["stale"] is True
assert refreshed.appended_events[0].kind == "refreshed"
assert repo.get_memory_node(stale_node.node_id).freshness["stale"] is False
assert all(update.after_lifecycle == LifecycleState.DELETE_REQUESTED.value for update in delete_requested.updated_nodes)
assert delete_requested.appended_events[0].kind == "retention"
def test_memory_compaction_creates_summary_preserves_spans_and_retires_sources() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
compacted = service.compact_memory(
MemoryCompactionRequest(
graph_id=summary.graph_id,
node_ids=("decision.contract-boundary", "constraint.no-runtime-services"),
summary_contract_node_id="compaction.boundary-summary",
),
operation_context(),
)
summary_node = compacted.created_nodes[0]
assert compacted.operation == "memory.compact"
assert summary_node.contract_node_id == "compaction.boundary-summary"
assert summary_node.kind == "memory"
assert summary_node.source_spans[0].path == "workplans/MKTT-WP-0016.md"
assert "decision.contract-boundary" in summary_node.metadata["compaction"]["source_contract_node_ids"]
assert compacted.appended_events[0].kind == "compacted"
source_nodes = [
repo.get_memory_node(update.node_id)
for update in compacted.updated_nodes
if update.action == "compacted_retired"
]
assert {node.lifecycle for node in source_nodes} == {LifecycleState.RETIRED}
def test_agent_safe_memory_update_plans_dry_run_and_requires_review_before_write() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
context = operation_context()
plan = service.plan_memory_update(
MemoryUpdateRequest(
graph_id=summary.graph_id,
instructions=(
MemoryNodeUpdateInstruction(
contract_node_id="claim.agent-safe-update",
kind="claim",
text="Agent-safe update is planned before durable write.",
source_spans=(
{
"path": "docs/memory-agent-safe.md",
"unit_kind": "section",
"selector": "sections[heading=Agent safe]",
"engine": "selector",
},
),
policy={"labels": ["public"]},
),
),
require_review=True,
reason="capture implementation decision",
),
context,
)
pending = service.apply_memory_update(plan, context)
assert plan.success is True
assert plan.dry_run is True
assert plan.review_required is True
assert plan.planned_updates[0].action == "create_node"
assert plan.planned_updates[0].source_explanation[0]["path"] == "docs/memory-agent-safe.md"
assert pending.success is False
assert pending.review_required is True
assert repo.list_memory_nodes(graph_id=summary.graph_id, kind="claim") == []
approved = service.apply_memory_update(plan, context, review_decision="approved")
assert approved.success is True
assert approved.appended_events[0].kind == "updated"
assert repo.list_memory_nodes(graph_id=summary.graph_id, kind="claim")[0].contract_node_id == (
"claim.agent-safe-update"
)
def test_memory_package_export_emits_markitect_context_package_inputs_without_denied_content() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo, policy_gateway=DenyInternalMemoryPolicy())
graph = _graph_contract()
graph["nodes"].append(
{
"id": "claim.internal-secret",
"kind": "claim",
"text": "secret memory text must not leak",
"policy": {"labels": ["internal"]},
}
)
summary = service.import_markitect_graph(graph)
export = service.export_context_package_inputs(
MemoryPackageExportRequest(
query=MemoryQueryRequest(graph_id=summary.graph_id, text_contains="memory"),
title="Memory Package Input",
intent="Export allowed runtime memories for Markitect packaging.",
namespace={"project": "kontextual-engine"},
budget={"max_items": 3},
),
operation_context(),
)
assert export.success is True
assert export.package_input["schema_version"] == "markitect.context-package.input.v1"
assert export.package_input["retrieval_recipes"][0]["kind"] == "memory-runtime-query"
assert export.package_input["items"][0]["source"]["path"] == "workplans/MKTT-WP-0016.md"
assert export.package_input["metadata"]["permission_filtered_count"] == 1
assert "secret memory text must not leak" not in str(export.to_dict())
assert export.audit_event is not None
assert export.audit_event.operation == "memory.package.export"
def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
graph = _graph_contract()
graph["edges"][0]["target"] = "missing-node"
with pytest.raises(ValidationError) as exc:
service.import_markitect_graph(graph)
assert "unknown node" in str(exc.value)
def operation_context() -> OperationContext:
actor = Actor.create(ActorType.HUMAN, actor_id="user-memory", display_name="Memory Tester")
return OperationContext.create(actor, correlation_id="corr-memory")
class DenyInternalMemoryPolicy:
def authorize(
self,
context: OperationContext,
action: str,
resource: str,
*,
resource_metadata: dict | None = None,
) -> PolicyDecision:
resource_metadata = resource_metadata or {}
labels = resource_metadata.get("policy", {}).get("labels", ())
if action == "memory.node.retrieve" and "internal" in labels:
return PolicyDecision.fail_closed(
context.actor.id,
action,
resource,
reason="internal memory denied in test policy",
context={"resource_metadata": resource_metadata},
)
return PolicyDecision.allow(
context.actor.id,
action,
resource,
context={"resource_metadata": resource_metadata},
)
class BrokenMemoryPolicy:
def authorize(
self,
context: OperationContext,
action: str,
resource: str,
*,
resource_metadata: dict | None = None,
) -> PolicyDecision:
if action == "memory.query":
raise RuntimeError("memory policy context unavailable")
return PolicyDecision.allow(context.actor.id, action, resource)
def _profile_contract() -> dict:
return {
"schema_version": "markitect.memory.profile.v1",
"id": "local-agent-memory",
"title": "Local Agent Memory",
"intent": "Runtime import fixture.",
"memory_kinds": ["reasoning", "knowledge", "package"],
"stores": {
"reasoning": "local-event-log",
"knowledge": "local-graph-store",
"package": "markitect-context-registry",
},
"activation": {"max_items": 6, "max_tokens": 1800},
"policy": {"required_labels": ["public"]},
}
def _graph_contract() -> dict:
return {
"schema_version": "markitect.memory.graph.v1",
"id": "markitect-memory-decisions",
"title": "Markitect Memory Decisions",
"intent": "Preserve memory boundary decisions for runtime import.",
"namespace": {"project": "markitect-tool", "task": "MKTT-WP-0016"},
"nodes": [
{
"id": "decision.contract-boundary",
"kind": "decision",
"text": "Markitect compiles memory graph selections into context packages.",
"source_spans": [
{
"path": "workplans/MKTT-WP-0016.md",
"unit_kind": "section",
"selector": "tasks[id=P16.5]",
"engine": "selector",
}
],
"metadata": {"title": "Contract boundary decision"},
},
{
"id": "constraint.no-runtime-services",
"kind": "constraint",
"text": "Runtime persistence belongs in kontextual-engine.",
"policy": {"labels": ["public"]},
},
],
"edges": [
{
"id": "edge.boundary-support",
"kind": "supports",
"source": "constraint.no-runtime-services",
"target": "decision.contract-boundary",
}
],
"events": [
{
"id": "event.initial-contract",
"kind": "recorded",
"timestamp": "2026-05-15T00:00:00Z",
"actor": "agent-codex",
"task": "KONT-WP-0017",
"package_refs": ["memory:package:example"],
"node_updates": [
{"node_id": "decision.contract-boundary", "operation": "import"}
],
}
],
}