graph store memory

This commit is contained in:
2026-05-15 11:02:11 +02:00
parent 6b3e391acf
commit c5110f61b0
3 changed files with 101 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ from dataclasses import dataclass, field
from typing import Iterable
from kontextual_engine.core import (
AuditEvent,
LifecycleState,
MemoryEdgeRecord,
MemoryEventRecord,
@@ -21,6 +22,7 @@ class InMemoryMemoryGraphRepository:
nodes: dict[str, MemoryNodeRecord] = field(default_factory=dict)
edges: dict[str, MemoryEdgeRecord] = field(default_factory=dict)
events: dict[str, MemoryEventRecord] = field(default_factory=dict)
audit_events: dict[str, AuditEvent] = field(default_factory=dict)
def save_memory_profile(self, profile: MemoryProfileRecord) -> MemoryProfileRecord:
self.profiles[str(profile.profile_id)] = profile
@@ -124,3 +126,38 @@ class InMemoryMemoryGraphRepository:
if kind is not None:
events = [event for event in events if event.kind == kind]
return sorted(events, key=lambda event: (event.graph_id, event.timestamp, event.contract_event_id))
def save_memory_audit_event(self, event: AuditEvent) -> AuditEvent:
self.audit_events[str(event.event_id)] = event
return event
def get_memory_audit_event(self, event_id: str) -> AuditEvent:
try:
return self.audit_events[event_id]
except KeyError as exc:
raise NotFoundError("Memory audit event not found", details={"event_id": event_id}) from exc
def list_memory_audit_events(
self,
*,
graph_id: str | None = None,
correlation_id: str | None = None,
operation: str | None = None,
) -> list[AuditEvent]:
events: Iterable[AuditEvent] = self.audit_events.values()
if graph_id is not None:
events = [event for event in events if _audit_event_matches_graph(event, graph_id)]
if correlation_id is not None:
events = [event for event in events if event.correlation_id == correlation_id]
if operation is not None:
events = [event for event in events if event.operation == operation]
return sorted(events, key=lambda event: (event.occurred_at, event.event_id))
def _audit_event_matches_graph(event: AuditEvent, graph_id: str) -> bool:
if event.target == f"memory-graph:{graph_id}":
return True
if event.details.get("graph_id") == graph_id:
return True
request = event.details.get("request")
return isinstance(request, dict) and request.get("graph_id") == graph_id

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from typing import Protocol
from kontextual_engine.core import (
AuditEvent,
LifecycleState,
MemoryEdgeRecord,
MemoryEventRecord,
@@ -45,3 +46,13 @@ class MemoryGraphRepository(Protocol):
graph_id: str | None = None,
kind: str | None = None,
) -> list[MemoryEventRecord]: ...
def save_memory_audit_event(self, event: AuditEvent) -> AuditEvent: ...
def get_memory_audit_event(self, event_id: str) -> AuditEvent: ...
def list_memory_audit_events(
self,
*,
graph_id: str | None = None,
correlation_id: str | None = None,
operation: str | None = None,
) -> list[AuditEvent]: ...

View File

@@ -448,6 +448,59 @@ class MemoryPackageExportResult:
}
@dataclass(frozen=True)
class MemoryRuntimeExportRequest:
graph_id: str
include_nodes: bool = True
include_edges: bool = True
include_events: bool = True
include_audit_events: bool = True
include_retired: bool = True
event_kinds: tuple[str, ...] = ()
operations: tuple[str, ...] = ()
correlation_id: str | None = None
def __post_init__(self) -> None:
object.__setattr__(self, "event_kinds", tuple(self.event_kinds))
object.__setattr__(self, "operations", tuple(self.operations))
def to_dict(self) -> dict[str, Any]:
return {
"graph_id": self.graph_id,
"include_nodes": self.include_nodes,
"include_edges": self.include_edges,
"include_events": self.include_events,
"include_audit_events": self.include_audit_events,
"include_retired": self.include_retired,
"event_kinds": list(self.event_kinds),
"operations": list(self.operations),
"correlation_id": self.correlation_id,
}
@dataclass(frozen=True)
class MemoryRuntimeExportResult:
request: MemoryRuntimeExportRequest
correlation_id: str
envelope: dict[str, Any] = field(default_factory=dict)
audit_event: AuditEvent | None = None
diagnostics: tuple[Diagnostic, ...] = ()
success: bool = True
def __post_init__(self) -> None:
object.__setattr__(self, "diagnostics", tuple(self.diagnostics))
def to_dict(self) -> dict[str, Any]:
return {
"request": self.request.to_dict(),
"correlation_id": self.correlation_id,
"success": self.success,
"envelope": dict(self.envelope),
"audit_event": self.audit_event.to_dict() if self.audit_event else None,
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
}
class MemoryRuntimeService:
def __init__(
self,