"""Memory graph/profile contracts and context package compilation."""

from __future__ import annotations

from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
from typing import Any

import yaml

from markitect_tool.diagnostics import Diagnostic, SourceLocation, has_error
from markitect_tool.memory.engine import (
    ContextBudget,
    ContextPackage,
    ContextPackageError,
    ContextPackageItem,
    MemoryNamespace,
    RetrievalRecipe,
    SourceSpan,
)

MEMORY_GRAPH_SCHEMA_VERSION = "markitect.memory.graph.v1"
MEMORY_PROFILE_SCHEMA_VERSION = "markitect.memory.profile.v1"
MEMORY_SELECTION_SCHEMA_VERSION = "markitect.memory.selection.v1"

# Node kinds accepted by `validate_memory_graph`; anything else is an error.
MEMORY_NODE_KINDS = {
    "assumption",
    "alternative",
    "artifact",
    "activation",
    "capability",
    "claim",
    "concept",
    "constraint",
    "contract",
    "context_package",
    "decision",
    "document",
    "edit",
    "entity",
    "episode",
    "evidence",
    "finding",
    "follow_up",
    "interruption",
    "memory",
    "observation",
    "outcome",
    "plan",
    "policy",
    "preference",
    "profile",
    "question",
    "risk",
    "source_fact",
    "task",
    "topic",
    "tool_call",
    "turn",
    "validation",
}

# Edge kinds accepted by `validate_memory_graph`; anything else is an error.
MEMORY_EDGE_KINDS = {
    "activates",
    "affects",
    "belongs_to",
    "contradicts",
    "depends_on",
    "derived_from",
    "governs",
    "led_to",
    "mentions",
    "references",
    "relates_to",
    "supersedes",
    "supports",
}

# Event kinds accepted by `validate_memory_graph`; anything else is an error.
MEMORY_EVENT_KINDS = {
    "activated",
    "branched",
    "compacted",
    "deactivated",
    "forgotten",
    "merged",
    "policy_decision",
    "recorded",
    "refreshed",
    "updated",
}

# Memory kinds a profile may enable; checked by `validate_memory_profile`.
MEMORY_PROFILE_KINDS = {
    "conversation",
    "identity",
    "knowledge",
    "package",
    "reasoning",
    "source",
    "task",
    "tool",
}


@dataclass(frozen=True)
class MemoryValidationResult:
    """Validation outcome for a memory graph/profile contract."""

    # "memory_graph" or "memory_profile" as set by the validators below.
    subject_kind: str
    # Id of the validated contract, or None when it declared no id.
    subject_id: str | None
    # False when at least one error-severity diagnostic was produced.
    valid: bool
    diagnostics: list[Diagnostic] = field(default_factory=list)
    # Summary information (counts / listings) chosen by each validator.
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "subject_kind": self.subject_kind,
                "subject_id": self.subject_id,
                "valid": self.valid,
                "diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
                "metadata": self.metadata,
            }
        )


@dataclass(frozen=True)
class MemoryGraphNode:
    """A memory graph node that can be compiled into context."""

    id: str
    kind: str
    text: str = ""
    namespace: MemoryNamespace = field(default_factory=MemoryNamespace)
    source_spans: list[SourceSpan] = field(default_factory=list)
    provenance: list[dict[str, Any]] = field(default_factory=list)
    freshness: dict[str, Any] = field(default_factory=dict)
    confidence: float | None = None
    policy: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryGraphNode":
        """Build a node from a loosely-shaped mapping.

        Accepted aliases: `type` for `kind`; `content`/`summary` for `text`;
        `spans`/`source_span` for `source_spans` (a single mapping is
        accepted as a one-element list).
        """
        spans = data.get("source_spans") or data.get("spans") or data.get("source_span") or []
        if isinstance(spans, dict):
            spans = [spans]
        return cls(
            id=str(data.get("id") or ""),
            kind=str(data.get("kind") or data.get("type") or ""),
            text=str(data.get("text") or data.get("content") or data.get("summary") or ""),
            namespace=MemoryNamespace.from_mapping(data.get("namespace") or {}),
            source_spans=[SourceSpan.from_mapping(span) for span in _mapping_list(spans)],
            provenance=_mapping_list(data.get("provenance")),
            freshness=dict(data.get("freshness") or {}),
            confidence=_optional_float(data.get("confidence")),
            policy=dict(data.get("policy") or {}),
            metadata=dict(data.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "id": self.id,
                "kind": self.kind,
                "text": self.text,
                "namespace": self.namespace.to_dict(),
                "source_spans": [span.to_dict() for span in self.source_spans],
                "provenance": self.provenance,
                "freshness": self.freshness,
                "confidence": self.confidence,
                "policy": self.policy,
                "metadata": self.metadata,
            }
        )

    def context_text(self) -> str:
        """Return the text to place in a context package for this node.

        Combines `metadata["title"]` (or `metadata["label"]`) with `text`.
        Falls back to whichever of the two exists, and finally to
        `"<kind>: <id>"`.
        """
        title = self.metadata.get("title") or self.metadata.get("label")
        if title and self.text:
            return f"{title}\n\n{self.text}"
        if self.text:
            return self.text
        if title:
            return str(title)
        return f"{self.kind}: {self.id}"


