generated from coulomb/repo-seed
71 lines
2.4 KiB
Python
71 lines
2.4 KiB
Python
from phase_memory.models import LifecycleState, MemoryNode
|
|
from phase_memory.service import (
|
|
HEALTH_REPORT_SCHEMA,
|
|
KONTEXTUAL_DELEGATION_SCHEMA,
|
|
SERVICE_CONTRACT_SCHEMA,
|
|
LocalServiceRunner,
|
|
RuntimeConfig,
|
|
assert_audit_sink_conformance,
|
|
assert_context_compiler_conformance,
|
|
assert_event_log_conformance,
|
|
assert_graph_store_conformance,
|
|
assert_policy_gateway_conformance,
|
|
default_conformance_adapters,
|
|
health_report,
|
|
kontextual_delegation_envelope,
|
|
service_contracts,
|
|
)
|
|
|
|
|
|
def test_service_contracts_list_runtime_operations() -> None:
|
|
contracts = service_contracts()
|
|
|
|
assert contracts["schema_version"] == SERVICE_CONTRACT_SCHEMA
|
|
assert "profile.plan" in contracts["operations"]
|
|
assert "health.check" in contracts["operations"]
|
|
|
|
|
|
def test_runtime_config_validation_and_health_report() -> None:
|
|
runner = LocalServiceRunner()
|
|
runner.runtime.graph_store.save_node(MemoryNode("node.stale", "episode", lifecycle=LifecycleState.STALE))
|
|
report = health_report(runner.runtime, config=RuntimeConfig.local_default())
|
|
|
|
assert report["schema_version"] == HEALTH_REPORT_SCHEMA
|
|
assert report["ok"] is True
|
|
assert report["store"]["stale_memory_count"] == 1
|
|
assert report["adapters"]["graph_store"] == "InMemoryMemoryGraphStore"
|
|
|
|
|
|
def test_service_runner_handles_health() -> None:
|
|
runner = LocalServiceRunner()
|
|
|
|
response = runner.handle("health.check")
|
|
|
|
assert response["schema_version"] == HEALTH_REPORT_SCHEMA
|
|
assert response["ok"] is True
|
|
|
|
|
|
def test_default_adapter_conformance_helpers() -> None:
|
|
adapters = default_conformance_adapters()
|
|
|
|
assert_graph_store_conformance(adapters["graph_store"])
|
|
assert_event_log_conformance(adapters["event_log"])
|
|
assert_context_compiler_conformance(adapters["context_compiler"])
|
|
assert_policy_gateway_conformance(adapters["policy_gateway"])
|
|
assert_audit_sink_conformance(adapters["audit_sink"])
|
|
|
|
|
|
def test_kontextual_delegation_envelope_is_explicit() -> None:
|
|
envelope = kontextual_delegation_envelope(
|
|
operation="persist_graph",
|
|
graph_id="graph.a",
|
|
profile_id="profile.a",
|
|
policy_decision={"allowed": True},
|
|
audit_ref="audit.a",
|
|
)
|
|
|
|
assert envelope["schema_version"] == KONTEXTUAL_DELEGATION_SCHEMA
|
|
assert "phase_policy" in envelope["phase_memory_owns"]
|
|
assert "durable_records" in envelope["kontextual_owns"]
|
|
assert envelope["imports"]["avoid_circular_imports"] is True
|