"""Local runtime facade for deterministic phase-memory operations."""

from __future__ import annotations

from dataclasses import dataclass, field, replace
from datetime import datetime
from typing import Any, Iterable

from .activation import activation_action, plan_activation
from .adapters import (
    AllowAllPolicyGateway,
    InMemoryMemoryEventLog,
    InMemoryMemoryGraphStore,
    NoopContextPackageCompiler,
    RecordingAuditSink,
)
from .bridge import MARKITECT_PACKAGE_REQUEST_SCHEMA, package_request_from_selection, package_response_envelope
from .contracts import ContractIngressResult, graph_from_markitect, profile_from_markitect
from .lifecycle import LifecycleRuleConfig, plan_compaction, plan_lifecycle_from_profile, plan_refresh, plan_retention
from .models import (
    Diagnostic,
    LifecycleAction,
    LifecycleActionKind,
    LifecycleState,
    MemoryGraph,
    MemoryNode,
    MemoryPhase,
    PolicyDecision,
    ProfileIntent,
    ReviewRecord,
)
from .planner import plan_profile_execution
from .policy import audit_event, evaluate_activation_node, make_review_record, policy_denial_diagnostic, redacted_node
from .ports import AuditSink, ContextPackageCompiler, MemoryEventLog, MemoryGraphStore, PolicyGateway
from .utils import compact_dict, stable_digest, to_plain

RUNTIME_ENVELOPE_SCHEMA = "phase_memory.runtime.envelope.v1"
PACKAGE_REQUEST_SCHEMA = MARKITECT_PACKAGE_REQUEST_SCHEMA


