# Source: phase-memory/tests/test_service_readiness.py (75 lines, 2.6 KiB, Python)

from phase_memory.models import LifecycleState, MemoryNode
from phase_memory.service import (
HEALTH_REPORT_SCHEMA,
KONTEXTUAL_DELEGATION_SCHEMA,
SERVICE_CONTRACT_SCHEMA,
LocalServiceRunner,
RuntimeConfig,
assert_audit_sink_conformance,
assert_context_compiler_conformance,
assert_event_log_conformance,
assert_graph_store_conformance,
assert_policy_gateway_conformance,
assert_runtime_registry_conformance,
assert_semantic_index_conformance,
default_conformance_adapters,
health_report,
kontextual_delegation_envelope,
service_contracts,
)
def test_service_contracts_list_runtime_operations() -> None:
    """Check the schema version and two expected operations of the contracts.

    ``service_contracts()`` must report ``SERVICE_CONTRACT_SCHEMA`` as its
    schema version and list both ``profile.plan`` and ``health.check`` under
    ``operations``.
    """
    listing = service_contracts()
    assert listing["schema_version"] == SERVICE_CONTRACT_SCHEMA
    # Checked in the same order as before: profile.plan first, then health.check.
    for operation_name in ("profile.plan", "health.check"):
        assert operation_name in listing["operations"]
def test_runtime_config_validation_and_health_report() -> None:
    """Check the health report produced after saving one stale node.

    A ``MemoryNode`` whose lifecycle is ``LifecycleState.STALE`` is saved into
    the runner's graph store, then ``health_report`` is called with the
    runner's runtime and ``RuntimeConfig.local_default()``. The report must
    carry the health schema version, be ``ok``, count exactly one stale
    memory, and name the graph store adapter.
    """
    service = LocalServiceRunner()
    stale_node = MemoryNode(
        "node.stale",
        "episode",
        lifecycle=LifecycleState.STALE,
    )
    service.runtime.graph_store.save_node(stale_node)

    summary = health_report(
        service.runtime,
        config=RuntimeConfig.local_default(),
    )

    assert summary["schema_version"] == HEALTH_REPORT_SCHEMA
    assert summary["ok"] is True
    assert summary["store"]["stale_memory_count"] == 1
    assert summary["adapters"]["graph_store"] == "InMemoryMemoryGraphStore"
def test_service_runner_handles_health() -> None:
    """Send ``health.check`` to a fresh runner and inspect the reply.

    The reply must carry ``HEALTH_REPORT_SCHEMA`` as its schema version and
    have ``ok`` set to ``True``.
    """
    reply = LocalServiceRunner().handle("health.check")
    assert reply["schema_version"] == HEALTH_REPORT_SCHEMA
    assert reply["ok"] is True
def test_default_adapter_conformance_helpers() -> None:
    """Run each conformance helper against its matching default adapter.

    The adapters come from ``default_conformance_adapters()``; each helper is
    called with the adapter stored under the corresponding key.
    """
    defaults = default_conformance_adapters()
    # Ordered (adapter key, conformance helper) pairs; the order matches the
    # sequence in which the helpers were originally invoked.
    checks = (
        ("graph_store", assert_graph_store_conformance),
        ("event_log", assert_event_log_conformance),
        ("context_compiler", assert_context_compiler_conformance),
        ("policy_gateway", assert_policy_gateway_conformance),
        ("audit_sink", assert_audit_sink_conformance),
        ("semantic_index", assert_semantic_index_conformance),
        ("runtime_registry", assert_runtime_registry_conformance),
    )
    for adapter_key, check in checks:
        check(defaults[adapter_key])
def test_kontextual_delegation_envelope_is_explicit() -> None:
    """Build a delegation envelope and check the fields it exposes.

    The envelope must carry ``KONTEXTUAL_DELEGATION_SCHEMA`` as its schema
    version, list ``phase_policy`` under ``phase_memory_owns`` and
    ``durable_records`` under ``kontextual_owns``, and set
    ``imports.avoid_circular_imports`` to ``True``.
    """
    request = {
        "operation": "persist_graph",
        "graph_id": "graph.a",
        "profile_id": "profile.a",
        "policy_decision": {"allowed": True},
        "audit_ref": "audit.a",
    }
    delegation = kontextual_delegation_envelope(**request)

    assert delegation["schema_version"] == KONTEXTUAL_DELEGATION_SCHEMA
    assert "phase_policy" in delegation["phase_memory_owns"]
    assert "durable_records" in delegation["kontextual_owns"]
    assert delegation["imports"]["avoid_circular_imports"] is True