# Files: phase-memory/tests/test_runtime.py
#
# 84 lines, 3.4 KiB, Python

import json
from datetime import datetime, timezone
from pathlib import Path
from phase_memory.runtime import PACKAGE_REQUEST_SCHEMA, RUNTIME_ENVELOPE_SCHEMA, PhaseMemoryRuntime
# Directory named "fixtures" located next to this test module; _load() reads
# its JSON inputs from here.
FIXTURES = Path(__file__).parent / "fixtures"
def _load(name: str):
    """Return the parsed JSON content of the fixture file called *name*.

    The file is looked up inside ``FIXTURES`` and decoded as UTF-8.
    """
    fixture_text = (FIXTURES / name).read_text(encoding="utf-8")
    return json.loads(fixture_text)
def test_runtime_profile_plan_envelope_is_json_serializable() -> None:
    """Plan the fixture memory profile and verify the resulting envelope.

    Checks the envelope header fields, the policy/audit flags and the plan
    payload, then confirms the whole envelope can be dumped as JSON.
    """
    phase_runtime = PhaseMemoryRuntime()
    profile_doc = _load("memory-profile.json")
    result = phase_runtime.plan_profile(
        profile_doc,
        source_ref="tests/fixtures/memory-profile.json",
    )

    # Envelope header.
    assert result["schema_version"] == RUNTIME_ENVELOPE_SCHEMA
    assert result["operation"] == "profile.plan"
    assert result["valid"] is True
    assert result["dry_run"] is True
    expected_subject = {"kind": "memory_profile", "id": "phase-memory-fixture-profile"}
    assert result["subject"] == expected_subject

    # Policy decision and audit receipt flags.
    assert result["policy_decision"]["allowed"] is True
    assert result["audit_receipt"]["recorded"] is True

    # Plan payload.
    assert result["data"]["plan"]["ready"] is True
    capabilities = result["data"]["plan"]["capabilities"]
    assert "activation.plan" in capabilities

    # Raises if the envelope holds anything the json module cannot encode.
    json.dumps(result, sort_keys=True)
def test_runtime_lifecycle_plan_collects_dry_run_actions() -> None:
    """Plan a lifecycle pass over the fixture graph and inspect its dry-run actions.

    Verifies the operation name, the ordered list of action kinds, that no
    action's metadata carries a ``physical_delete`` value other than
    ``False``, and that the envelope can be dumped as JSON.
    """
    phase_runtime = PhaseMemoryRuntime()
    graph_doc = _load("memory-graph.json")
    reference_time = datetime(2026, 5, 18, tzinfo=timezone.utc)
    result = phase_runtime.plan_lifecycle(
        graph_doc,
        source_ref="tests/fixtures/memory-graph.json",
        stale_after_days=7,
        delete_after_days=30,
        refresh_digests={"event.restart": "new"},
        compact_node_ids=("event.restart", "risk.durable-write"),
        now=reference_time,
    )
    planned = result["data"]["dry_run_actions"]

    assert result["operation"] == "graph.lifecycle.plan"

    action_kinds = [entry["action"] for entry in planned]
    assert action_kinds == ["mark_stale", "refresh", "compact"]

    # An action either omits "physical_delete" from its metadata or sets it
    # to exactly False.
    for entry in planned:
        if "physical_delete" in entry.get("metadata", {}):
            assert entry["metadata"]["physical_delete"] is False

    json.dumps(result, sort_keys=True)
def test_runtime_activation_plan_includes_package_request() -> None:
    """Plan an activation over the fixture graph and check the package request.

    Verifies the operation name, that the prioritised node is selected first,
    the schema versions of the selection and package request, the dry-run
    flag, and that the package request's selection id equals the plan id.
    """
    phase_runtime = PhaseMemoryRuntime()
    graph_doc = _load("memory-graph.json")
    result = phase_runtime.plan_activation(
        graph_doc,
        source_ref="tests/fixtures/memory-graph.json",
        max_items=2,
        max_tokens=18,
        profile_id="phase-memory-fixture-profile",
        priority_node_ids=("decision.boundary",),
    )
    payload = result["data"]
    plan = payload["activation_plan"]
    request = payload["package_request"]

    assert result["operation"] == "graph.activation.plan"

    first_selected = plan["selected_node_ids"][0]
    assert first_selected == "decision.boundary"
    assert plan["selection"]["schema_version"] == "markitect.memory.selection.v1"

    assert request["schema_version"] == PACKAGE_REQUEST_SCHEMA
    assert request["dry_run"] is True
    assert request["selection"]["id"] == plan["plan_id"]
def test_runtime_compile_package_wraps_compiler_response() -> None:
    """Compile a package from a hand-built selection and check the envelope.

    Verifies the operation name, that the package request echoes the
    selection unchanged, and the package reference / package id in the
    response.
    """
    phase_runtime = PhaseMemoryRuntime()
    selection = {
        "schema_version": "markitect.memory.selection.v1",
        "id": "selection.a",
        "nodes": ["node.a"],
        "events": [],
    }
    result = phase_runtime.compile_package(selection)

    assert result["operation"] == "package.compile"
    assert result["data"]["package_request"]["selection"] == selection

    response = result["data"]["package_response"]
    assert response["package_ref"] == "package:selection.a"
    assert response["response"]["package_id"] == "package:selection.a"