import json from pathlib import Path from phase_memory.adapters import FileBackedMemoryGraphStore, JsonlAuditSink, JsonlMemoryEventLog, LOCAL_STORE_SCHEMA from phase_memory.lifecycle import plan_compaction, plan_retention from phase_memory.models import LifecycleAction, LifecycleActionKind, LifecycleState, MemoryEdge, MemoryEvent, MemoryNode from phase_memory.paths import abandon_path, branch_path, create_path, merge_path, path_event from phase_memory.runtime import PhaseMemoryRuntime FIXTURES = Path(__file__).parent / "fixtures" def _load(name: str): return json.loads((FIXTURES / name).read_text(encoding="utf-8")) def test_file_backed_store_round_trips_and_exports_graph(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) event_log = JsonlMemoryEventLog(tmp_path / "events.jsonl") runtime = PhaseMemoryRuntime(graph_store=store, event_log=event_log, audit_sink=JsonlAuditSink(tmp_path / "audit.jsonl")) runtime.import_profile(_load("memory-profile.json"), source_ref="profile") runtime.import_graph(_load("memory-graph.json"), source_ref="graph") assert store.get_profile("phase-memory-fixture-profile").profile_id == "phase-memory-fixture-profile" assert store.get_node("decision.boundary").kind == "decision" assert store.list_edges(source="decision.boundary")[0].target == "artifact.profile" exported = runtime.export_graph(graph_id="exported")["data"]["graph"] assert exported["schema_version"] == "markitect.memory.graph.v1" assert exported["id"] == "exported" assert len(exported["nodes"]) == 4 assert len(exported["edges"]) == 2 assert len(exported["events"]) == 1 def test_jsonl_event_log_detects_duplicates_and_corruption(tmp_path) -> None: log = JsonlMemoryEventLog(tmp_path / "events.jsonl") event = MemoryEvent("event.a", "recorded", timestamp="2026-05-18T00:00:00+00:00") log.append(event) try: log.append(event) except ValueError as exc: assert "Duplicate memory event id" in str(exc) else: raise AssertionError("duplicate event id should fail") with (tmp_path / "events.jsonl").open("a", encoding="utf-8") as handle: handle.write("{not-json}\n") handle.write(json.dumps({"schema_version": "unknown.event.v9", "id": "event.b", "kind": "recorded"}) + "\n") diagnostics = log.diagnostics() assert [diagnostic.code for diagnostic in diagnostics] == ["malformed_event_log_line", "unknown_event_schema"] assert log.list_events(kind="recorded")[0].event_id == "event.a" def test_memory_paths_model_branch_merge_and_abandon_as_events(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) root = create_path("path.root", event_ids=("event.root",)) branch = branch_path(root, "path.branch", event_ids=("event.branch",)) merged = merge_path(branch, "path.root") abandoned = abandon_path(branch, "superseded by main path") store.save_path(root) store.save_path(merged) assert store.get_path("path.branch").merged_into == "path.root" assert path_event(merged, "path.merged").metadata["path_state"] == "merged" assert abandoned.abandoned_reason == "superseded by main path" def test_repair_diagnostics_report_missing_edges_and_orphaned_path_events(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) log = JsonlMemoryEventLog(tmp_path / "events.jsonl") runtime = PhaseMemoryRuntime(graph_store=store, event_log=log) store.save_node(MemoryNode("node.a", "decision")) store.save_edge(MemoryEdge("edge.bad", "depends_on", "node.a", "missing")) store.save_path(create_path("path.root", event_ids=("missing.event",))) envelope = runtime.repair_diagnostics(source_ref=str(tmp_path)) assert envelope["valid"] is False assert [diagnostic["code"] for diagnostic in envelope["diagnostics"]] == ["missing_edge_target", "orphaned_path_event"] def test_file_backed_store_reports_migration_needs_and_uses_atomic_json_writes(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) metadata_path = tmp_path / "phase-memory.json" metadata_path.write_text( json.dumps( { "schema_version": "phase_memory.local_store.v0", "planned_migrations": ["v0-to-v1"], } ), encoding="utf-8", ) store.save_node(MemoryNode("node.atomic", "decision", "Atomic write target")) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) envelope = runtime.repair_diagnostics(source_ref=str(tmp_path)) codes = [diagnostic["code"] for diagnostic in envelope["diagnostics"]] assert envelope["valid"] is True assert "store_migration_required" in codes assert "planned_store_migrations" in codes assert not list(tmp_path.rglob("*.tmp")) def test_local_store_migration_apply_updates_metadata_and_audits(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) (tmp_path / "phase-memory.json").write_text( json.dumps( { "schema_version": "phase_memory.local_store.v0", "planned_migrations": ["v0-to-v1"], } ), encoding="utf-8", ) planned = runtime.plan_store_migration(source_ref=str(tmp_path)) applied = runtime.apply_store_migration(planned["data"]["migration_plan"], actor="pytest", source_ref=str(tmp_path)) metadata = store.metadata() audit = runtime.query_audit({"operation": "store.migration.apply", "dry_run": False}) assert planned["valid"] is True assert [action["action"] for action in planned["data"]["migration_plan"]["actions"]] == [ "set_schema_version", "complete_planned_migration", ] assert applied["valid"] is True assert applied["data"]["migration_result"]["changed"] is True assert metadata["schema_version"] == LOCAL_STORE_SCHEMA assert metadata["completed_migrations"] == ["v0-to-v1"] assert metadata["last_migration"]["actor"] == "pytest" assert audit["count"] == 1 def test_local_store_migration_noop_and_corrupt_metadata_paths(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) noop = runtime.apply_store_migration(source_ref=str(tmp_path)) assert noop["valid"] is True assert noop["data"]["migration_result"]["changed"] is False (tmp_path / "phase-memory.json").write_text("{not-json}\n", encoding="utf-8") planned = runtime.plan_store_migration(source_ref=str(tmp_path)) failed = runtime.apply_store_migration(source_ref=str(tmp_path)) assert planned["valid"] is False assert planned["diagnostics"][0]["code"] == "corrupt_store_metadata" assert failed["valid"] is False assert failed["data"]["migration_result"]["applied"] is False def test_repair_diagnostics_distinguish_corrupt_store_records(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) (tmp_path / "nodes" / "broken.json").write_text("{not-json}\n", encoding="utf-8") envelope = runtime.repair_diagnostics(source_ref=str(tmp_path)) assert envelope["valid"] is False assert envelope["diagnostics"][0]["code"] == "corrupt_store_record" assert envelope["diagnostics"][0]["metadata"]["record_type"] == "node" def test_lifecycle_apply_requires_approval_for_reviewable_actions(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) node = store.save_node(MemoryNode("node.a", "episode", "Trace text")) compact = plan_compaction([node]) denied = runtime.apply_lifecycle_actions([compact]) assert denied["valid"] is False assert denied["data"]["denied"][0]["reason"] == "review_required" assert [node.node_id for node in store.list_nodes()] == ["node.a"] applied = runtime.apply_lifecycle_actions([compact], approval_marker="review:local") assert applied["valid"] is True assert store.get_node(compact.target_id).kind == "summary" def test_lifecycle_apply_can_mark_non_review_actions(tmp_path) -> None: store = FileBackedMemoryGraphStore(tmp_path) runtime = PhaseMemoryRuntime(graph_store=store, event_log=JsonlMemoryEventLog(tmp_path / "events.jsonl")) store.save_node(MemoryNode("stale", "episode", freshness={"updated_at": "2026-05-01T00:00:00+00:00"})) action = LifecycleAction( LifecycleActionKind.MARK_STALE, "stale", from_state=LifecycleState.ACTIVE, to_state=LifecycleState.STALE, ) envelope = runtime.apply_lifecycle_actions([action]) assert envelope["valid"] is True assert store.get_node("stale").lifecycle == LifecycleState.STALE