@dataclass(frozen=True)
class MemoryGraphEdge:
    """A directed relationship between memory graph nodes."""

    id: str
    kind: str
    source: str  # id of the origin node
    target: str  # id of the destination node
    provenance: list[dict[str, Any]] = field(default_factory=list)
    freshness: dict[str, Any] = field(default_factory=dict)
    confidence: float | None = None
    policy: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryGraphEdge":
        """Build an edge from a loosely-shaped mapping.

        Accepted aliases: `type` for `kind`, `from`/`to` for
        `source`/`target`. When no id is given, a deterministic id is derived
        from (kind, source, target). Two id-less edges with the same triple
        therefore collide and are reported as duplicates by validation.
        """
        kind = str(data.get("kind") or data.get("type") or "")
        source = str(data.get("source") or data.get("from") or "")
        target = str(data.get("target") or data.get("to") or "")
        edge_id = str(data.get("id") or f"edge:{_short_hash([kind, source, target])}")
        return cls(
            id=edge_id,
            kind=kind,
            source=source,
            target=target,
            provenance=_mapping_list(data.get("provenance")),
            freshness=dict(data.get("freshness") or {}),
            confidence=_optional_float(data.get("confidence")),
            policy=dict(data.get("policy") or {}),
            metadata=dict(data.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(asdict(self))


@dataclass(frozen=True)
class MemoryEvent:
    """An append-only event envelope for memory runtime handoff."""

    id: str
    kind: str
    timestamp: str
    namespace: MemoryNamespace = field(default_factory=MemoryNamespace)
    actor: str | None = None
    thread: str | None = None
    task: str | None = None
    node_updates: list[dict[str, Any]] = field(default_factory=list)
    edge_updates: list[dict[str, Any]] = field(default_factory=list)
    package_refs: list[str] = field(default_factory=list)
    activation_refs: list[str] = field(default_factory=list)
    branch: dict[str, Any] = field(default_factory=dict)
    policy: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryEvent":
        """Build an event from a loosely-shaped mapping.

        Defaults: `kind` is `"recorded"`; `timestamp` (alias `at`) is the
        current UTC time. When no id is given, one is hashed from kind,
        timestamp and metadata. An event without an explicit timestamp and
        id therefore gets a different id on every load.
        """
        kind = str(data.get("kind") or data.get("type") or "recorded")
        timestamp = str(data.get("timestamp") or data.get("at") or _now())
        event_id = str(data.get("id") or f"event:{_short_hash([kind, timestamp, data.get('metadata')])}")
        return cls(
            id=event_id,
            kind=kind,
            timestamp=timestamp,
            namespace=MemoryNamespace.from_mapping(data.get("namespace") or {}),
            actor=_optional_str(data.get("actor")),
            thread=_optional_str(data.get("thread")),
            task=_optional_str(data.get("task")),
            node_updates=_mapping_list(data.get("node_updates") or data.get("nodes")),
            edge_updates=_mapping_list(data.get("edge_updates") or data.get("edges")),
            package_refs=_string_list(data.get("package_refs") or data.get("packages")),
            activation_refs=_string_list(data.get("activation_refs") or data.get("activations")),
            branch=dict(data.get("branch") or {}),
            policy=dict(data.get("policy") or {}),
            metadata=dict(data.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "id": self.id,
                "kind": self.kind,
                "timestamp": self.timestamp,
                "namespace": self.namespace.to_dict(),
                "actor": self.actor,
                "thread": self.thread,
                "task": self.task,
                "node_updates": self.node_updates,
                "edge_updates": self.edge_updates,
                "package_refs": self.package_refs,
                "activation_refs": self.activation_refs,
                "branch": self.branch,
                "policy": self.policy,
                "metadata": self.metadata,
            }
        )


@dataclass(frozen=True)
class MemoryGraph:
    """Serializable graph contract for agentic memory state."""

    id: str
    title: str
    intent: str
    nodes: list[MemoryGraphNode] = field(default_factory=list)
    edges: list[MemoryGraphEdge] = field(default_factory=list)
    events: list[MemoryEvent] = field(default_factory=list)
    namespace: MemoryNamespace = field(default_factory=MemoryNamespace)
    schema_version: str = MEMORY_GRAPH_SCHEMA_VERSION
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryGraph":
        """Build a graph from a mapping.

        `nodes`, `edges` and `events` may each be a list of mappings or a
        mapping keyed by id (the key becomes the default `id`).
        """
        nodes = _mapping_values(data.get("nodes"))
        edges = _mapping_values(data.get("edges"))
        events = _mapping_values(data.get("events"))
        return cls(
            id=str(data.get("id") or ""),
            title=str(data.get("title") or data.get("name") or ""),
            intent=str(data.get("intent") or data.get("description") or ""),
            nodes=[MemoryGraphNode.from_mapping(node) for node in nodes],
            edges=[MemoryGraphEdge.from_mapping(edge) for edge in edges],
            events=[MemoryEvent.from_mapping(event) for event in events],
            namespace=MemoryNamespace.from_mapping(data.get("namespace") or {}),
            schema_version=str(data.get("schema_version") or data.get("schema") or MEMORY_GRAPH_SCHEMA_VERSION),
            metadata=dict(data.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "schema_version": self.schema_version,
                "id": self.id,
                "title": self.title,
                "intent": self.intent,
                "namespace": self.namespace.to_dict(),
                "nodes": [node.to_dict() for node in self.nodes],
                "edges": [edge.to_dict() for edge in self.edges],
                "events": [event.to_dict() for event in self.events],
                "metadata": self.metadata,
            }
        )

    def node_index(self) -> dict[str, MemoryGraphNode]:
        """Map node id -> node (a later duplicate id wins)."""
        return {node.id: node for node in self.nodes}

    def edge_index(self) -> dict[str, MemoryGraphEdge]:
        """Map edge id -> edge (a later duplicate id wins)."""
        return {edge.id: edge for edge in self.edges}

    def event_index(self) -> dict[str, MemoryEvent]:
        """Map event id -> event (a later duplicate id wins)."""
        return {event.id: event for event in self.events}


@dataclass(frozen=True)
class MemoryProfile:
    """Service-agnostic memory blueprint/profile contract."""

    id: str
    title: str
    intent: str
    memory_kinds: list[str] = field(default_factory=list)
    # Per-memory-kind sub-contracts, keyed by memory kind.
    stores: dict[str, Any] = field(default_factory=dict)
    limits: dict[str, Any] = field(default_factory=dict)
    latency: dict[str, Any] = field(default_factory=dict)
    retention: dict[str, Any] = field(default_factory=dict)
    refresh: dict[str, Any] = field(default_factory=dict)
    compaction: dict[str, Any] = field(default_factory=dict)
    # Fallback context-package budget used when a selection sets none.
    activation: ContextBudget = field(default_factory=ContextBudget)
    policy: dict[str, Any] = field(default_factory=dict)
    observability: dict[str, Any] = field(default_factory=dict)
    failure: dict[str, Any] = field(default_factory=dict)
    schema_version: str = MEMORY_PROFILE_SCHEMA_VERSION
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryProfile":
        """Build a profile from a mapping.

        Besides top-level `stores`/`limits`/`latency`/`retention`, a
        `profiles` mapping of per-kind blocks is accepted. A block's
        `store`, `max_nodes`/`max_tokens`/`max_items`, `latency` and
        `retention` fill in the matching top-level maps only where the
        top-level map has no entry for that kind. When `memory_kinds` is
        absent, the keys of `profiles` are used. The activation budget is
        read from `activation`, `context_budget` or
        `outputs.context_packages`, unwrapping a nested `budget` key.
        """
        profile_blocks = dict(data.get("profiles") or {})
        memory_kinds = _string_list(
            data.get("memory_kinds") or data.get("enabled_memory_kinds") or list(profile_blocks.keys())
        )
        stores = dict(data.get("stores") or {})
        limits = dict(data.get("limits") or {})
        latency = dict(data.get("latency") or data.get("latency_targets") or {})
        retention = dict(data.get("retention") or data.get("retention_policy") or {})
        for kind, block in profile_blocks.items():
            if not isinstance(block, dict):
                continue
            # Explicit top-level entries take precedence over per-kind blocks.
            if block.get("store") is not None and kind not in stores:
                stores[kind] = block["store"]
            limit_values = {key: block[key] for key in ("max_nodes", "max_tokens", "max_items") if key in block}
            if limit_values and kind not in limits:
                limits[kind] = limit_values
            if block.get("latency") is not None and kind not in latency:
                latency[kind] = block["latency"]
            if block.get("retention") is not None and kind not in retention:
                retention[kind] = block["retention"]
        # `outputs` is only consulted when it is a mapping; a scalar or list
        # value would otherwise crash with AttributeError.
        outputs = data.get("outputs")
        activation_data = (
            data.get("activation")
            or data.get("context_budget")
            or (outputs.get("context_packages") if isinstance(outputs, dict) else None)
            or {}
        )
        if isinstance(activation_data, dict) and "budget" in activation_data:
            activation_data = activation_data["budget"]
        return cls(
            id=str(data.get("id") or ""),
            title=str(data.get("title") or data.get("name") or ""),
            intent=str(data.get("intent") or data.get("description") or ""),
            memory_kinds=memory_kinds,
            stores=stores,
            limits=limits,
            latency=latency,
            retention=retention,
            refresh=dict(data.get("refresh") or {}),
            compaction=dict(data.get("compaction") or {}),
            activation=ContextBudget.from_mapping(activation_data if isinstance(activation_data, dict) else {}),
            policy=dict(data.get("policy") or {}),
            observability=dict(data.get("observability") or {}),
            failure=dict(data.get("failure") or {}),
            schema_version=str(data.get("schema_version") or data.get("schema") or MEMORY_PROFILE_SCHEMA_VERSION),
            metadata=dict(data.get("metadata") or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "schema_version": self.schema_version,
                "id": self.id,
                "title": self.title,
                "intent": self.intent,
                "memory_kinds": self.memory_kinds,
                "stores": self.stores,
                "limits": self.limits,
                "latency": self.latency,
                "retention": self.retention,
                "refresh": self.refresh,
                "compaction": self.compaction,
                "activation": self.activation.to_dict(),
                "policy": self.policy,
                "observability": self.observability,
                "failure": self.failure,
                "metadata": self.metadata,
            }
        )


@dataclass(frozen=True)
class MemoryGraphSelection:
    """A packageable selection of a memory graph."""

    graph: str | None = None  # graph reference/path; used as the retrieval source
    profile: str | None = None  # profile reference/path
    title: str | None = None
    intent: str | None = None
    package_id: str | None = None
    namespace: MemoryNamespace = field(default_factory=MemoryNamespace)
    # Empty node_ids selects every node; empty edge_ids selects the edges
    # internal to the selected nodes (see the compiler below).
    node_ids: list[str] = field(default_factory=list)
    edge_ids: list[str] = field(default_factory=list)
    event_ids: list[str] = field(default_factory=list)
    budget: ContextBudget = field(default_factory=ContextBudget)
    metadata: dict[str, Any] = field(default_factory=dict)
    schema_version: str = MEMORY_SELECTION_SCHEMA_VERSION

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> "MemoryGraphSelection":
        """Build a selection from a mapping, accepting common key aliases."""
        return cls(
            graph=_optional_str(data.get("graph") or data.get("graph_path") or data.get("source")),
            profile=_optional_str(data.get("profile") or data.get("profile_path") or data.get("blueprint")),
            title=_optional_str(data.get("title")),
            intent=_optional_str(data.get("intent") or data.get("description")),
            package_id=_optional_str(data.get("package_id")),
            namespace=MemoryNamespace.from_mapping(data.get("namespace") or {}),
            node_ids=_string_list(data.get("node_ids") or data.get("nodes")),
            edge_ids=_string_list(data.get("edge_ids") or data.get("edges")),
            event_ids=_string_list(data.get("event_ids") or data.get("events")),
            budget=ContextBudget.from_mapping(data.get("budget") or data.get("context_budget") or {}),
            metadata=dict(data.get("metadata") or {}),
            schema_version=str(data.get("schema_version") or data.get("schema") or MEMORY_SELECTION_SCHEMA_VERSION),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a serializable mapping with empty fields dropped."""
        return _drop_empty(
            {
                "schema_version": self.schema_version,
                "graph": self.graph,
                "profile": self.profile,
                "title": self.title,
                "intent": self.intent,
                "package_id": self.package_id,
                "namespace": self.namespace.to_dict(),
                "node_ids": self.node_ids,
                "edge_ids": self.edge_ids,
                "event_ids": self.event_ids,
                "budget": self.budget.to_dict(),
                "metadata": self.metadata,
            }
        )


def load_memory_graph_file(path: Path) -> MemoryGraph:
    """Load a memory graph YAML/JSON file.

    Raises:
        ContextPackageError: if the file cannot be read or parsed, or is not a mapping.
    """
    return MemoryGraph.from_mapping(_load_mapping_file(path, "Memory graph"))


def load_memory_profile_file(path: Path) -> MemoryProfile:
    """Load a memory profile/blueprint YAML/JSON file.

    Raises:
        ContextPackageError: if the file cannot be read or parsed, or is not a mapping.
    """
    return MemoryProfile.from_mapping(_load_mapping_file(path, "Memory profile"))


def load_memory_graph_selection_file(path: Path) -> MemoryGraphSelection:
    """Load a memory graph selection YAML/JSON file.

    Raises:
        ContextPackageError: if the file cannot be read or parsed, or is not a mapping.
    """
    return MemoryGraphSelection.from_mapping(_load_mapping_file(path, "Memory graph selection"))


def validate_memory_graph(graph: MemoryGraph | dict[str, Any], path: Path | None = None) -> MemoryValidationResult:
    """Validate a memory graph against the Markitect graph vocabulary.

    Errors: missing graph/node id, duplicate node/edge/event ids, unknown
    node/edge/event kinds, and edges whose source or target is not a known
    node. Warnings: schema version mismatch and an empty node list.

    Args:
        graph: A parsed graph or a raw mapping to parse.
        path: Optional file path attached to every diagnostic.
    """
    loaded = graph if isinstance(graph, MemoryGraph) else MemoryGraph.from_mapping(graph)
    diagnostics: list[Diagnostic] = []
    contract = SourceLocation(path=str(path)) if path else None

    if not loaded.id:
        diagnostics.append(_error("memory.graph.missing_id", "Memory graph must declare an id.", contract))
    if loaded.schema_version != MEMORY_GRAPH_SCHEMA_VERSION:
        diagnostics.append(
            _warning(
                "memory.graph.schema_version",
                f"Expected schema_version `{MEMORY_GRAPH_SCHEMA_VERSION}`.",
                contract,
                {"actual": loaded.schema_version},
            )
        )
    if not loaded.nodes:
        diagnostics.append(_warning("memory.graph.empty", "Memory graph has no nodes.", contract))

    node_ids = [node.id for node in loaded.nodes]
    _add_duplicate_diagnostics(diagnostics, node_ids, "memory.graph.duplicate_node", "Duplicate node id", contract)
    for node in loaded.nodes:
        if not node.id:
            diagnostics.append(_error("memory.graph.node.missing_id", "Memory graph node must declare an id.", contract))
        if node.kind not in MEMORY_NODE_KINDS:
            diagnostics.append(
                _error(
                    "memory.graph.node.unknown_kind",
                    f"Unknown memory node kind `{node.kind}`.",
                    contract,
                    {"node_id": node.id, "allowed": sorted(MEMORY_NODE_KINDS)},
                )
            )

    # Empty ids are excluded so an edge with a blank endpoint is never "known".
    known_nodes = {node.id for node in loaded.nodes if node.id}
    edge_ids = [edge.id for edge in loaded.edges]
    _add_duplicate_diagnostics(diagnostics, edge_ids, "memory.graph.duplicate_edge", "Duplicate edge id", contract)
    for edge in loaded.edges:
        if edge.kind not in MEMORY_EDGE_KINDS:
            diagnostics.append(
                _error(
                    "memory.graph.edge.unknown_kind",
                    f"Unknown memory edge kind `{edge.kind}`.",
                    contract,
                    {"edge_id": edge.id, "allowed": sorted(MEMORY_EDGE_KINDS)},
                )
            )
        if edge.source not in known_nodes:
            diagnostics.append(
                _error(
                    "memory.graph.edge.unknown_source",
                    f"Edge `{edge.id}` references unknown source node `{edge.source}`.",
                    contract,
                    {"edge_id": edge.id},
                )
            )
        if edge.target not in known_nodes:
            diagnostics.append(
                _error(
                    "memory.graph.edge.unknown_target",
                    f"Edge `{edge.id}` references unknown target node `{edge.target}`.",
                    contract,
                    {"edge_id": edge.id},
                )
            )

    event_ids = [event.id for event in loaded.events]
    _add_duplicate_diagnostics(diagnostics, event_ids, "memory.graph.duplicate_event", "Duplicate event id", contract)
    for event in loaded.events:
        if event.kind not in MEMORY_EVENT_KINDS:
            diagnostics.append(
                _error(
                    "memory.graph.event.unknown_kind",
                    f"Unknown memory event kind `{event.kind}`.",
                    contract,
                    {"event_id": event.id, "allowed": sorted(MEMORY_EVENT_KINDS)},
                )
            )

    return MemoryValidationResult(
        subject_kind="memory_graph",
        subject_id=loaded.id or None,
        valid=not has_error(diagnostics),
        diagnostics=diagnostics,
        metadata={
            "nodes": len(loaded.nodes),
            "edges": len(loaded.edges),
            "events": len(loaded.events),
        },
    )


def validate_memory_profile(
    profile: MemoryProfile | dict[str, Any], path: Path | None = None
) -> MemoryValidationResult:
    """Validate a memory profile/blueprint contract.

    Errors: missing id, no memory kinds, unknown memory kinds. Warnings:
    schema version mismatch, a memory kind without a store, and an
    activation budget with no limit.

    Args:
        profile: A parsed profile or a raw mapping to parse.
        path: Optional file path attached to every diagnostic.
    """
    loaded = profile if isinstance(profile, MemoryProfile) else MemoryProfile.from_mapping(profile)
    diagnostics: list[Diagnostic] = []
    contract = SourceLocation(path=str(path)) if path else None

    if not loaded.id:
        diagnostics.append(_error("memory.profile.missing_id", "Memory profile must declare an id.", contract))
    if loaded.schema_version != MEMORY_PROFILE_SCHEMA_VERSION:
        diagnostics.append(
            _warning(
                "memory.profile.schema_version",
                f"Expected schema_version `{MEMORY_PROFILE_SCHEMA_VERSION}`.",
                contract,
                {"actual": loaded.schema_version},
            )
        )
    if not loaded.memory_kinds:
        diagnostics.append(
            _error("memory.profile.no_memory_kinds", "Memory profile must declare at least one memory kind.", contract)
        )
    for kind in loaded.memory_kinds:
        if kind not in MEMORY_PROFILE_KINDS:
            diagnostics.append(
                _error(
                    "memory.profile.unknown_kind",
                    f"Unknown memory profile kind `{kind}`.",
                    contract,
                    # `memory_kind` matches the detail key of `store_missing`.
                    {"memory_kind": kind, "allowed": sorted(MEMORY_PROFILE_KINDS)},
                )
            )
        if kind not in loaded.stores:
            diagnostics.append(
                _warning(
                    "memory.profile.store_missing",
                    f"Memory kind `{kind}` has no store contract.",
                    contract,
                    {"memory_kind": kind},
                )
            )
    if not _has_budget_limit(loaded.activation):
        diagnostics.append(
            _warning(
                "memory.profile.activation_budget_missing",
                "Profile has no context package activation budget.",
                contract,
            )
        )

    return MemoryValidationResult(
        subject_kind="memory_profile",
        subject_id=loaded.id or None,
        valid=not has_error(diagnostics),
        diagnostics=diagnostics,
        metadata={
            "memory_kinds": loaded.memory_kinds,
            "stores": sorted(loaded.stores),
        },
    )


def plan_memory_profile(profile: MemoryProfile) -> dict[str, Any]:
    """Explain how a memory profile maps to runtime responsibilities.

    Returns a serializable mapping containing the profile itself, a per-kind
    summary of store/limits/latency/retention, the activation budget, the
    runtime boundary between components, and the handoff schema versions.
    """
    kind_plans = [
        _drop_empty(
            {
                "kind": kind,
                "store": profile.stores.get(kind),
                "limits": profile.limits.get(kind),
                "latency": profile.latency.get(kind),
                "retention": profile.retention.get(kind),
            }
        )
        for kind in profile.memory_kinds
    ]
    return _drop_empty(
        {
            "profile": profile.to_dict(),
            "memory_kinds": kind_plans,
            "activation": profile.activation.to_dict(),
            "runtime_boundary": {
                "markitect_tool": [
                    "validate profile and graph contracts",
                    "compile selected graph nodes into context packages",
                    "emit deterministic package/provenance metadata",
                ],
                "kontextual_engine": [
                    "execute stores, retrieval, refresh, compaction, and policy decisions",
                    "emit memory events that conform to the envelope",
                ],
                "infospace_bench": [
                    "measure retrieval quality, latency, budget pressure, and regression behavior",
                ],
                "services_launched_by_markitect_tool": False,
            },
            "handoff_contracts": [
                MEMORY_PROFILE_SCHEMA_VERSION,
                MEMORY_GRAPH_SCHEMA_VERSION,
                MEMORY_SELECTION_SCHEMA_VERSION,
            ],
        }
    )


def compile_memory_graph_selection_to_context_package(
    graph: MemoryGraph,
    selection: MemoryGraphSelection | dict[str, Any] | None = None,
    profile: MemoryProfile | None = None,
) -> ContextPackage:
    """Compile a memory graph selection into a deterministic context package.

    Args:
        graph: The graph to select from; it must pass `validate_memory_graph`.
        selection: Which nodes/edges/events to include. `None` selects every
            node. With no explicit `edge_ids`, the edges whose endpoints
            are both selected nodes are included.
        profile: Optional profile; it must pass `validate_memory_profile`.
            Its activation budget is used when the selection sets no budget
            limit.

    Returns:
        A package with one item per selected node (in selection order),
        followed by one item per selected event.

    Raises:
        ContextPackageError: if the graph or profile is invalid, or the
            selection references unknown node/edge/event ids.
    """
    selected = selection
    if selected is None:
        selected = MemoryGraphSelection()
    if isinstance(selected, dict):
        selected = MemoryGraphSelection.from_mapping(selected)

    graph_validation = validate_memory_graph(graph)
    if not graph_validation.valid:
        messages = _error_summary(graph_validation.diagnostics)
        raise ContextPackageError(f"Cannot compile invalid memory graph: {messages}")
    if profile is not None:
        profile_validation = validate_memory_profile(profile)
        if not profile_validation.valid:
            messages = _error_summary(profile_validation.diagnostics)
            raise ContextPackageError(f"Cannot compile with invalid memory profile: {messages}")

    node_index = graph.node_index()
    edge_index = graph.edge_index()
    event_index = graph.event_index()

    node_ids = selected.node_ids or [node.id for node in graph.nodes]
    missing_nodes = [node_id for node_id in node_ids if node_id not in node_index]
    if missing_nodes:
        raise ContextPackageError(f"Memory graph selection references unknown node ids: {', '.join(missing_nodes)}")

    if selected.edge_ids:
        edge_ids = selected.edge_ids
    else:
        selected_node_ids = set(node_ids)
        edge_ids = [
            edge.id
            for edge in graph.edges
            if edge.source in selected_node_ids and edge.target in selected_node_ids
        ]
    missing_edges = [edge_id for edge_id in edge_ids if edge_id not in edge_index]
    if missing_edges:
        raise ContextPackageError(f"Memory graph selection references unknown edge ids: {', '.join(missing_edges)}")

    missing_events = [event_id for event_id in selected.event_ids if event_id not in event_index]
    if missing_events:
        raise ContextPackageError(f"Memory graph selection references unknown event ids: {', '.join(missing_events)}")

    items = [_context_item_for_node(graph, node_index[node_id]) for node_id in node_ids]
    items.extend(_context_item_for_event(graph, event_index[event_id]) for event_id in selected.event_ids)

    # An empty selection namespace inherits the graph's namespace.
    namespace = selected.namespace if selected.namespace.to_dict() else graph.namespace
    metadata = {
        "memory_graph": {
            "schema_version": graph.schema_version,
            "graph_id": graph.id,
            "selection": selected.to_dict(),
            "selected_nodes": node_ids,
            "selected_edges": edge_ids,
            "selected_events": selected.event_ids,
            "edges": [edge_index[edge_id].to_dict() for edge_id in edge_ids],
        },
        "memory_profile": profile.to_dict() if profile is not None else None,
    }
    retrieval = RetrievalRecipe(
        kind="memory-graph-selection",
        query=",".join(node_ids),
        engine="memory-graph",
        sources=[selected.graph or f"memory://{graph.id}"],
        metadata={
            "graph_id": graph.id,
            "profile_id": profile.id if profile is not None else None,
            "selection_schema": selected.schema_version,
        },
    )

    # Budget precedence: selection budget (if it sets any limit), then the
    # profile's activation budget, then no budget at all.
    if _has_budget_limit(selected.budget):
        budget = selected.budget
    elif profile is not None:
        budget = profile.activation
    else:
        budget = None

    return ContextPackage.create(
        title=selected.title or graph.title or "Memory graph package",
        intent=selected.intent or graph.intent or "Selected memory graph context.",
        namespace=namespace,
        items=items,
        retrieval_recipes=[retrieval],
        budget=budget,
        package_id=selected.package_id,
        freshness={"compiled_at": _now(), "source": "memory-graph"},
        provenance=[
            {
                "kind": "memory-graph-selection",
                "graph_id": graph.id,
                "profile_id": profile.id if profile is not None else None,
            }
        ],
        metadata=metadata,
    )


def _error_summary(diagnostics: list[Diagnostic]) -> str:
    """Join error-severity diagnostic messages; fall back to all messages.

    The fallback keeps the message informative if severities cannot be
    matched against the plain string "error".
    """
    errors = [diagnostic.message for diagnostic in diagnostics if diagnostic.severity == "error"]
    return "; ".join(errors or [diagnostic.message for diagnostic in diagnostics])


def _context_item_for_node(graph: MemoryGraph, node: MemoryGraphNode) -> ContextPackageItem:
    """Build the context package item for one graph node.

    The node's first source span is used as the item source; nodes without
    spans get a synthetic `memory://` span. Graph provenance is prepended to
    the node's own provenance. Node metadata keys are merged over the
    `memory_graph` block.
    """
    source = node.source_spans[0] if node.source_spans else _synthetic_span(graph.id, "nodes", node.id, node.kind)
    provenance = [
        {
            "kind": "memory-graph-node",
            "graph_id": graph.id,
            "node_id": node.id,
            "node_kind": node.kind,
        }
    ] + node.provenance
    metadata = {
        "memory_graph": {
            "graph_id": graph.id,
            "node_id": node.id,
            "node_kind": node.kind,
            "freshness": node.freshness,
            "confidence": node.confidence,
        },
        **node.metadata,
    }
    return ContextPackageItem.create(
        source=source,
        text=node.context_text(),
        summary=_optional_str(node.metadata.get("summary")),
        policy=node.policy,
        provenance=provenance,
        metadata=metadata,
    )


def _context_item_for_event(graph: MemoryGraph, event: MemoryEvent) -> ContextPackageItem:
    """Build the context package item for one event (its JSON serialization)."""
    source = _synthetic_span(graph.id, "events", event.id, event.kind)
    # default=str: YAML-loaded metadata can hold dates/datetimes, which json
    # cannot serialize natively (same policy as `_short_hash`).
    text = json.dumps(event.to_dict(), indent=2, ensure_ascii=False, default=str)
    return ContextPackageItem.create(
        source=source,
        text=text,
        summary=f"{event.kind} event {event.id}",
        policy=event.policy,
        provenance=[
            {
                "kind": "memory-graph-event",
                "graph_id": graph.id,
                "event_id": event.id,
                "event_kind": event.kind,
            }
        ],
        metadata={"memory_graph": {"graph_id": graph.id, "event_id": event.id, "event_kind": event.kind}},
    )


def _synthetic_span(graph_id: str, collection: str, item_id: str, item_kind: str) -> SourceSpan:
    """Return a `memory://<graph>/<collection>/<id>` span for items without a file source."""
    return SourceSpan(
        path=f"memory://{graph_id}/{collection}/{item_id}",
        unit_kind=item_kind,
        selector=f"{collection}[id={item_id}]",
        engine="memory-graph",
    )


def _load_mapping_file(path: Path, label: str) -> dict[str, Any]:
    """Read a YAML/JSON file and return its top-level mapping.

    An empty document yields `{}`. Read, decode and parse failures are all
    re-raised as `ContextPackageError`, so callers only need to catch one
    exception type.

    Raises:
        ContextPackageError: on I/O, UTF-8 decode, or YAML parse errors, or
            when the document is not a mapping.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise ContextPackageError(str(exc)) from exc
    try:
        data = yaml.safe_load(raw) or {}
    except yaml.YAMLError as exc:
        raise ContextPackageError(f"{label} file `{path}` is not valid YAML/JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ContextPackageError(f"{label} file `{path}` must contain a mapping.")
    return data


def _mapping_values(value: Any) -> list[dict[str, Any]]:
    """Normalize a list of mappings or an id-keyed mapping into a list of mappings.

    For an id-keyed mapping, non-mapping values are skipped. Each key
    becomes the item's `id` unless the item already declares one.
    """
    if value is None:
        return []
    if isinstance(value, dict):
        items = []
        for key, raw in value.items():
            if not isinstance(raw, dict):
                continue
            item = dict(raw)
            item.setdefault("id", str(key))
            items.append(item)
        return items
    return _mapping_list(value)


def _mapping_list(value: Any) -> list[dict[str, Any]]:
    """Return shallow copies of the mappings in `value`.

    A single mapping becomes a one-element list. Non-mapping entries and
    other value types are ignored.
    """
    if value is None:
        return []
    if isinstance(value, dict):
        return [dict(value)]
    if isinstance(value, (list, tuple)):
        return [dict(item) for item in value if isinstance(item, dict)]
    return []


def _string_list(value: Any) -> list[str]:
    """Coerce a scalar or list/tuple into a list of strings, dropping `None` entries."""
    if value is None:
        return []
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item is not None]
    return [str(value)]


def _optional_str(value: Any) -> str | None:
    """Return `str(value)`, or None when the value is None or stringifies to ''."""
    if value is None:
        return None
    text = str(value)
    return text if text else None


def _optional_float(value: Any) -> float | None:
    """Return `float(value)`, or None for None/''. Invalid values raise ValueError."""
    if value is None or value == "":
        return None
    return float(value)


def _short_hash(value: Any) -> str:
    """Return a 16-hex-char SHA-256 prefix of the canonical JSON form of `value`."""
    return hashlib.sha256(
        json.dumps(value, sort_keys=True, ensure_ascii=False, default=str).encode("utf-8")
    ).hexdigest()[:16]


def _has_budget_limit(budget: ContextBudget) -> bool:
    """True when the budget sets max_tokens, max_items, or a positive reserve_tokens."""
    return budget.max_tokens is not None or budget.max_items is not None or budget.reserve_tokens > 0


def _now() -> str:
    """Current UTC time as ISO-8601 with second precision and a `Z` suffix."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def _error(
    code: str,
    message: str,
    contract: SourceLocation | None,
    details: dict[str, Any] | None = None,
) -> Diagnostic:
    """Create an error-severity diagnostic."""
    return Diagnostic(severity="error", code=code, message=message, contract=contract, details=details or {})


def _warning(
    code: str,
    message: str,
    contract: SourceLocation | None,
    details: dict[str, Any] | None = None,
) -> Diagnostic:
    """Create a warning-severity diagnostic."""
    return Diagnostic(severity="warning", code=code, message=message, contract=contract, details=details or {})


def _add_duplicate_diagnostics(
    diagnostics: list[Diagnostic],
    values: list[str],
    code: str,
    message: str,
    contract: SourceLocation | None,
) -> None:
    """Append one error per id that occurs more than once (empty ids ignored).

    Errors are appended in sorted id order so output is deterministic.
    """
    seen: set[str] = set()
    duplicates: set[str] = set()
    for value in values:
        if not value:
            continue
        if value in seen:
            duplicates.add(value)
        seen.add(value)
    for duplicate in sorted(duplicates):
        diagnostics.append(_error(code, f"{message} `{duplicate}`.", contract, {"id": duplicate}))


def _drop_empty(data: dict[str, Any]) -> dict[str, Any]:
    """Drop keys whose value is None, [], {} or ''. Falsy scalars like 0/False are kept."""
    return {
        key: value
        for key, value in data.items()
        if value not in (None, [], {}, "")
    }