@dataclass
class PhaseMemoryRuntime:
    """Dependency-light local runtime facade.

    The facade coordinates existing pure planners and local adapters while
    keeping every public operation dry-run and JSON-serializable by default.

    Every public method returns the envelope dict built by ``_envelope``. The
    envelope is flagged ``dry_run=True`` for all operations except
    ``apply_lifecycle_actions``. Note that ``import_profile`` and
    ``import_graph`` do write to the configured store/event log even though
    their envelopes carry the default ``dry_run=True`` flag.
    """

    # Collaborators; each defaults to the local adapter imported from .adapters.
    graph_store: MemoryGraphStore = field(default_factory=InMemoryMemoryGraphStore)
    event_log: MemoryEventLog = field(default_factory=InMemoryMemoryEventLog)
    package_compiler: ContextPackageCompiler = field(default_factory=NoopContextPackageCompiler)
    policy_gateway: PolicyGateway = field(default_factory=AllowAllPolicyGateway)
    audit_sink: AuditSink = field(default_factory=RecordingAuditSink)

    def import_profile(self, data: dict[str, Any], *, source_ref: str = "mapping") -> dict[str, Any]:
        """Parse a profile mapping and, when valid, save it to the graph store.

        Args:
            data: Raw profile mapping handed to ``profile_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.

        Returns:
            The runtime envelope for operation ``"profile.import"``.
        """
        result = profile_from_markitect(data)
        if result.valid:
            # NOTE(review): the store is written before `_envelope` consults the
            # policy gateway, so a policy denial only flips `valid` in the
            # returned envelope; it does not prevent or undo this save.
            self.graph_store.save_profile(result.value)
        return self._contract_envelope("profile.import", result, source_ref=source_ref)

    def import_graph(self, data: dict[str, Any], *, source_ref: str = "mapping") -> dict[str, Any]:
        """Parse a graph mapping and, when valid, persist its nodes, edges and events.

        Nodes and edges go to ``graph_store``; events are appended to
        ``event_log``.

        Args:
            data: Raw graph mapping handed to ``graph_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.

        Returns:
            The runtime envelope for operation ``"graph.import"``.
        """
        result = graph_from_markitect(data)
        if result.valid:
            graph: MemoryGraph = result.value
            # NOTE(review): as in `import_profile`, these writes happen before
            # the policy check performed inside `_envelope`.
            for node in graph.nodes:
                self.graph_store.save_node(node)
            for edge in graph.edges:
                self.graph_store.save_edge(edge)
            for event in graph.events:
                self.event_log.append(event)
        return self._contract_envelope("graph.import", result, source_ref=source_ref)

    def plan_profile(
        self,
        data: dict[str, Any],
        *,
        source_ref: str = "mapping",
        available_adapters: Iterable[str] | None = None,
    ) -> dict[str, Any]:
        """Parse a profile mapping and plan its execution without storing anything.

        Args:
            data: Raw profile mapping handed to ``profile_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.
            available_adapters: Forwarded unchanged to ``plan_profile_execution``.

        Returns:
            The runtime envelope for operation ``"profile.plan"``. ``data``
            holds the parsed profile and the plan (``None`` when the profile
            did not validate); diagnostics combine ingress and planner output.
        """
        result = profile_from_markitect(data)
        profile: ProfileIntent | None = result.value if result.valid else None
        # Explicit None checks: a parsed object must not be treated as absent
        # merely because its type happens to define a falsy truth value.
        plan = (
            plan_profile_execution(profile, available_adapters=available_adapters)
            if profile is not None
            else None
        )
        diagnostics = list(result.diagnostics)
        if plan is not None:
            diagnostics.extend(plan.diagnostics)
        return self._envelope(
            "profile.plan",
            subject_kind="memory_profile",
            subject_id=result.subject_id,
            valid=result.valid and plan is not None,
            diagnostics=diagnostics,
            source_ref=source_ref,
            data={
                "profile": result.value.to_dict() if hasattr(result.value, "to_dict") else result.value,
                "plan": plan.to_dict() if plan is not None else None,
            },
        )

    def plan_lifecycle(
        self,
        data: dict[str, Any],
        *,
        source_ref: str = "mapping",
        stale_after_days: int | None = None,
        delete_after_days: int | None = None,
        refresh_digests: dict[str, str] | None = None,
        compact_node_ids: tuple[str, ...] = (),
        now: datetime | None = None,
    ) -> dict[str, Any]:
        """Plan retention, refresh and compaction actions for a graph mapping.

        Nothing is applied; the planned actions are returned under
        ``data["dry_run_actions"]``.

        Args:
            data: Raw graph mapping handed to ``graph_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.
            stale_after_days: Forwarded to ``plan_retention``.
            delete_after_days: Forwarded to ``plan_retention``.
            refresh_digests: Mapping of node id to source digest; when
                non-empty it is forwarded to ``plan_refresh``.
            compact_node_ids: Node ids to compact. Ids that are not present in
                the graph are silently skipped.
            now: Forwarded to ``plan_retention``.

        Returns:
            The runtime envelope for operation ``"graph.lifecycle.plan"``.
        """
        result = graph_from_markitect(data)
        graph: MemoryGraph | None = result.value if result.valid else None
        actions: list[LifecycleAction] = []
        if graph is not None:
            actions.extend(
                plan_retention(
                    graph.nodes,
                    stale_after_days=stale_after_days,
                    delete_after_days=delete_after_days,
                    now=now,
                )
            )
            if refresh_digests:
                actions.extend(plan_refresh(graph.nodes, source_digest_by_node_id=refresh_digests))
            if compact_node_ids:
                by_id = graph.node_by_id()
                compact_nodes = [by_id[node_id] for node_id in compact_node_ids if node_id in by_id]
                # Only plan a compaction when at least one requested id resolved.
                if compact_nodes:
                    actions.append(plan_compaction(compact_nodes))
        return self._envelope(
            "graph.lifecycle.plan",
            subject_kind="memory_graph",
            subject_id=result.subject_id,
            valid=result.valid,
            diagnostics=result.diagnostics,
            source_ref=source_ref,
            data={
                "graph_id": result.subject_id,
                "dry_run_actions": [action.to_dict() for action in actions],
                "parameters": compact_dict(
                    {
                        "stale_after_days": stale_after_days,
                        "delete_after_days": delete_after_days,
                        "refresh_digests": refresh_digests or {},
                        "compact_node_ids": list(compact_node_ids),
                    }
                ),
            },
        )

    def plan_lifecycle_with_profile(
        self,
        profile_data: dict[str, Any],
        graph_data: dict[str, Any],
        *,
        source_ref: str = "mapping",
        profile_source_ref: str = "profile",
        refresh_digests: dict[str, str] | None = None,
        compact_node_ids: tuple[str, ...] = (),
        now: datetime | None = None,
    ) -> dict[str, Any]:
        """Plan lifecycle actions for a graph using rules derived from a profile.

        Planning only happens when both the profile and the graph validate;
        otherwise the envelope carries no actions and ``rule_config`` is
        ``None``.

        Args:
            profile_data: Raw profile mapping handed to ``profile_from_markitect``.
            graph_data: Raw graph mapping handed to ``graph_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.
            profile_source_ref: Origin label of the profile, echoed under
                ``data["parameters"]``.
            refresh_digests: Forwarded (as ``{}`` when ``None``) to
                ``plan_lifecycle_from_profile``.
            compact_node_ids: Forwarded to ``plan_lifecycle_from_profile``.
            now: Forwarded to ``plan_lifecycle_from_profile``.

        Returns:
            The runtime envelope for operation ``"graph.lifecycle.plan"`` with
            the graph as subject; diagnostics list the profile's first.
        """
        profile_result = profile_from_markitect(profile_data)
        graph_result = graph_from_markitect(graph_data)
        diagnostics = list(profile_result.diagnostics) + list(graph_result.diagnostics)
        actions: tuple[LifecycleAction, ...] = ()
        rule_config = None
        if profile_result.valid and graph_result.valid:
            profile: ProfileIntent = profile_result.value
            graph: MemoryGraph = graph_result.value
            rule_config = LifecycleRuleConfig.from_profile(profile)
            actions = plan_lifecycle_from_profile(
                graph,
                profile,
                refresh_digests=refresh_digests or {},
                compact_node_ids=compact_node_ids,
                now=now,
            )
        return self._envelope(
            "graph.lifecycle.plan",
            subject_kind="memory_graph",
            subject_id=graph_result.subject_id,
            valid=profile_result.valid and graph_result.valid,
            diagnostics=diagnostics,
            source_ref=source_ref,
            data={
                "graph_id": graph_result.subject_id,
                "profile_id": profile_result.subject_id,
                "dry_run_actions": [action.to_dict() for action in actions],
                "rule_config": rule_config.to_dict() if rule_config else None,
                "parameters": compact_dict(
                    {
                        "profile_source_ref": profile_source_ref,
                        "refresh_digests": refresh_digests or {},
                        "compact_node_ids": list(compact_node_ids),
                    }
                ),
            },
        )

    def plan_activation(
        self,
        data: dict[str, Any],
        *,
        source_ref: str = "mapping",
        max_items: int,
        max_tokens: int,
        profile_id: str | None = None,
        priority_node_ids: tuple[str, ...] = (),
        include_events: bool = True,
        policy_context: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Plan which parts of a graph to activate under item and token budgets.

        When ``policy_context`` is non-empty the graph is first reduced by
        ``_policy_filtered_graph``; denied nodes are reported (redacted) under
        ``data["policy_denials"]`` and as diagnostics.

        Args:
            data: Raw graph mapping handed to ``graph_from_markitect``.
            source_ref: Free-form origin label echoed in the envelope.
            max_items: Forwarded to the activation planner.
            max_tokens: Forwarded to the activation planner.
            profile_id: Forwarded to the activation planner.
            priority_node_ids: Forwarded to the activation planner.
            include_events: Forwarded to the activation planner.
            policy_context: Optional node-level policy settings; see
                ``_policy_filtered_graph`` for the keys that are read.

        Returns:
            The runtime envelope for operation ``"graph.activation.plan"``.
        """
        result = graph_from_markitect(data)
        graph: MemoryGraph | None = result.value if result.valid else None
        policy_denials: list[dict[str, Any]] = []
        diagnostics = list(result.diagnostics)
        if graph is not None and policy_context:
            graph, policy_denials, policy_diagnostics = _policy_filtered_graph(graph, policy_context)
            diagnostics.extend(policy_diagnostics)
        # Inside a method body `plan_activation` resolves to the module-level
        # planner imported from .activation, not to this method.
        # Explicit None check so that a valid graph (for instance one emptied
        # by policy filtering) is still planned regardless of its truth value.
        plan = (
            plan_activation(
                graph,
                max_items=max_items,
                max_tokens=max_tokens,
                profile_id=profile_id,
                priority_node_ids=priority_node_ids,
                include_events=include_events,
            )
            if graph is not None
            else None
        )
        if plan is not None:
            diagnostics.extend(plan.diagnostics)
        return self._envelope(
            "graph.activation.plan",
            subject_kind="memory_graph",
            subject_id=result.subject_id,
            valid=result.valid and plan is not None,
            diagnostics=diagnostics,
            source_ref=source_ref,
            data={
                "activation_plan": plan.to_dict() if plan is not None else None,
                "activation_action": activation_action(plan).to_dict() if plan is not None else None,
                "package_request": self.package_request(plan.selection) if plan is not None else None,
                "policy_denials": policy_denials,
            },
        )

    def compile_package(self, selection: dict[str, Any], *, source_ref: str = "selection") -> dict[str, Any]:
        """Compile a selection through the configured package compiler.

        Args:
            selection: Selection mapping; its ``"id"`` (or ``""``) becomes the
                envelope subject id.
            source_ref: Free-form origin label echoed in the envelope.

        Returns:
            The runtime envelope for operation ``"package.compile"`` holding
            both the package request and the wrapped compiler response.
        """
        request = self.package_request(selection)
        response = self.package_compiler.compile_selection(selection)
        return self._envelope(
            "package.compile",
            subject_kind="memory_selection",
            subject_id=str(selection.get("id") or ""),
            valid=True,
            diagnostics=(),
            source_ref=source_ref,
            data={
                "package_request": request,
                "package_response": package_response_envelope(response, request_id=request["id"]),
            },
        )

    def export_graph(self, *, graph_id: str = "local", source_ref: str = "local-store") -> dict[str, Any]:
        """Export the stored graph together with the logged events.

        Uses the store's own ``export_graph`` when it provides one; otherwise
        a graph is assembled from ``list_nodes()`` and the event log.

        Args:
            graph_id: Identifier given to the exported graph.
            source_ref: Free-form origin label echoed in the envelope.

        Returns:
            The runtime envelope for operation ``"graph.export"``.
        """
        events = self.event_log.list_events()
        if hasattr(self.graph_store, "export_graph"):
            graph = self.graph_store.export_graph(graph_id=graph_id, events=events)
        else:
            # NOTE(review): this fallback passes only nodes and events, so any
            # stored edges are not part of the export - confirm that is intended.
            graph = MemoryGraph(
                graph_id=graph_id,
                nodes=tuple(self.graph_store.list_nodes()),
                events=tuple(events),
            )
        return self._envelope(
            "graph.export",
            subject_kind="memory_graph",
            subject_id=graph.graph_id,
            valid=True,
            diagnostics=(),
            source_ref=source_ref,
            data={"graph": graph.to_dict()},
        )

    def repair_diagnostics(self, *, source_ref: str = "local-store") -> dict[str, Any]:
        """Collect repair diagnostics from the event log and the graph store.

        Both collaborators are probed with ``hasattr`` because the diagnostic
        hooks are optional; a collaborator without the hook contributes none.

        Args:
            source_ref: Origin label; also used as the envelope subject id.

        Returns:
            The runtime envelope for operation ``"store.repair.diagnostics"``;
            it is invalid when any diagnostic has severity ``"error"``.
        """
        event_diagnostics = self.event_log.diagnostics() if hasattr(self.event_log, "diagnostics") else ()
        events = self.event_log.list_events()
        store_diagnostics = (
            self.graph_store.repair_diagnostics(events=events)
            if hasattr(self.graph_store, "repair_diagnostics")
            else ()
        )
        diagnostics = tuple(event_diagnostics) + tuple(store_diagnostics)
        return self._envelope(
            "store.repair.diagnostics",
            subject_kind="local_store",
            subject_id=source_ref,
            valid=not any(diagnostic.severity == "error" for diagnostic in diagnostics),
            diagnostics=diagnostics,
            source_ref=source_ref,
            data={"diagnostic_count": len(diagnostics)},
        )

    def apply_lifecycle_actions(
        self,
        actions: Iterable[LifecycleAction | dict[str, Any]],
        *,
        approval_marker: str = "",
        review_record: ReviewRecord | dict[str, Any] | None = None,
        source_ref: str = "lifecycle-actions",
    ) -> dict[str, Any]:
        """Apply lifecycle actions to the graph store, enforcing review where required.

        An action whose ``requires_review`` is set is denied unless a review is
        available that is approved and whose ``reviewed_action_id`` equals the
        digest-based id of that action. When no ``review_record`` is supplied
        but ``approval_marker`` is non-empty, an approved local review bound to
        each action is synthesised, so the marker alone approves every action.

        Args:
            actions: Actions as ``LifecycleAction`` objects or plain mappings.
            approval_marker: Approval token; stored in the metadata of changed
                nodes (falling back to the review id when empty).
            review_record: Explicit review, as object or mapping.
            source_ref: Free-form origin label echoed in the envelope.

        Returns:
            The runtime envelope for operation ``"lifecycle.apply"`` with
            ``dry_run=False``; it is invalid when any action was denied.
        """
        applied: list[dict[str, Any]] = []
        denied: list[dict[str, Any]] = []
        diagnostics: list[Diagnostic] = []
        for raw_action in actions:
            action = _coerce_action(raw_action)
            review = _coerce_review(review_record, action=action, approval_marker=approval_marker)
            if action.requires_review and (
                review is None
                or not review.approved
                or review.reviewed_action_id != _action_review_id(action)
            ):
                denied.append(
                    {
                        "target_id": action.target_id,
                        "action": action.action.value,
                        "reason": "review_required",
                    }
                )
                diagnostics.append(
                    Diagnostic(
                        "warn",
                        "review_required",
                        "Lifecycle action requires an approval marker before apply.",
                        action.target_id,
                        {"action": action.action.value},
                    )
                )
                continue
            # NOTE(review): the store is mutated here, before `_envelope` asks
            # the policy gateway; a policy denial therefore only flips `valid`.
            marker = approval_marker or (review.review_id if review is not None else "")
            applied.append(self._apply_action(action, approval_marker=marker))
        return self._envelope(
            "lifecycle.apply",
            subject_kind="lifecycle_actions",
            subject_id=f"batch:{stable_digest([applied, denied])}",
            valid=not denied,
            diagnostics=diagnostics,
            source_ref=source_ref,
            dry_run=False,
            data={
                "applied": applied,
                "denied": denied,
                "approval_marker": approval_marker,
                "review_record": review_record.to_dict() if isinstance(review_record, ReviewRecord) else review_record,
            },
        )

    def package_request(self, selection: dict[str, Any]) -> dict[str, Any]:
        """Build the package request for ``selection``, tagged with the compiler's class name."""
        return package_request_from_selection(selection, compiler=self.package_compiler.__class__.__name__)

    def _contract_envelope(
        self,
        operation: str,
        result: ContractIngressResult,
        *,
        source_ref: str,
    ) -> dict[str, Any]:
        """Wrap a contract ingress result in a runtime envelope.

        The parsed value is serialised with ``to_dict()`` when available and
        passed through unchanged otherwise.
        """
        return self._envelope(
            operation,
            subject_kind=result.subject_kind,
            subject_id=result.subject_id,
            valid=result.valid,
            diagnostics=result.diagnostics,
            source_ref=source_ref,
            data={"value": result.value.to_dict() if hasattr(result.value, "to_dict") else result.value},
        )

    def _envelope(
        self,
        operation: str,
        *,
        subject_kind: str,
        subject_id: str,
        valid: bool,
        diagnostics: Iterable[Diagnostic],
        source_ref: str,
        data: dict[str, Any],
        dry_run: bool = True,
    ) -> dict[str, Any]:
        """Authorize, audit and assemble the envelope returned by every operation.

        Args:
            operation: Operation name, also used as the policy action.
            subject_kind: Kind of the subject the operation refers to.
            subject_id: Identifier of the subject.
            valid: Validity as determined by the caller; the envelope reports
                ``valid and policy.allowed``.
            diagnostics: Diagnostics to serialise with ``to_dict()``.
            source_ref: Free-form origin label.
            data: Operation-specific payload; it also feeds the operation id.
            dry_run: Flag recorded in the policy context, audit event and envelope.

        Returns:
            A dict following ``RUNTIME_ENVELOPE_SCHEMA``.
        """
        policy = self.policy_gateway.authorize(
            action=operation,
            resource=f"{subject_kind}:{subject_id}",
            context={"source_ref": source_ref, "dry_run": dry_run},
        )
        # The id is a digest over the operation inputs and payload.
        operation_id = f"op:{stable_digest([operation, subject_kind, subject_id, source_ref, data])}"
        audit = self.audit_sink.record(
            audit_event(
                operation_id=operation_id,
                operation=operation,
                subject={"kind": subject_kind, "id": subject_id},
                policy_decision=policy,
                dry_run=dry_run,
                source_ref=source_ref,
            )
        )
        return {
            "schema_version": RUNTIME_ENVELOPE_SCHEMA,
            "operation_id": operation_id,
            "operation": operation,
            "dry_run": dry_run,
            "valid": valid and policy.allowed,
            "subject": {"kind": subject_kind, "id": subject_id},
            "source": {"ref": source_ref},
            "policy_decision": _policy_to_dict(policy),
            "audit_receipt": audit,
            "diagnostics": [diagnostic.to_dict() for diagnostic in diagnostics],
            "data": data,
        }

    def _apply_action(self, action: LifecycleAction, *, approval_marker: str) -> dict[str, Any]:
        """Apply one lifecycle action to the graph store and report the outcome.

        * ``COMPACT`` saves a new ``"summary"`` node under the action's target id.
        * Any other action carrying ``to_state`` rewrites the target node's
          lifecycle and records the action and marker in its metadata.
        * Everything else is reported as ``"no_state_change"``.

        Returns:
            A dict with ``target_id``, ``action``, ``applied`` and, when not
            applied, a ``reason`` (``"missing_node"`` or ``"no_state_change"``).
        """
        outcome: dict[str, Any] = {"target_id": action.target_id, "action": action.action.value}
        if action.action == LifecycleActionKind.COMPACT:
            summary = MemoryNode(
                action.target_id,
                "summary",
                str(action.metadata.get("summary_text") or "Compacted memory summary."),
                phase=MemoryPhase.STABILIZED,
                metadata={
                    "source_node_ids": list(action.metadata.get("source_node_ids", ())),
                    "approval_marker": approval_marker,
                },
            )
            self.graph_store.save_node(summary)
            return {**outcome, "applied": True}
        if action.to_state is not None:
            try:
                node = self.graph_store.get_node(action.target_id)
            except (KeyError, FileNotFoundError):
                # A missing target is reported, not raised.
                return {**outcome, "applied": False, "reason": "missing_node"}
            self.graph_store.save_node(
                replace(
                    node,
                    lifecycle=action.to_state,
                    metadata={
                        **dict(node.metadata),
                        "last_lifecycle_action": action.action.value,
                        "approval_marker": approval_marker,
                    },
                )
            )
            return {**outcome, "applied": True}
        return {**outcome, "applied": False, "reason": "no_state_change"}


def _policy_to_dict(decision: PolicyDecision) -> dict[str, Any]:
    """Serialise a policy decision via ``to_dict()`` when present, else ``to_plain``."""
    return decision.to_dict() if hasattr(decision, "to_dict") else to_plain(decision)


def _coerce_action(data: LifecycleAction | dict[str, Any]) -> LifecycleAction:
    """Return ``data`` as a ``LifecycleAction``, building one from a mapping if needed.

    ``"action"`` and ``"target_id"`` are required keys; the remaining keys are
    optional, and a missing ``"requires_review"`` is treated as ``False``.

    Raises:
        KeyError: When a required key is missing from the mapping.
        ValueError: When an action or state string is not a valid enum value.
    """
    if isinstance(data, LifecycleAction):
        return data
    return LifecycleAction(
        action=LifecycleActionKind(str(data["action"])),
        target_id=str(data["target_id"]),
        from_state=LifecycleState(str(data["from_state"])) if data.get("from_state") else None,
        to_state=LifecycleState(str(data["to_state"])) if data.get("to_state") else None,
        reason=str(data.get("reason") or ""),
        requires_review=bool(data.get("requires_review")),
        metadata=dict(data.get("metadata") or {}),
    )


def _action_review_id(action: LifecycleAction) -> str:
    """Return the id a review must reference to approve ``action``."""
    return f"action:{stable_digest(action.to_dict())}"


def _coerce_review(
    data: ReviewRecord | dict[str, Any] | None,
    *,
    action: LifecycleAction,
    approval_marker: str,
) -> ReviewRecord | None:
    """Resolve the review that applies to ``action``.

    An explicit record (object or mapping) takes precedence. Otherwise a
    non-empty ``approval_marker`` yields an approved review by reviewer
    ``"local"`` bound to this action, with the marker as its reason.

    Returns:
        The resolved review, or ``None`` when neither input was provided.
    """
    if isinstance(data, ReviewRecord):
        return data
    if isinstance(data, dict):
        return ReviewRecord.from_mapping(data)
    if approval_marker:
        return make_review_record(
            reviewed_action_id=_action_review_id(action),
            reviewer="local",
            approved=True,
            reason=approval_marker,
        )
    return None


def _policy_filtered_graph(
    graph: MemoryGraph,
    policy_context: dict[str, Any],
) -> tuple[MemoryGraph, list[dict[str, Any]], list[Diagnostic]]:
    """Drop nodes rejected by ``evaluate_activation_node`` and edges touching them.

    Keys read from ``policy_context``: ``required_labels``, ``denied_labels``,
    ``trust_zone``, ``secrets_allowed`` (default ``True``),
    ``approved_reauthorizations`` and ``require_fresh`` (default ``False``).
    Sequence-valued keys that are missing or ``None`` are treated as empty.

    Returns:
        A tuple of the filtered graph copy, the redacted denied nodes with
        their reasons, and one diagnostic per denied node.
    """

    def _as_tuple(key: str) -> tuple[Any, ...]:
        # `or ()` tolerates an explicit None (e.g. JSON null) for the key.
        return tuple(policy_context.get(key) or ())

    # The criteria do not depend on the node, so build them once.
    criteria: dict[str, Any] = {
        "required_labels": _as_tuple("required_labels"),
        "denied_labels": _as_tuple("denied_labels"),
        "trust_zone": str(policy_context.get("trust_zone") or ""),
        "secrets_allowed": bool(policy_context.get("secrets_allowed", True)),
        "approved_reauthorizations": _as_tuple("approved_reauthorizations"),
        "require_fresh": bool(policy_context.get("require_fresh", False)),
    }

    allowed_nodes: list[MemoryNode] = []
    denials: list[dict[str, Any]] = []
    diagnostics: list[Diagnostic] = []
    for node in graph.nodes:
        allowed, reasons = evaluate_activation_node(node, **criteria)
        if allowed:
            allowed_nodes.append(node)
            continue
        denials.append(redacted_node(node, reasons=reasons))
        diagnostics.append(policy_denial_diagnostic(node, reasons))

    # Keep an edge only when both of its endpoints survived the filter.
    allowed_ids = {node.node_id for node in allowed_nodes}
    filtered_edges = tuple(
        edge for edge in graph.edges if edge.source in allowed_ids and edge.target in allowed_ids
    )
    return replace(graph, nodes=tuple(allowed_nodes), edges=filtered_edges), denials, diagnostics