Files
kontextual-engine/src/kontextual_engine/services/memory_service.py

101 lines
3.4 KiB
Python

"""Application service for operational memory graph imports."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from kontextual_engine.core import (
AuditEvent,
AuditOutcome,
MemoryGraphImportResult,
OperationContext,
)
from kontextual_engine.errors import ValidationError
from kontextual_engine.ports import MemoryGraphRepository
@dataclass(frozen=True)
class MemoryGraphImportSummary:
    """Immutable summary of a single memory graph import.

    Holds the graph identifiers, the counts of imported nodes, imported
    edges and appended events, and optionally the profile id, an audit
    event, and free-form metadata.
    """

    graph_id: str
    contract_graph_id: str
    imported_nodes: int
    imported_edges: int
    appended_events: int
    profile_id: str | None = None
    audit_event: AuditEvent | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return the summary as a plain dictionary.

        ``metadata`` is shallow-copied, so adding or removing keys on the
        returned mapping does not affect this instance. The ``audit_event``
        key is present only when an audit event is attached, in which case
        it holds the result of the event's own ``to_dict()``.
        """
        data: dict[str, Any] = {
            "graph_id": self.graph_id,
            "contract_graph_id": self.contract_graph_id,
            "profile_id": self.profile_id,
            "imported_nodes": self.imported_nodes,
            "imported_edges": self.imported_edges,
            "appended_events": self.appended_events,
            "metadata": dict(self.metadata),
        }
        # Compare against None rather than relying on truthiness: an attached
        # event that happens to evaluate falsy must still be serialized.
        if self.audit_event is not None:
            data["audit_event"] = self.audit_event.to_dict()
        return data
class MemoryRuntimeService:
    """Application service that imports memory graphs into a repository."""

    def __init__(self, repository: MemoryGraphRepository) -> None:
        """Keep a reference to the repository used for all writes."""
        self.repository = repository

    def import_markitect_graph(
        self,
        graph_contract: dict[str, Any],
        *,
        profile_contract: dict[str, Any] | None = None,
        context: OperationContext | None = None,
    ) -> MemoryGraphImportSummary:
        """Parse a Markitect graph contract and write it to the repository.

        The contract is converted with
        ``MemoryGraphImportResult.from_markitect_contract``; the resulting
        profile (if any), nodes, edges and events are then written to the
        repository, in that order.

        Args:
            graph_contract: The graph contract to import.
            profile_contract: Optional profile contract forwarded to the
                parser.
            context: Optional operation context. When given, a SUCCESS audit
                event is built from it and attached to the returned summary.

        Returns:
            A summary with the graph identifiers, the item counts, the
            optional audit event, and the schema version, title and intent
            as metadata.

        Raises:
            ValidationError: If parsing the contract raises ``ValueError``.
        """
        try:
            imported = MemoryGraphImportResult.from_markitect_contract(
                graph_contract,
                profile_contract=profile_contract,
            )
        except ValueError as exc:
            raise ValidationError(
                str(exc),
                details={"operation": "memory.import_markitect_graph"},
            ) from exc

        # Compare against None rather than relying on truthiness so that a
        # present-but-falsy profile or context is never silently skipped.
        profile = imported.profile
        profile_id = profile.profile_id if profile is not None else None

        # NOTE(review): nothing in this method wraps the writes below in a
        # transaction, so a failure partway through leaves the earlier writes
        # in place unless the repository handles that itself - confirm.
        if profile is not None:
            self.repository.save_memory_profile(profile)
        for node in imported.nodes:
            self.repository.save_memory_node(node)
        for edge in imported.edges:
            self.repository.save_memory_edge(edge)
        for event in imported.events:
            self.repository.append_memory_event(event)

        # Computed once and shared by the audit details and the summary.
        node_count = len(imported.nodes)
        edge_count = len(imported.edges)
        event_count = len(imported.events)

        # The audit event is only built and returned here; this method does
        # not hand it to the repository.
        audit_event = None
        if context is not None:
            audit_event = AuditEvent.from_context(
                "memory.import_markitect_graph",
                f"memory-graph:{imported.graph_id}",
                AuditOutcome.SUCCESS,
                context,
                details={
                    "contract_graph_id": imported.contract_graph_id,
                    "profile_id": profile_id,
                    "nodes": node_count,
                    "edges": edge_count,
                    "events": event_count,
                },
            )

        return MemoryGraphImportSummary(
            graph_id=imported.graph_id,
            contract_graph_id=imported.contract_graph_id,
            profile_id=profile_id,
            imported_nodes=node_count,
            imported_edges=edge_count,
            appended_events=event_count,
            audit_event=audit_event,
            metadata={
                "schema_version": imported.schema_version,
                "title": imported.title,
                "intent": imported.intent,
            },
        )