import json from datetime import datetime, timezone from pathlib import Path from phase_memory.runtime import PACKAGE_REQUEST_SCHEMA, RUNTIME_ENVELOPE_SCHEMA, PhaseMemoryRuntime FIXTURES = Path(__file__).parent / "fixtures" def _load(name: str): return json.loads((FIXTURES / name).read_text(encoding="utf-8")) def test_runtime_profile_plan_envelope_is_json_serializable() -> None: runtime = PhaseMemoryRuntime() envelope = runtime.plan_profile(_load("memory-profile.json"), source_ref="tests/fixtures/memory-profile.json") assert envelope["schema_version"] == RUNTIME_ENVELOPE_SCHEMA assert envelope["operation"] == "profile.plan" assert envelope["valid"] is True assert envelope["dry_run"] is True assert envelope["subject"] == {"kind": "memory_profile", "id": "phase-memory-fixture-profile"} assert envelope["policy_decision"]["allowed"] is True assert envelope["audit_receipt"]["recorded"] is True assert envelope["data"]["plan"]["ready"] is True assert "activation.plan" in envelope["data"]["plan"]["capabilities"] json.dumps(envelope, sort_keys=True) def test_runtime_lifecycle_plan_collects_dry_run_actions() -> None: runtime = PhaseMemoryRuntime() envelope = runtime.plan_lifecycle( _load("memory-graph.json"), source_ref="tests/fixtures/memory-graph.json", stale_after_days=7, delete_after_days=30, refresh_digests={"event.restart": "new"}, compact_node_ids=("event.restart", "risk.durable-write"), now=datetime(2026, 5, 18, tzinfo=timezone.utc), ) actions = envelope["data"]["dry_run_actions"] assert envelope["operation"] == "graph.lifecycle.plan" assert [action["action"] for action in actions] == ["mark_stale", "refresh", "compact"] assert all("physical_delete" not in action.get("metadata", {}) or action["metadata"]["physical_delete"] is False for action in actions) json.dumps(envelope, sort_keys=True) def test_runtime_activation_plan_includes_package_request() -> None: runtime = PhaseMemoryRuntime() envelope = runtime.plan_activation( _load("memory-graph.json"), source_ref="tests/fixtures/memory-graph.json", max_items=2, max_tokens=18, profile_id="phase-memory-fixture-profile", priority_node_ids=("decision.boundary",), ) activation = envelope["data"]["activation_plan"] package_request = envelope["data"]["package_request"] assert envelope["operation"] == "graph.activation.plan" assert activation["selected_node_ids"][0] == "decision.boundary" assert activation["selection"]["schema_version"] == "markitect.memory.selection.v1" assert package_request["schema_version"] == PACKAGE_REQUEST_SCHEMA assert package_request["dry_run"] is True assert package_request["selection"]["id"] == activation["plan_id"] def test_runtime_compile_package_wraps_compiler_response() -> None: runtime = PhaseMemoryRuntime() selection = {"schema_version": "markitect.memory.selection.v1", "id": "selection.a", "nodes": ["node.a"], "events": []} envelope = runtime.compile_package(selection) assert envelope["operation"] == "package.compile" assert envelope["data"]["package_request"]["selection"] == selection assert envelope["data"]["package_response"]["package_ref"] == "package:selection.a" assert envelope["data"]["package_response"]["response"]["package_id"] == "package:selection.a"