feat(memory): add lifecycle operations

This commit is contained in:
2026-05-15 09:30:46 +02:00
parent 876f97c288
commit 1be8019fa1
6 changed files with 710 additions and 2 deletions

View File

@@ -26,6 +26,13 @@ with `markitect-tool`.
policy check plus per-node policy checks, returns source-grounded context
items, preserves safe denied diagnostics, and emits an audit event in the
result envelope.
- `MemoryRuntimeService.apply_retention()` marks stale memories for review or
transitions old memories to `delete_requested` without physical deletion.
- `MemoryRuntimeService.refresh_memory()` clears stale markers and records a
refresh event.
- `MemoryRuntimeService.compact_memory()` creates a deterministic summary node,
preserves source spans/provenance, optionally retires source nodes, and appends
a compaction event.
## Boundary

View File

@@ -135,10 +135,15 @@ from .services import (
ContextEntityQueryRequest,
ContextEntityQueryResult,
LexicalIndexRefreshResult,
MemoryCompactionRequest,
MemoryGraphImportSummary,
MemoryLifecycleNodeUpdate,
MemoryLifecycleResult,
MemoryQueryRequest,
MemoryRefreshRequest,
MemoryRetrievalItem,
MemoryRetrievalResult,
MemoryRetentionRequest,
MemoryRuntimeService,
RelationshipChangeResult,
RelationshipQueryItem,
@@ -257,16 +262,21 @@ __all__ = [
"MetadataSchemaAssignment",
"MetadataValidationIssue",
"MetadataValueType",
"MemoryCompactionRequest",
"MemoryEdgeRecord",
"MemoryEventRecord",
"MemoryGraphImportResult",
"MemoryGraphImportSummary",
"MemoryGraphRepository",
"MemoryLifecycleNodeUpdate",
"MemoryLifecycleResult",
"MemoryNodeRecord",
"MemoryProfileRecord",
"MemoryQueryRequest",
"MemoryRefreshRequest",
"MemoryRetrievalItem",
"MemoryRetrievalResult",
"MemoryRetentionRequest",
"MemoryRuntimeService",
"MemorySourceSpan",
"NormalizedDocument",

View File

@@ -8,10 +8,15 @@ from .asset_service import (
from .content_service import RepresentationContentResult, RepresentationContentStream, RepresentationContentService
from .ingestion_service import AssetIngestionResult, AssetIngestionService
from .memory_service import (
MemoryCompactionRequest,
MemoryGraphImportSummary,
MemoryLifecycleNodeUpdate,
MemoryLifecycleResult,
MemoryQueryRequest,
MemoryRefreshRequest,
MemoryRetrievalItem,
MemoryRetrievalResult,
MemoryRetentionRequest,
MemoryRuntimeService,
)
from .retrieval_service import (
@@ -60,10 +65,15 @@ __all__ = [
"ContextEntityQueryRequest",
"ContextEntityQueryResult",
"LexicalIndexRefreshResult",
"MemoryCompactionRequest",
"MemoryGraphImportSummary",
"MemoryLifecycleNodeUpdate",
"MemoryLifecycleResult",
"MemoryQueryRequest",
"MemoryRefreshRequest",
"MemoryRetrievalItem",
"MemoryRetrievalResult",
"MemoryRetentionRequest",
"MemoryRuntimeService",
"RelationshipChangeResult",
"RepresentationContentResult",

View File

@@ -2,17 +2,23 @@
from __future__ import annotations
from dataclasses import dataclass, field
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Any
from kontextual_engine.core import (
AuditEvent,
AuditOutcome,
LifecycleState,
MemoryGraphImportResult,
MemoryEdgeRecord,
MemoryEventRecord,
MemoryNodeRecord,
MemorySourceSpan,
OperationContext,
PolicyDecision,
new_id,
utc_now,
)
from kontextual_engine.errors import Diagnostic, ValidationError
from kontextual_engine.ports import AllowAllPolicyGateway, MemoryGraphRepository, PolicyGateway
@@ -147,6 +153,129 @@ class MemoryRetrievalResult:
}
@dataclass(frozen=True)
class MemoryRetentionRequest:
graph_id: str
stale_after_days: int | None = None
delete_after_days: int | None = None
kinds: tuple[str, ...] = ()
dry_run: bool = False
def __post_init__(self) -> None:
object.__setattr__(self, "kinds", tuple(self.kinds))
def to_dict(self) -> dict[str, Any]:
return {
"graph_id": self.graph_id,
"stale_after_days": self.stale_after_days,
"delete_after_days": self.delete_after_days,
"kinds": list(self.kinds),
"dry_run": self.dry_run,
}
@dataclass(frozen=True)
class MemoryRefreshRequest:
graph_id: str
node_ids: tuple[str, ...] = ()
kinds: tuple[str, ...] = ()
dry_run: bool = False
def __post_init__(self) -> None:
object.__setattr__(self, "node_ids", tuple(self.node_ids))
object.__setattr__(self, "kinds", tuple(self.kinds))
def to_dict(self) -> dict[str, Any]:
return {
"graph_id": self.graph_id,
"node_ids": list(self.node_ids),
"kinds": list(self.kinds),
"dry_run": self.dry_run,
}
@dataclass(frozen=True)
class MemoryCompactionRequest:
graph_id: str
node_ids: tuple[str, ...] = ()
kinds: tuple[str, ...] = ()
summary_contract_node_id: str | None = None
summary_text: str | None = None
retire_source_nodes: bool = True
dry_run: bool = False
def __post_init__(self) -> None:
object.__setattr__(self, "node_ids", tuple(self.node_ids))
object.__setattr__(self, "kinds", tuple(self.kinds))
def to_dict(self) -> dict[str, Any]:
return {
"graph_id": self.graph_id,
"node_ids": list(self.node_ids),
"kinds": list(self.kinds),
"summary_contract_node_id": self.summary_contract_node_id,
"summary_text": self.summary_text,
"retire_source_nodes": self.retire_source_nodes,
"dry_run": self.dry_run,
}
@dataclass(frozen=True)
class MemoryLifecycleNodeUpdate:
node_id: str
contract_node_id: str
action: str
before_lifecycle: str
after_lifecycle: str
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"node_id": self.node_id,
"contract_node_id": self.contract_node_id,
"action": self.action,
"before_lifecycle": self.before_lifecycle,
"after_lifecycle": self.after_lifecycle,
"details": dict(self.details),
}
@dataclass(frozen=True)
class MemoryLifecycleResult:
operation: str
correlation_id: str
graph_id: str
dry_run: bool
updated_nodes: tuple[MemoryLifecycleNodeUpdate, ...] = ()
created_nodes: tuple[MemoryNodeRecord, ...] = ()
appended_events: tuple[MemoryEventRecord, ...] = ()
diagnostics: tuple[Diagnostic, ...] = ()
audit_event: AuditEvent | None = None
metadata: dict[str, Any] = field(default_factory=dict)
success: bool = True
def __post_init__(self) -> None:
object.__setattr__(self, "updated_nodes", tuple(self.updated_nodes))
object.__setattr__(self, "created_nodes", tuple(self.created_nodes))
object.__setattr__(self, "appended_events", tuple(self.appended_events))
object.__setattr__(self, "diagnostics", tuple(self.diagnostics))
def to_dict(self) -> dict[str, Any]:
return {
"operation": self.operation,
"correlation_id": self.correlation_id,
"graph_id": self.graph_id,
"dry_run": self.dry_run,
"success": self.success,
"updated_nodes": [update.to_dict() for update in self.updated_nodes],
"created_nodes": [node.to_dict() for node in self.created_nodes],
"appended_events": [event.to_dict() for event in self.appended_events],
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
"audit_event": self.audit_event.to_dict() if self.audit_event else None,
"metadata": dict(self.metadata),
}
class MemoryRuntimeService:
def __init__(
self,
@@ -327,6 +456,240 @@ class MemoryRuntimeService:
},
)
def apply_retention(
self,
request: MemoryRetentionRequest,
context: OperationContext,
) -> MemoryLifecycleResult:
diagnostics = _validate_retention_request(request)
if diagnostics:
return MemoryLifecycleResult(
operation="memory.retention.apply",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
diagnostics=tuple(diagnostics),
success=False,
)
now = utc_now()
nodes = _select_nodes(
self.repository.list_memory_nodes(graph_id=request.graph_id),
node_ids=(),
kinds=request.kinds,
)
updates: list[MemoryLifecycleNodeUpdate] = []
updated_nodes: list[MemoryNodeRecord] = []
for node in nodes:
age_days = _node_age_days(node, now)
if request.delete_after_days is not None and age_days >= request.delete_after_days:
updated = _node_with_lifecycle_marker(
node,
action="delete_requested",
lifecycle=LifecycleState.DELETE_REQUESTED,
now=now,
details={"age_days": age_days, "threshold_days": request.delete_after_days},
)
updates.append(_node_update(node, updated, "delete_requested", age_days=age_days))
updated_nodes.append(updated)
elif request.stale_after_days is not None and age_days >= request.stale_after_days:
updated = _node_with_lifecycle_marker(
node,
action="stale_review_required",
lifecycle=node.lifecycle,
now=now,
details={"age_days": age_days, "threshold_days": request.stale_after_days},
)
updates.append(_node_update(node, updated, "stale_review_required", age_days=age_days))
updated_nodes.append(updated)
event = None
if updated_nodes and not request.dry_run:
for node in updated_nodes:
self.repository.save_memory_node(node)
event = self._append_lifecycle_event(
"retention",
request.graph_id,
context,
node_updates=[update.to_dict() for update in updates],
metadata={"request": request.to_dict()},
)
audit_event = AuditEvent.from_context(
"memory.retention.apply",
f"memory-graph:{request.graph_id}",
AuditOutcome.DRY_RUN if request.dry_run else AuditOutcome.SUCCESS,
context,
details={
"request": request.to_dict(),
"updated_nodes": len(updates),
"event_id": event.event_id if event else None,
},
)
return MemoryLifecycleResult(
operation="memory.retention.apply",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
updated_nodes=tuple(updates),
appended_events=(event,) if event else (),
audit_event=audit_event,
metadata={"matched_nodes": len(nodes), "updated_nodes": len(updates)},
)
def refresh_memory(
self,
request: MemoryRefreshRequest,
context: OperationContext,
) -> MemoryLifecycleResult:
diagnostics = _validate_node_selection_request(request.graph_id, request.node_ids)
if diagnostics:
return MemoryLifecycleResult(
operation="memory.refresh",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
diagnostics=tuple(diagnostics),
success=False,
)
now = utc_now()
nodes = _select_nodes(
self.repository.list_memory_nodes(graph_id=request.graph_id),
node_ids=request.node_ids,
kinds=request.kinds,
)
updates: list[MemoryLifecycleNodeUpdate] = []
refreshed_nodes: list[MemoryNodeRecord] = []
for node in nodes:
updated = _refreshed_node(node, now)
updates.append(_node_update(node, updated, "refreshed"))
refreshed_nodes.append(updated)
event = None
if refreshed_nodes and not request.dry_run:
for node in refreshed_nodes:
self.repository.save_memory_node(node)
event = self._append_lifecycle_event(
"refreshed",
request.graph_id,
context,
node_updates=[update.to_dict() for update in updates],
metadata={"request": request.to_dict()},
)
audit_event = AuditEvent.from_context(
"memory.refresh",
f"memory-graph:{request.graph_id}",
AuditOutcome.DRY_RUN if request.dry_run else AuditOutcome.SUCCESS,
context,
details={
"request": request.to_dict(),
"updated_nodes": len(updates),
"event_id": event.event_id if event else None,
},
)
return MemoryLifecycleResult(
operation="memory.refresh",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
updated_nodes=tuple(updates),
appended_events=(event,) if event else (),
audit_event=audit_event,
metadata={"matched_nodes": len(nodes), "updated_nodes": len(updates)},
)
def compact_memory(
self,
request: MemoryCompactionRequest,
context: OperationContext,
) -> MemoryLifecycleResult:
diagnostics = _validate_node_selection_request(request.graph_id, request.node_ids)
if diagnostics:
return MemoryLifecycleResult(
operation="memory.compact",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
diagnostics=tuple(diagnostics),
success=False,
)
now = utc_now()
nodes = _select_nodes(
self.repository.list_memory_nodes(graph_id=request.graph_id),
node_ids=request.node_ids,
kinds=request.kinds,
)
if not nodes:
return MemoryLifecycleResult(
operation="memory.compact",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
diagnostics=(
Diagnostic("warning", "memory.compaction.empty", "No memory nodes matched compaction request."),
),
metadata={"request": request.to_dict()},
)
operation_id = new_id("memcompact")
summary = _summary_node_for_compaction(request, nodes, operation_id, now)
updates: list[MemoryLifecycleNodeUpdate] = [
MemoryLifecycleNodeUpdate(
node_id=str(summary.node_id),
contract_node_id=summary.contract_node_id,
action="created_compaction_summary",
before_lifecycle="missing",
after_lifecycle=summary.lifecycle.value,
details={"source_node_ids": [node.node_id for node in nodes]},
)
]
retired_nodes: list[MemoryNodeRecord] = []
if request.retire_source_nodes:
for node in nodes:
retired = _node_with_lifecycle_marker(
node,
action="compacted_retired",
lifecycle=LifecycleState.RETIRED,
now=now,
details={"summary_node_id": summary.node_id, "operation_id": operation_id},
)
updates.append(_node_update(node, retired, "compacted_retired"))
retired_nodes.append(retired)
event = None
if not request.dry_run:
self.repository.save_memory_node(summary)
for node in retired_nodes:
self.repository.save_memory_node(node)
event = self._append_lifecycle_event(
"compacted",
request.graph_id,
context,
node_updates=[update.to_dict() for update in updates],
metadata={
"request": request.to_dict(),
"summary_node_id": summary.node_id,
"operation_id": operation_id,
},
)
audit_event = AuditEvent.from_context(
"memory.compact",
f"memory-graph:{request.graph_id}",
AuditOutcome.DRY_RUN if request.dry_run else AuditOutcome.SUCCESS,
context,
details={
"request": request.to_dict(),
"summary_node_id": summary.node_id,
"source_nodes": len(nodes),
"event_id": event.event_id if event else None,
},
)
return MemoryLifecycleResult(
operation="memory.compact",
correlation_id=context.correlation_id,
graph_id=request.graph_id,
dry_run=request.dry_run,
updated_nodes=tuple(updates),
created_nodes=(summary,),
appended_events=(event,) if event else (),
audit_event=audit_event,
metadata={"source_nodes": len(nodes), "retired_source_nodes": len(retired_nodes)},
)
def _authorize(
self,
context: OperationContext,
@@ -354,6 +717,32 @@ class MemoryRuntimeService:
},
)
def _append_lifecycle_event(
self,
event_kind: str,
graph_id: str,
context: OperationContext,
*,
node_updates: list[dict[str, Any]],
metadata: dict[str, Any],
) -> MemoryEventRecord:
nodes = self.repository.list_memory_nodes(graph_id=graph_id)
contract_graph_id = nodes[0].contract_graph_id if nodes else graph_id
event = MemoryEventRecord(
graph_id=graph_id,
contract_graph_id=contract_graph_id,
contract_event_id=f"runtime.{event_kind}.{new_id('event')}",
kind=event_kind,
timestamp=utc_now().isoformat(),
actor_id=context.actor.id,
thread=_optional_str(context.request_scope.get("thread")),
task=_optional_str(context.request_scope.get("task")),
node_updates=tuple(node_updates),
policy={"correlation_id": context.correlation_id},
metadata=metadata,
)
return self.repository.append_memory_event(event)
def _validate_query(request: MemoryQueryRequest) -> list[Diagnostic]:
diagnostics: list[Diagnostic] = []
@@ -397,3 +786,225 @@ def _permission_denied_diagnostic(
decision.reason or "Memory retrieval denied by policy.",
details=details,
)
def _validate_retention_request(request: MemoryRetentionRequest) -> list[Diagnostic]:
diagnostics = _validate_node_selection_request(request.graph_id, ())
if request.stale_after_days is None and request.delete_after_days is None:
diagnostics.append(
Diagnostic(
"error",
"memory.retention.threshold_missing",
"retention requires stale_after_days or delete_after_days.",
)
)
if request.stale_after_days is not None and request.stale_after_days < 0:
diagnostics.append(Diagnostic("error", "memory.retention.stale_invalid", "stale_after_days must be >= 0."))
if request.delete_after_days is not None and request.delete_after_days < 0:
diagnostics.append(Diagnostic("error", "memory.retention.delete_invalid", "delete_after_days must be >= 0."))
return diagnostics
def _validate_node_selection_request(graph_id: str, node_ids: tuple[str, ...]) -> list[Diagnostic]:
diagnostics: list[Diagnostic] = []
if not graph_id:
diagnostics.append(Diagnostic("error", "memory.graph_id_missing", "graph_id is required."))
if any(not node_id for node_id in node_ids):
diagnostics.append(Diagnostic("error", "memory.node_id_invalid", "node_ids must not contain empty ids."))
return diagnostics
def _select_nodes(
nodes: list[MemoryNodeRecord],
*,
node_ids: tuple[str, ...],
kinds: tuple[str, ...],
) -> list[MemoryNodeRecord]:
selected = nodes
if node_ids:
wanted = set(node_ids)
selected = [node for node in selected if node.node_id in wanted or node.contract_node_id in wanted]
if kinds:
wanted_kinds = set(kinds)
selected = [node for node in selected if node.kind in wanted_kinds]
return selected
def _node_age_days(node: MemoryNodeRecord, now: datetime) -> int:
timestamp = _node_reference_time(node)
if timestamp is None:
return 0
return max(0, (now - timestamp).days)
def _node_reference_time(node: MemoryNodeRecord) -> datetime | None:
for key in (
"last_verified_at",
"refreshed_at",
"observed_at",
"updated_at",
"created_at",
"compiled_at",
):
if key in node.freshness:
parsed = _parse_datetime(node.freshness[key])
if parsed:
return parsed
return _parse_datetime(node.updated_at) or _parse_datetime(node.created_at)
def _parse_datetime(value: Any) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
parsed = value
else:
text = str(value)
if not text:
return None
try:
parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
except ValueError:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _node_with_lifecycle_marker(
node: MemoryNodeRecord,
*,
action: str,
lifecycle: LifecycleState,
now: datetime,
details: dict[str, Any],
) -> MemoryNodeRecord:
at = now.isoformat()
freshness = dict(node.freshness)
freshness["last_lifecycle_at"] = at
if action == "stale_review_required":
freshness["stale"] = True
freshness["stale_at"] = at
metadata = dict(node.metadata)
metadata["review_state"] = _review_state_for_action(action)
lifecycle_meta = dict(metadata.get("lifecycle") or {})
lifecycle_meta.update({"action": action, "at": at, "details": dict(details)})
metadata["lifecycle"] = lifecycle_meta
return replace(
node,
lifecycle=lifecycle,
freshness=freshness,
metadata=metadata,
updated_at=at,
)
def _review_state_for_action(action: str) -> str:
if action == "delete_requested":
return "delete_requested"
if action == "stale_review_required":
return "review_required"
if action == "compacted_retired":
return "compacted"
return action
def _refreshed_node(node: MemoryNodeRecord, now: datetime) -> MemoryNodeRecord:
at = now.isoformat()
freshness = dict(node.freshness)
freshness["refreshed_at"] = at
freshness["stale"] = False
freshness.pop("stale_at", None)
metadata = dict(node.metadata)
metadata["review_state"] = "current"
lifecycle_meta = dict(metadata.get("lifecycle") or {})
lifecycle_meta.update({"action": "refreshed", "at": at})
metadata["lifecycle"] = lifecycle_meta
return replace(node, freshness=freshness, metadata=metadata, updated_at=at)
def _node_update(
before: MemoryNodeRecord,
after: MemoryNodeRecord,
action: str,
**details: Any,
) -> MemoryLifecycleNodeUpdate:
return MemoryLifecycleNodeUpdate(
node_id=str(after.node_id),
contract_node_id=after.contract_node_id,
action=action,
before_lifecycle=before.lifecycle.value,
after_lifecycle=after.lifecycle.value,
details=details,
)
def _summary_node_for_compaction(
request: MemoryCompactionRequest,
nodes: list[MemoryNodeRecord],
operation_id: str,
now: datetime,
) -> MemoryNodeRecord:
first = nodes[0]
at = now.isoformat()
source_node_ids = [node.node_id for node in nodes]
source_contract_node_ids = [node.contract_node_id for node in nodes]
return MemoryNodeRecord(
graph_id=request.graph_id,
contract_graph_id=first.contract_graph_id,
contract_node_id=request.summary_contract_node_id or f"compaction.{operation_id}",
kind="memory",
text=request.summary_text or _deterministic_compaction_summary(nodes),
namespace=dict(first.namespace),
source_spans=tuple(_unique_source_spans(nodes)),
provenance=(
{
"kind": "memory-compaction",
"operation_id": operation_id,
"source_node_ids": source_node_ids,
"source_contract_node_ids": source_contract_node_ids,
},
),
freshness={"compacted_at": at, "stale": False},
metadata={
"title": "Compacted memory summary",
"review_state": "current",
"compaction": {
"operation_id": operation_id,
"source_node_ids": source_node_ids,
"source_contract_node_ids": source_contract_node_ids,
},
},
created_at=at,
updated_at=at,
)
def _unique_source_spans(nodes: list[MemoryNodeRecord]) -> list[MemorySourceSpan]:
spans: list[MemorySourceSpan] = []
seen: set[tuple] = set()
for node in nodes:
for span in node.source_spans:
key = tuple(sorted(span.to_dict().items()))
if key in seen:
continue
seen.add(key)
spans.append(span)
return spans
def _deterministic_compaction_summary(nodes: list[MemoryNodeRecord]) -> str:
lines = ["Compacted memory summary:"]
for node in nodes:
text = " ".join(node.text.split())
if len(text) > 180:
text = text[:177].rstrip() + "..."
lines.append(f"- {node.kind} {node.contract_node_id}: {text}")
return "\n".join(lines)
def _optional_str(value: Any) -> str | None:
if value is None:
return None
text = str(value)
return text if text else None

View File

@@ -5,7 +5,11 @@ from kontextual_engine import (
ActorType,
DuplicateResourceError,
InMemoryMemoryGraphRepository,
LifecycleState,
MemoryCompactionRequest,
MemoryGraphImportResult,
MemoryRefreshRequest,
MemoryRetentionRequest,
MemoryQueryRequest,
MemoryRuntimeService,
OperationContext,
@@ -134,6 +138,65 @@ def test_memory_query_scope_policy_fail_closed_returns_empty_result() -> None:
assert result.audit_event.outcome.value == "denied"
def test_memory_retention_marks_stale_refresh_clears_and_delete_requests() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
context = operation_context()
stale = service.apply_retention(
MemoryRetentionRequest(graph_id=summary.graph_id, stale_after_days=0),
context,
)
stale_node = repo.get_memory_node(stale.updated_nodes[0].node_id)
refreshed = service.refresh_memory(
MemoryRefreshRequest(graph_id=summary.graph_id, node_ids=(stale_node.contract_node_id,)),
context,
)
delete_requested = service.apply_retention(
MemoryRetentionRequest(graph_id=summary.graph_id, delete_after_days=0),
context,
)
assert stale.operation == "memory.retention.apply"
assert stale.appended_events[0].kind == "retention"
assert stale_node.metadata["review_state"] == "review_required"
assert stale_node.freshness["stale"] is True
assert refreshed.appended_events[0].kind == "refreshed"
assert repo.get_memory_node(stale_node.node_id).freshness["stale"] is False
assert all(update.after_lifecycle == LifecycleState.DELETE_REQUESTED.value for update in delete_requested.updated_nodes)
assert delete_requested.appended_events[0].kind == "retention"
def test_memory_compaction_creates_summary_preserves_spans_and_retires_sources() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)
summary = service.import_markitect_graph(_graph_contract())
compacted = service.compact_memory(
MemoryCompactionRequest(
graph_id=summary.graph_id,
node_ids=("decision.contract-boundary", "constraint.no-runtime-services"),
summary_contract_node_id="compaction.boundary-summary",
),
operation_context(),
)
summary_node = compacted.created_nodes[0]
assert compacted.operation == "memory.compact"
assert summary_node.contract_node_id == "compaction.boundary-summary"
assert summary_node.kind == "memory"
assert summary_node.source_spans[0].path == "workplans/MKTT-WP-0016.md"
assert "decision.contract-boundary" in summary_node.metadata["compaction"]["source_contract_node_ids"]
assert compacted.appended_events[0].kind == "compacted"
source_nodes = [
repo.get_memory_node(update.node_id)
for update in compacted.updated_nodes
if update.action == "compacted_retired"
]
assert {node.lifecycle for node in source_nodes} == {LifecycleState.RETIRED}
def test_memory_runtime_service_rejects_invalid_edge_contracts() -> None:
repo = InMemoryMemoryGraphRepository()
service = MemoryRuntimeService(repo)

View File

@@ -86,6 +86,13 @@ per-node policy checks, returns source-grounded context items, preserves denied
diagnostics without leaking denied node text, and emits an audit event in the
result envelope.
The lifecycle slice is implemented with deterministic local operations:
`apply_retention()` marks stale memories for review or transitions old memories
to `delete_requested`; `refresh_memory()` clears stale markers; and
`compact_memory()` creates a provenance/source-span preserving summary node,
optionally retiring the source nodes. Each mutating operation appends a memory
event and returns audit metadata.
## P17.1 - Import and map Markitect memory contracts
```task
@@ -158,7 +165,7 @@ Output: service/domain API, permission tests, and denied-access diagnostics.
```task
id: KONT-WP-0017-T004
status: todo
status: done
priority: high
state_hub_task_id: "853807fe-53ac-440e-823f-8d9b0b7ce4b7"
```