Implement phase-memory foundation

This commit is contained in:
2026-05-18 01:55:34 +02:00
parent 751da54052
commit 87f104781a
22 changed files with 1705 additions and 12 deletions

View File

@@ -1,3 +1,50 @@
# repo-seed
# phase-memory
A git repository template to bootstrap coulomb projects from.
`phase-memory` is the profile-driven memory operating layer for agentic
systems. It interprets Markitect memory profiles as runtime plans, models
memory phases, and produces deterministic dry-run actions for retention,
refresh, compaction, stabilization, and activation.
The first implementation slice is local-first and dependency-light. It does not
launch a service or mutate durable memory stores by default.
## Repository Role
Adjacent repositories own nearby but separate concerns:
- `markitect-tool`: memory profile, graph, event, and selection contracts plus
context-package compilation.
- `kontextual-engine`: durable knowledge/runtime records, permission-aware
retrieval, audit records, and long-lived storage.
- `infospace-bench`: concrete pilots, restart-package evaluation, metrics, and
fixture feedback.
`phase-memory` owns the orchestration layer between them:
- phase semantics: ephemeral, fluid, stabilized, rigid
- profile execution planning
- retention, deletion, refresh, compaction, and stabilization decisions
- activation planning under token and item budgets
- adapter, policy, audit, and observability boundaries
## Quick Start
```bash
python3 -m pytest
```
The default test suite uses only deterministic local fixtures.
## Package Map
- `phase_memory.models`: domain records, phases, lifecycle states, diagnostics,
and plan envelopes.
- `phase_memory.contracts`: Markitect-compatible profile/graph ingress.
- `phase_memory.planner`: profile execution planning.
- `phase_memory.lifecycle`: dry-run lifecycle planning.
- `phase_memory.activation`: Markitect-compatible activation selection planning.
- `phase_memory.ports`: runtime port protocols.
- `phase_memory.adapters`: deterministic in-memory test adapters.
See [docs/architecture.md](docs/architecture.md) for the first architecture
sketch and [SCOPE.md](SCOPE.md) for repository boundaries.

40
SCOPE.md Normal file
View File

@@ -0,0 +1,40 @@
# SCOPE
## In Scope
`phase-memory` owns phase-aware memory runtime planning for agentic systems.
Current scope:
- memory phase semantics for ephemeral, fluid, stabilized, and rigid memory
- profile execution planning from Markitect-compatible profile dictionaries
- deterministic lifecycle plans for retention, deletion request, refresh,
compaction, stabilization, and activation
- local runtime port definitions for graph stores, event logs, context-package
compilers, semantic indexes, policy gateways, audit sinks, and runtime
registries
- deterministic in-memory adapters for tests and early integration
- fixture-driven compatibility with Markitect memory profile and graph shapes
## Out Of Scope
`phase-memory` does not own:
- Markitect memory syntax vocabulary or context-package internals
- generic content or knowledge artifact persistence
- durable knowledge query APIs already owned by `kontextual-engine`
- benchmark corpus ownership or evaluation dashboards
- full identity, authorization, or policy decision platforms
- generic vector database behavior
- provider-specific LLM memory plugins
- durable service deployment topology in the first implementation slice
## Adjacent Repositories
- `markitect-tool` owns memory contracts, validation, and package compilation.
- `kontextual-engine` owns durable runtime records, audit, policy-aware
retrieval, and persistence.
- `infospace-bench` owns applied pilots, fixture quality checks, and metrics.
`phase-memory` consumes their contracts and evidence while keeping its own
implementation centered on memory-native planning.

View File

@@ -0,0 +1,34 @@
# Candidate PMEM-WP-0002 Outline
Status: candidate outline
## Title
Local Runtime Service And Markitect Package Bridge
## Purpose
Turn the PMEM-WP-0001 planning foundation into a usable local runtime facade.
The second slice should remain local-first, but it can expose a concrete API
for importing profiles/graphs, planning lifecycle actions, and handing selected
activations to Markitect.
## Candidate Tasks
1. Add a small application service that combines contract ingress, profile
planning, lifecycle planning, activation planning, and local adapters.
2. Add a CLI command for `phase-memory profile plan <profile.json>`.
3. Add a CLI command for `phase-memory graph activate <graph.json>`.
4. Add optional Markitect compiler integration behind a narrow port.
5. Add JSONL file-backed store and event-log adapters.
6. Add a compatibility fixture that mirrors the Infospace Bench agentic memory
pilot in phase-memory terms.
7. Define the first external adapter contract for Kontextual Engine delegation.
## Guardrails
- Keep durable writes review-gated.
- Keep Markitect schema validation delegated or adapter-wrapped.
- Keep service and CLI outputs deterministic.
- Do not require live LLM, vector, graph database, or enterprise policy
services in the default test suite.

22
pyproject.toml Normal file
View File

@@ -0,0 +1,22 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "phase-memory"
version = "0.1.0"
description = "Profile-driven memory phase planning for agentic systems"
readme = "README.md"
requires-python = ">=3.11"
license = { text = "MIT" }
authors = [
{ name = "Coulomb" }
]
dependencies = []
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
pythonpath = ["src"]
testpaths = ["tests"]

View File

@@ -0,0 +1,54 @@
"""Profile-driven memory phase planning."""
from .activation import plan_activation
from .contracts import graph_from_markitect, profile_from_markitect
from .lifecycle import (
plan_compaction,
plan_phase_transition,
plan_refresh,
plan_retention,
)
from .models import (
ActivationPlan,
Diagnostic,
LifecycleAction,
LifecycleActionKind,
LifecycleState,
MemoryEdge,
MemoryEvent,
MemoryGraph,
MemoryKind,
MemoryNode,
MemoryPhase,
PolicyDecision,
ProfileExecutionPlan,
ProfileIntent,
)
from .planner import plan_profile_execution
__all__ = [
"ActivationPlan",
"Diagnostic",
"LifecycleAction",
"LifecycleActionKind",
"LifecycleState",
"MemoryEdge",
"MemoryEvent",
"MemoryGraph",
"MemoryKind",
"MemoryNode",
"MemoryPhase",
"PolicyDecision",
"ProfileExecutionPlan",
"ProfileIntent",
"graph_from_markitect",
"plan_activation",
"plan_compaction",
"plan_phase_transition",
"plan_profile_execution",
"plan_refresh",
"plan_retention",
"profile_from_markitect",
]
__version__ = "0.1.0"

View File

@@ -0,0 +1,94 @@
"""Activation planning and Markitect selection handoff."""
from __future__ import annotations
from .models import ActivationPlan, Diagnostic, LifecycleAction, LifecycleActionKind, MemoryGraph
from .utils import stable_digest
def plan_activation(
graph: MemoryGraph,
*,
max_items: int,
max_tokens: int,
profile_id: str | None = None,
priority_node_ids: tuple[str, ...] = (),
include_events: bool = True,
) -> ActivationPlan:
selected: list[str] = []
omitted: list[dict[str, object]] = []
token_estimate = 0
ordered_nodes = _ordered_nodes(graph, priority_node_ids)
for node in ordered_nodes:
node_tokens = _estimate_tokens(node.text or node.kind)
if len(selected) >= max_items:
omitted.append({"id": node.node_id, "reason": "max_items"})
continue
if token_estimate + node_tokens > max_tokens:
omitted.append({"id": node.node_id, "reason": "max_tokens", "tokens": node_tokens})
continue
selected.append(node.node_id)
token_estimate += node_tokens
selected_event_ids: tuple[str, ...] = ()
if include_events:
selected_event_ids = tuple(event.event_id for event in graph.events if event.package_refs or event.activation_refs)
plan_id = f"activation:{stable_digest([graph.graph_id, selected, selected_event_ids, max_items, max_tokens])}"
diagnostics = tuple(
Diagnostic("info", "activation_omitted_items", "Some nodes were omitted by activation budget.", metadata={"count": len(omitted)})
for _ in [None]
if omitted
)
selection = {
"schema_version": "markitect.memory.selection.v1",
"id": plan_id,
"graph": graph.graph_id,
"profile": profile_id,
"nodes": selected,
"events": list(selected_event_ids),
"metadata": {
"planned_by": "phase-memory",
"token_estimate": token_estimate,
"max_items": max_items,
"max_tokens": max_tokens,
"omitted": omitted,
},
}
return ActivationPlan(
plan_id=plan_id,
graph_id=graph.graph_id,
selected_node_ids=tuple(selected),
selected_event_ids=selected_event_ids,
omitted=tuple(omitted),
token_estimate=token_estimate,
max_items=max_items,
max_tokens=max_tokens,
selection=selection,
diagnostics=diagnostics,
)
def activation_action(plan: ActivationPlan) -> LifecycleAction:
return LifecycleAction(
LifecycleActionKind.ACTIVATE,
target_id=plan.plan_id,
reason="compile planned selection through Markitect context-package boundary",
metadata={"selection": plan.selection},
)
def _ordered_nodes(graph: MemoryGraph, priority_node_ids: tuple[str, ...]):
by_id = graph.node_by_id()
priority = [by_id[node_id] for node_id in priority_node_ids if node_id in by_id]
remaining = sorted(
(node for node in graph.nodes if node.node_id not in set(priority_node_ids)),
key=lambda node: node.node_id,
)
return priority + remaining
def _estimate_tokens(text: str) -> int:
return max(1, len(text.split()))

View File

@@ -0,0 +1,87 @@
"""Deterministic local adapters used by tests and first integrations."""
from __future__ import annotations
from typing import Any
from .models import MemoryEdge, MemoryEvent, MemoryNode, PolicyDecision, ProfileIntent
class InMemoryMemoryGraphStore:
def __init__(self) -> None:
self._profiles: dict[str, ProfileIntent] = {}
self._nodes: dict[str, MemoryNode] = {}
self._edges: dict[str, MemoryEdge] = {}
def save_profile(self, profile: ProfileIntent) -> ProfileIntent:
self._profiles[profile.profile_id] = profile
return profile
def get_profile(self, profile_id: str) -> ProfileIntent:
return self._profiles[profile_id]
def save_node(self, node: MemoryNode) -> MemoryNode:
self._nodes[node.node_id] = node
return node
def get_node(self, node_id: str) -> MemoryNode:
return self._nodes[node_id]
def list_nodes(self, *, kind: str | None = None) -> list[MemoryNode]:
nodes = list(self._nodes.values())
if kind:
nodes = [node for node in nodes if node.kind == kind]
return sorted(nodes, key=lambda node: node.node_id)
def save_edge(self, edge: MemoryEdge) -> MemoryEdge:
self._edges[edge.edge_id] = edge
return edge
def list_edges(self, *, source: str | None = None, target: str | None = None) -> list[MemoryEdge]:
edges = list(self._edges.values())
if source:
edges = [edge for edge in edges if edge.source == source]
if target:
edges = [edge for edge in edges if edge.target == target]
return sorted(edges, key=lambda edge: edge.edge_id)
class InMemoryMemoryEventLog:
def __init__(self) -> None:
self._events: list[MemoryEvent] = []
def append(self, event: MemoryEvent) -> MemoryEvent:
if any(existing.event_id == event.event_id for existing in self._events):
raise ValueError(f"Duplicate memory event id: {event.event_id}")
self._events.append(event)
return event
def list_events(self, *, kind: str | None = None) -> list[MemoryEvent]:
events = list(self._events)
if kind:
events = [event for event in events if event.kind == kind]
return events
class NoopContextPackageCompiler:
def compile_selection(self, selection: dict[str, Any]) -> dict[str, Any]:
return {
"package_id": f"package:{selection.get('id', 'anonymous')}",
"selection": dict(selection),
"item_count": len(selection.get("nodes", ())) + len(selection.get("events", ())),
}
class AllowAllPolicyGateway:
def authorize(self, *, action: str, resource: str, context: dict[str, Any] | None = None) -> PolicyDecision:
return PolicyDecision(True, reason="local allow-all policy", metadata={"action": action, "resource": resource, "context": context or {}})
class RecordingAuditSink:
def __init__(self) -> None:
self.events: list[dict[str, Any]] = []
def record(self, event: dict[str, Any]) -> dict[str, Any]:
stored = dict(event)
self.events.append(stored)
return {"recorded": True, "index": len(self.events) - 1, "event": stored}

View File

@@ -0,0 +1,156 @@
"""Ingress adapters for Markitect-compatible memory contracts."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from .models import Diagnostic, MemoryGraph, MemoryEdge, MemoryEvent, MemoryKind, MemoryNode, ProfileIntent
MARKITECT_PROFILE_SCHEMA = "markitect.memory.profile.v1"
MARKITECT_GRAPH_SCHEMA = "markitect.memory.graph.v1"
MARKITECT_SELECTION_SCHEMA = "markitect.memory.selection.v1"
KNOWN_MEMORY_KINDS = {kind.value for kind in MemoryKind}
@dataclass(frozen=True)
class ContractIngressResult:
subject_kind: str
subject_id: str
valid: bool
diagnostics: tuple[Diagnostic, ...]
value: Any
def to_dict(self) -> dict[str, Any]:
return {
"subject_kind": self.subject_kind,
"subject_id": self.subject_id,
"valid": self.valid,
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
"value": self.value.to_dict() if hasattr(self.value, "to_dict") else self.value,
}
def profile_from_markitect(data: dict[str, Any]) -> ContractIngressResult:
diagnostics: list[Diagnostic] = []
profile = ProfileIntent.from_mapping(data)
if profile.schema_version != MARKITECT_PROFILE_SCHEMA:
diagnostics.append(
Diagnostic(
"error",
"unsupported_profile_schema",
f"Expected {MARKITECT_PROFILE_SCHEMA}.",
"schema_version",
{"actual": profile.schema_version},
)
)
if not profile.profile_id:
diagnostics.append(Diagnostic("error", "missing_profile_id", "Profile must declare id.", "id"))
for index, memory_kind in enumerate(profile.memory_kinds):
if memory_kind not in KNOWN_MEMORY_KINDS:
diagnostics.append(
Diagnostic(
"warn",
"unknown_memory_kind",
"Memory kind is not in phase-memory's first-slice vocabulary.",
f"memory_kinds[{index}]",
{"kind": memory_kind},
)
)
return ContractIngressResult(
subject_kind="memory_profile",
subject_id=profile.profile_id,
valid=not _has_errors(diagnostics),
diagnostics=tuple(diagnostics),
value=profile,
)
def graph_from_markitect(data: dict[str, Any]) -> ContractIngressResult:
diagnostics: list[Diagnostic] = []
schema_version = str(data.get("schema_version") or data.get("schema") or "")
graph_id = str(data.get("id") or data.get("graph_id") or "")
if schema_version != MARKITECT_GRAPH_SCHEMA:
diagnostics.append(
Diagnostic(
"error",
"unsupported_graph_schema",
f"Expected {MARKITECT_GRAPH_SCHEMA}.",
"schema_version",
{"actual": schema_version},
)
)
if not graph_id:
diagnostics.append(Diagnostic("error", "missing_graph_id", "Graph must declare id.", "id"))
nodes = tuple(MemoryNode.from_mapping(item) for item in data.get("nodes", ()))
edges = tuple(MemoryEdge.from_mapping(item) for item in data.get("edges", ()))
events = tuple(MemoryEvent.from_mapping(item) for item in data.get("events", ()))
node_ids = {node.node_id for node in nodes}
for index, node in enumerate(nodes):
if not node.node_id:
diagnostics.append(Diagnostic("error", "missing_node_id", "Graph node must declare id.", f"nodes[{index}].id"))
if not node.kind:
diagnostics.append(Diagnostic("error", "missing_node_kind", "Graph node must declare kind.", f"nodes[{index}].kind"))
for index, edge in enumerate(edges):
if edge.source not in node_ids:
diagnostics.append(
Diagnostic("error", "missing_edge_source", "Edge source does not reference a node.", f"edges[{index}].source", {"source": edge.source})
)
if edge.target not in node_ids:
diagnostics.append(
Diagnostic("error", "missing_edge_target", "Edge target does not reference a node.", f"edges[{index}].target", {"target": edge.target})
)
graph = MemoryGraph(
graph_id=graph_id,
schema_version=schema_version,
nodes=nodes,
edges=edges,
events=events,
metadata=dict(data.get("metadata") or {}),
)
return ContractIngressResult(
subject_kind="memory_graph",
subject_id=graph_id,
valid=not _has_errors(diagnostics),
diagnostics=tuple(diagnostics),
value=graph,
)
def selection_from_markitect(data: dict[str, Any]) -> ContractIngressResult:
diagnostics: list[Diagnostic] = []
schema_version = str(data.get("schema_version") or data.get("schema") or "")
selection_id = str(data.get("id") or data.get("selection_id") or "")
if schema_version != MARKITECT_SELECTION_SCHEMA:
diagnostics.append(
Diagnostic(
"error",
"unsupported_selection_schema",
f"Expected {MARKITECT_SELECTION_SCHEMA}.",
"schema_version",
{"actual": schema_version},
)
)
if not selection_id:
diagnostics.append(Diagnostic("error", "missing_selection_id", "Selection must declare id.", "id"))
return ContractIngressResult(
subject_kind="memory_selection",
subject_id=selection_id,
valid=not _has_errors(diagnostics),
diagnostics=tuple(diagnostics),
value=dict(data),
)
def _has_errors(diagnostics: list[Diagnostic]) -> bool:
return any(diagnostic.severity == "error" for diagnostic in diagnostics)

View File

@@ -0,0 +1,127 @@
"""Dry-run lifecycle planners."""
from __future__ import annotations
from datetime import datetime, timezone
from .models import LifecycleAction, LifecycleActionKind, LifecycleState, MemoryNode, MemoryPhase
from .utils import parse_iso_datetime, stable_digest
def plan_phase_transition(
node: MemoryNode,
target_phase: MemoryPhase,
*,
reason: str = "",
) -> LifecycleAction:
requires_review = target_phase in {MemoryPhase.STABILIZED, MemoryPhase.RIGID}
target_state = LifecycleState.REVIEW_NEEDED if requires_review else LifecycleState.ACTIVE
return LifecycleAction(
LifecycleActionKind.TRANSITION_PHASE,
target_id=node.node_id,
from_state=node.lifecycle,
to_state=target_state,
reason=reason or f"transition {node.phase.value} to {target_phase.value}",
requires_review=requires_review,
metadata={"from_phase": node.phase.value, "to_phase": target_phase.value},
)
def plan_retention(
nodes: list[MemoryNode] | tuple[MemoryNode, ...],
*,
stale_after_days: int | None = None,
delete_after_days: int | None = None,
now: datetime | None = None,
) -> tuple[LifecycleAction, ...]:
now = now or datetime.now(timezone.utc)
actions: list[LifecycleAction] = []
for node in nodes:
age = _age_days(node, now)
if age is None:
continue
if delete_after_days is not None and age >= delete_after_days:
actions.append(
LifecycleAction(
LifecycleActionKind.REQUEST_DELETE,
node.node_id,
from_state=node.lifecycle,
to_state=LifecycleState.DELETE_REQUESTED,
reason=f"memory age {age}d exceeds delete_after_days={delete_after_days}",
requires_review=True,
metadata={"age_days": age, "physical_delete": False},
)
)
elif stale_after_days is not None and age >= stale_after_days:
actions.append(
LifecycleAction(
LifecycleActionKind.MARK_STALE,
node.node_id,
from_state=node.lifecycle,
to_state=LifecycleState.STALE,
reason=f"memory age {age}d exceeds stale_after_days={stale_after_days}",
metadata={"age_days": age},
)
)
return tuple(actions)
def plan_compaction(
nodes: list[MemoryNode] | tuple[MemoryNode, ...],
*,
summary_node_id: str | None = None,
retire_source_nodes: bool = True,
reason: str = "",
) -> LifecycleAction:
source_ids = tuple(node.node_id for node in nodes)
summary_id = summary_node_id or f"summary:{stable_digest(source_ids)}"
return LifecycleAction(
LifecycleActionKind.COMPACT,
target_id=summary_id,
reason=reason or "compact selected memory window into reviewable summary",
requires_review=True,
metadata={
"source_node_ids": list(source_ids),
"retire_source_nodes": retire_source_nodes,
"summary_text": _summary_text(nodes),
},
)
def plan_refresh(
nodes: list[MemoryNode] | tuple[MemoryNode, ...],
*,
source_digest_by_node_id: dict[str, str],
) -> tuple[LifecycleAction, ...]:
actions: list[LifecycleAction] = []
for node in nodes:
current = node.freshness.get("source_digest")
proposed = source_digest_by_node_id.get(node.node_id)
if proposed and proposed != current:
actions.append(
LifecycleAction(
LifecycleActionKind.REFRESH,
target_id=node.node_id,
from_state=node.lifecycle,
to_state=LifecycleState.REVIEW_NEEDED,
reason="source digest changed",
requires_review=True,
metadata={"current_digest": current, "proposed_digest": proposed},
)
)
return tuple(actions)
def _age_days(node: MemoryNode, now: datetime) -> int | None:
updated = parse_iso_datetime(str(node.freshness.get("updated_at") or node.updated_at))
if updated is None:
return None
return max((now - updated).days, 0)
def _summary_text(nodes: list[MemoryNode] | tuple[MemoryNode, ...]) -> str:
parts = [node.text.strip() for node in nodes if node.text.strip()]
if not parts:
return "Summary proposed for selected memory nodes."
return " ".join(parts)[:240]

437
src/phase_memory/models.py Normal file
View File

@@ -0,0 +1,437 @@
"""Core phase-memory records.
The model layer is deliberately small and serializable. Durable persistence,
schema ownership, and external adapter behavior live outside these records.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .utils import compact_dict, stable_digest, to_plain, utc_now_iso
class MemoryPhase(str, Enum):
EPHEMERAL = "ephemeral"
FLUID = "fluid"
STABILIZED = "stabilized"
RIGID = "rigid"
class MemoryKind(str, Enum):
REASONING = "reasoning"
CONVERSATION = "conversation"
KNOWLEDGE = "knowledge"
PACKAGE = "package"
IDENTITY = "identity"
PREFERENCE = "preference"
SOURCE = "source"
TASK = "task"
TOOL = "tool"
class LifecycleState(str, Enum):
ACTIVE = "active"
STALE = "stale"
REVIEW_NEEDED = "review_needed"
COMPACTED = "compacted"
SUPERSEDED = "superseded"
DELETE_REQUESTED = "delete_requested"
ARCHIVED = "archived"
class LifecycleActionKind(str, Enum):
TRANSITION_PHASE = "transition_phase"
MARK_STALE = "mark_stale"
REQUEST_DELETE = "request_delete"
COMPACT = "compact"
REFRESH = "refresh"
ACTIVATE = "activate"
DENY = "deny"
NO_OP = "no_op"
@dataclass(frozen=True)
class Diagnostic:
severity: str
code: str
message: str
path: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"severity": self.severity,
"code": self.code,
"message": self.message,
"path": self.path,
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class ProfileIntent:
profile_id: str
schema_version: str = "markitect.memory.profile.v1"
title: str = ""
intent: str = ""
memory_kinds: tuple[str, ...] = ()
stores: dict[str, str] = field(default_factory=dict)
limits: dict[str, Any] = field(default_factory=dict)
latency: dict[str, Any] = field(default_factory=dict)
retention: dict[str, Any] = field(default_factory=dict)
refresh: dict[str, Any] = field(default_factory=dict)
compaction: dict[str, Any] = field(default_factory=dict)
activation: dict[str, Any] = field(default_factory=dict)
policy: dict[str, Any] = field(default_factory=dict)
observability: dict[str, Any] = field(default_factory=dict)
failure: dict[str, Any] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_mapping(cls, data: dict[str, Any]) -> "ProfileIntent":
return cls(
profile_id=str(data.get("id") or data.get("profile_id") or ""),
schema_version=str(data.get("schema_version") or data.get("schema") or ""),
title=str(data.get("title") or data.get("name") or ""),
intent=str(data.get("intent") or data.get("description") or ""),
memory_kinds=tuple(str(item) for item in data.get("memory_kinds", ())),
stores={str(key): str(value) for key, value in dict(data.get("stores") or {}).items()},
limits=dict(data.get("limits") or {}),
latency=dict(data.get("latency") or data.get("latency_targets") or {}),
retention=dict(data.get("retention") or data.get("retention_policy") or {}),
refresh=dict(data.get("refresh") or {}),
compaction=dict(data.get("compaction") or {}),
activation=dict(data.get("activation") or data.get("context_budget") or {}),
policy=dict(data.get("policy") or {}),
observability=dict(data.get("observability") or {}),
failure=dict(data.get("failure") or {}),
metadata=dict(data.get("metadata") or {}),
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"id": self.profile_id,
"schema_version": self.schema_version,
"title": self.title,
"intent": self.intent,
"memory_kinds": list(self.memory_kinds),
"stores": dict(self.stores),
"limits": dict(self.limits),
"latency": dict(self.latency),
"retention": dict(self.retention),
"refresh": dict(self.refresh),
"compaction": dict(self.compaction),
"activation": dict(self.activation),
"policy": dict(self.policy),
"observability": dict(self.observability),
"failure": dict(self.failure),
"metadata": dict(self.metadata),
}
)
@dataclass(frozen=True)
class MemoryNode:
node_id: str
kind: str
text: str = ""
phase: MemoryPhase = MemoryPhase.FLUID
lifecycle: LifecycleState = LifecycleState.ACTIVE
namespace: dict[str, Any] = field(default_factory=dict)
source_spans: tuple[dict[str, Any], ...] = ()
provenance: tuple[dict[str, Any], ...] = ()
freshness: dict[str, Any] = field(default_factory=dict)
confidence: float | None = None
policy: dict[str, Any] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
created_at: str = field(default_factory=utc_now_iso)
updated_at: str = field(default_factory=utc_now_iso)
@classmethod
def from_mapping(cls, data: dict[str, Any]) -> "MemoryNode":
return cls(
node_id=str(data.get("id") or data.get("node_id") or ""),
kind=str(data.get("kind") or data.get("type") or ""),
text=str(data.get("text") or data.get("content") or data.get("summary") or ""),
phase=MemoryPhase(str(data.get("phase") or MemoryPhase.FLUID.value)),
lifecycle=LifecycleState(str(data.get("lifecycle") or LifecycleState.ACTIVE.value)),
namespace=dict(data.get("namespace") or {}),
source_spans=tuple(dict(item) for item in _mapping_list(data.get("source_spans"))),
provenance=tuple(dict(item) for item in _mapping_list(data.get("provenance"))),
freshness=dict(data.get("freshness") or {}),
confidence=_optional_float(data.get("confidence")),
policy=dict(data.get("policy") or {}),
metadata=dict(data.get("metadata") or {}),
created_at=str(data.get("created_at") or utc_now_iso()),
updated_at=str(data.get("updated_at") or utc_now_iso()),
)
@property
def id(self) -> str:
return self.node_id
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"id": self.node_id,
"kind": self.kind,
"text": self.text,
"phase": self.phase,
"lifecycle": self.lifecycle,
"namespace": self.namespace,
"source_spans": list(self.source_spans),
"provenance": list(self.provenance),
"freshness": self.freshness,
"confidence": self.confidence,
"policy": self.policy,
"metadata": self.metadata,
"created_at": self.created_at,
"updated_at": self.updated_at,
}
)
@dataclass(frozen=True)
class MemoryEdge:
edge_id: str
kind: str
source: str
target: str
provenance: tuple[dict[str, Any], ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_mapping(cls, data: dict[str, Any]) -> "MemoryEdge":
kind = str(data.get("kind") or data.get("type") or "")
source = str(data.get("source") or data.get("from") or "")
target = str(data.get("target") or data.get("to") or "")
edge_id = str(data.get("id") or f"edge:{stable_digest([kind, source, target])}")
return cls(
edge_id=edge_id,
kind=kind,
source=source,
target=target,
provenance=tuple(dict(item) for item in _mapping_list(data.get("provenance"))),
metadata=dict(data.get("metadata") or {}),
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"id": self.edge_id,
"kind": self.kind,
"source": self.source,
"target": self.target,
"provenance": list(self.provenance),
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class MemoryEvent:
event_id: str
kind: str
timestamp: str = field(default_factory=utc_now_iso)
node_updates: tuple[dict[str, Any], ...] = ()
edge_updates: tuple[dict[str, Any], ...] = ()
package_refs: tuple[str, ...] = ()
activation_refs: tuple[str, ...] = ()
policy: dict[str, Any] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
@classmethod
def from_mapping(cls, data: dict[str, Any]) -> "MemoryEvent":
kind = str(data.get("kind") or data.get("type") or "recorded")
timestamp = str(data.get("timestamp") or data.get("at") or utc_now_iso())
event_id = str(data.get("id") or data.get("event_id") or f"event:{stable_digest([kind, timestamp, data])}")
return cls(
event_id=event_id,
kind=kind,
timestamp=timestamp,
node_updates=tuple(dict(item) for item in _mapping_list(data.get("node_updates"))),
edge_updates=tuple(dict(item) for item in _mapping_list(data.get("edge_updates"))),
package_refs=tuple(str(item) for item in data.get("package_refs", ())),
activation_refs=tuple(str(item) for item in data.get("activation_refs", ())),
policy=dict(data.get("policy") or {}),
metadata=dict(data.get("metadata") or {}),
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"id": self.event_id,
"kind": self.kind,
"timestamp": self.timestamp,
"node_updates": list(self.node_updates),
"edge_updates": list(self.edge_updates),
"package_refs": list(self.package_refs),
"activation_refs": list(self.activation_refs),
"policy": self.policy,
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class MemoryGraph:
graph_id: str
schema_version: str = "markitect.memory.graph.v1"
nodes: tuple[MemoryNode, ...] = ()
edges: tuple[MemoryEdge, ...] = ()
events: tuple[MemoryEvent, ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
@property
def node_ids(self) -> set[str]:
return {node.node_id for node in self.nodes}
def node_by_id(self) -> dict[str, MemoryNode]:
return {node.node_id: node for node in self.nodes}
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"id": self.graph_id,
"schema_version": self.schema_version,
"nodes": [node.to_dict() for node in self.nodes],
"edges": [edge.to_dict() for edge in self.edges],
"events": [event.to_dict() for event in self.events],
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class PolicyDecision:
allowed: bool
reason: str = ""
obligations: tuple[str, ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"allowed": self.allowed,
"reason": self.reason,
"obligations": list(self.obligations),
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class LifecycleAction:
action: LifecycleActionKind
target_id: str
from_state: LifecycleState | None = None
to_state: LifecycleState | None = None
reason: str = ""
requires_review: bool = False
metadata: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"action": self.action,
"target_id": self.target_id,
"from_state": self.from_state,
"to_state": self.to_state,
"reason": self.reason,
"requires_review": self.requires_review,
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class ProfileExecutionPlan:
profile_id: str
dry_run: bool = True
enabled_memory_kinds: tuple[str, ...] = ()
required_adapters: tuple[str, ...] = ()
missing_adapters: tuple[str, ...] = ()
capabilities: tuple[str, ...] = ()
activation_budget: dict[str, Any] = field(default_factory=dict)
policy_gates: tuple[str, ...] = ()
observability_events: tuple[str, ...] = ()
fallback_behavior: dict[str, Any] = field(default_factory=dict)
actions: tuple[LifecycleAction, ...] = ()
diagnostics: tuple[Diagnostic, ...] = ()
metadata: dict[str, Any] = field(default_factory=dict)
@property
def ready(self) -> bool:
return not self.missing_adapters and not any(
diagnostic.severity == "error" for diagnostic in self.diagnostics
)
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"profile_id": self.profile_id,
"dry_run": self.dry_run,
"ready": self.ready,
"enabled_memory_kinds": list(self.enabled_memory_kinds),
"required_adapters": list(self.required_adapters),
"missing_adapters": list(self.missing_adapters),
"capabilities": list(self.capabilities),
"activation_budget": self.activation_budget,
"policy_gates": list(self.policy_gates),
"observability_events": list(self.observability_events),
"fallback_behavior": self.fallback_behavior,
"actions": [action.to_dict() for action in self.actions],
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
"metadata": self.metadata,
}
)
@dataclass(frozen=True)
class ActivationPlan:
plan_id: str
graph_id: str
selected_node_ids: tuple[str, ...] = ()
selected_event_ids: tuple[str, ...] = ()
omitted: tuple[dict[str, Any], ...] = ()
token_estimate: int = 0
max_items: int = 0
max_tokens: int = 0
selection: dict[str, Any] = field(default_factory=dict)
diagnostics: tuple[Diagnostic, ...] = ()
def to_dict(self) -> dict[str, Any]:
return compact_dict(
{
"plan_id": self.plan_id,
"graph_id": self.graph_id,
"selected_node_ids": list(self.selected_node_ids),
"selected_event_ids": list(self.selected_event_ids),
"omitted": list(self.omitted),
"token_estimate": self.token_estimate,
"max_items": self.max_items,
"max_tokens": self.max_tokens,
"selection": to_plain(self.selection),
"diagnostics": [diagnostic.to_dict() for diagnostic in self.diagnostics],
}
)
def _mapping_list(value: Any) -> list[dict[str, Any]]:
if value is None:
return []
if isinstance(value, dict):
return [value]
return [dict(item) for item in value]
def _optional_float(value: Any) -> float | None:
if value is None or value == "":
return None
return float(value)

129
src/phase_memory/planner.py Normal file
View File

@@ -0,0 +1,129 @@
"""Profile execution planning."""
from __future__ import annotations
from collections.abc import Iterable
from .models import (
Diagnostic,
LifecycleAction,
LifecycleActionKind,
ProfileExecutionPlan,
ProfileIntent,
)
DEFAULT_LOCAL_ADAPTERS = {
"local-event-log",
"local-graph-store",
"markitect-context-package",
"markitect-context-registry",
"markitect-memory-graph-fixture",
"infospace-workflow-trace-fixtures",
"infospace-artifact-neighborhood",
}
def plan_profile_execution(
profile: ProfileIntent,
*,
available_adapters: Iterable[str] | None = None,
) -> ProfileExecutionPlan:
available = set(DEFAULT_LOCAL_ADAPTERS if available_adapters is None else available_adapters)
required_adapters = tuple(sorted(set(profile.stores.values())))
missing_adapters = tuple(adapter for adapter in required_adapters if adapter not in available)
diagnostics: list[Diagnostic] = []
actions: list[LifecycleAction] = []
if not profile.profile_id:
diagnostics.append(Diagnostic("error", "missing_profile_id", "Profile execution requires a profile id."))
for adapter in missing_adapters:
diagnostics.append(
Diagnostic(
"warn",
"missing_adapter",
"Required adapter is not available; plan will use fallback behavior.",
"stores",
{"adapter": adapter},
)
)
actions.append(
LifecycleAction(
LifecycleActionKind.NO_OP,
target_id=adapter,
reason="adapter unavailable in dry-run planner",
metadata={"fallback": _missing_store_fallback(profile)},
)
)
capabilities = tuple(sorted(_capabilities_for(profile)))
policy_gates = tuple(_policy_gates(profile))
observability_events = tuple(_observability_events(profile, missing_adapters))
return ProfileExecutionPlan(
profile_id=profile.profile_id,
enabled_memory_kinds=tuple(profile.memory_kinds),
required_adapters=required_adapters,
missing_adapters=missing_adapters,
capabilities=capabilities,
activation_budget=dict(profile.activation),
policy_gates=policy_gates,
observability_events=observability_events,
fallback_behavior={
"missing_runtime_store": _missing_store_fallback(profile),
**dict(profile.failure),
},
actions=tuple(actions),
diagnostics=tuple(diagnostics),
metadata={
"stores": dict(profile.stores),
"limits": dict(profile.limits),
"retention": dict(profile.retention),
"refresh": dict(profile.refresh),
"compaction": dict(profile.compaction),
},
)
def _capabilities_for(profile: ProfileIntent) -> set[str]:
capabilities = {"profile.inspect", "profile.plan"}
if profile.retention:
capabilities.add("retention.plan")
if profile.refresh:
capabilities.add("refresh.plan")
if profile.compaction:
capabilities.add("compaction.plan")
if profile.activation:
capabilities.add("activation.plan")
if profile.policy:
capabilities.add("policy.gate")
return capabilities
def _policy_gates(profile: ProfileIntent) -> list[str]:
policy = profile.policy
gates: list[str] = []
for label in policy.get("required_labels", ()):
gates.append(f"label:{label}")
durable_writes = policy.get("durable_writes")
if durable_writes:
gates.append(f"durable_writes:{durable_writes}")
if policy.get("secrets_allowed") is False:
gates.append("secrets:denied")
if policy.get("reauthorization"):
gates.append(f"reauthorization:{policy['reauthorization']}")
return gates
def _observability_events(profile: ProfileIntent, missing_adapters: tuple[str, ...]) -> list[str]:
observability = profile.observability
events = list(observability.get("events") or observability.get("emit") or ())
if observability.get("emit_events") is True and not events:
events.append("phase_memory.profile.planned")
if missing_adapters:
events.append("phase_memory.adapter.missing")
return events
def _missing_store_fallback(profile: ProfileIntent) -> str:
return str(profile.failure.get("missing_runtime_store") or "degrade-to-dry-run")

50
src/phase_memory/ports.py Normal file
View File

@@ -0,0 +1,50 @@
"""Runtime port protocols for phase-memory adapters."""
from __future__ import annotations
from typing import Any, Protocol
from .models import MemoryEdge, MemoryEvent, MemoryGraph, MemoryNode, PolicyDecision, ProfileIntent
class MemoryGraphStore(Protocol):
def save_profile(self, profile: ProfileIntent) -> ProfileIntent: ...
def get_profile(self, profile_id: str) -> ProfileIntent: ...
def save_node(self, node: MemoryNode) -> MemoryNode: ...
def get_node(self, node_id: str) -> MemoryNode: ...
def list_nodes(self, *, kind: str | None = None) -> list[MemoryNode]: ...
def save_edge(self, edge: MemoryEdge) -> MemoryEdge: ...
def list_edges(self, *, source: str | None = None, target: str | None = None) -> list[MemoryEdge]: ...
class MemoryEventLog(Protocol):
def append(self, event: MemoryEvent) -> MemoryEvent: ...
def list_events(self, *, kind: str | None = None) -> list[MemoryEvent]: ...
class ContextPackageCompiler(Protocol):
def compile_selection(self, selection: dict[str, Any]) -> dict[str, Any]: ...
class SemanticIndex(Protocol):
def upsert_nodes(self, nodes: list[MemoryNode]) -> dict[str, Any]: ...
def query(self, *, graph_id: str, query: str, limit: int = 10) -> list[dict[str, Any]]: ...
class PolicyGateway(Protocol):
def authorize(self, *, action: str, resource: str, context: dict[str, Any] | None = None) -> PolicyDecision: ...
class AuditSink(Protocol):
def record(self, event: dict[str, Any]) -> dict[str, Any]: ...
class RuntimeRegistry(Protocol):
def publish_runtime_envelope(self, envelope: dict[str, Any]) -> dict[str, Any]: ...
def fetch_runtime_envelope(self, reference: str) -> dict[str, Any]: ...
def graph_from_store(store: MemoryGraphStore, *, graph_id: str = "local") -> MemoryGraph:
return MemoryGraph(graph_id=graph_id, nodes=tuple(store.list_nodes()))

58
src/phase_memory/utils.py Normal file
View File

@@ -0,0 +1,58 @@
"""Small deterministic helpers."""
from __future__ import annotations
from dataclasses import is_dataclass, asdict
from datetime import datetime, timezone
from enum import Enum
import hashlib
import json
from typing import Any
def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def stable_digest(value: Any, *, length: int = 12) -> str:
encoded = json.dumps(to_plain(value), sort_keys=True, separators=(",", ":")).encode()
return hashlib.sha256(encoded).hexdigest()[:length]
def compact_dict(mapping: dict[str, Any]) -> dict[str, Any]:
return {
key: to_plain(value)
for key, value in mapping.items()
if value not in (None, "", [], {}, ())
}
def to_plain(value: Any) -> Any:
if isinstance(value, Enum):
return value.value
if is_dataclass(value):
return to_plain(asdict(value))
if isinstance(value, dict):
return {
str(key): to_plain(item)
for key, item in value.items()
if item not in (None, "", [], {}, ())
}
if isinstance(value, (list, tuple, set)):
return [to_plain(item) for item in value]
return value
def parse_iso_datetime(value: str | None) -> datetime | None:
if not value:
return None
raw = value.strip()
if raw.endswith("Z"):
raw = raw[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(raw)
except ValueError:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)

56
tests/fixtures/memory-graph.json vendored Normal file
View File

@@ -0,0 +1,56 @@
{
"schema_version": "markitect.memory.graph.v1",
"id": "phase-memory-fixture-graph",
"nodes": [
{
"id": "decision.boundary",
"kind": "decision",
"text": "Markitect owns syntax contracts; phase-memory owns runtime phase planning.",
"phase": "stabilized",
"source_spans": [{"path": "docs/architecture.md", "line_start": 1}],
"metadata": {"title": "Boundary decision"}
},
{
"id": "event.restart",
"kind": "episode",
"text": "Restart package should include boundary decision and active graph neighborhood.",
"phase": "fluid",
"freshness": {"updated_at": "2026-05-01T00:00:00+00:00", "source_digest": "old"}
},
{
"id": "artifact.profile",
"kind": "artifact",
"text": "Memory profile declares budgets, stores, retention, activation, policy, and fallback behavior.",
"phase": "stabilized",
"freshness": {"updated_at": "2026-05-18T00:00:00+00:00", "source_digest": "fresh"}
},
{
"id": "risk.durable-write",
"kind": "risk",
"text": "Durable writes must stay review gated until the runtime plan is explicit.",
"phase": "fluid"
}
],
"edges": [
{
"id": "edge.boundary-profile",
"kind": "governs",
"source": "decision.boundary",
"target": "artifact.profile"
},
{
"id": "edge.risk-boundary",
"kind": "depends_on",
"source": "risk.durable-write",
"target": "decision.boundary"
}
],
"events": [
{
"id": "event.activation",
"kind": "activated",
"timestamp": "2026-05-18T00:00:00+00:00",
"activation_refs": ["activation.fixture"]
}
]
}

45
tests/fixtures/memory-profile.json vendored Normal file
View File

@@ -0,0 +1,45 @@
{
"schema_version": "markitect.memory.profile.v1",
"id": "phase-memory-fixture-profile",
"title": "Phase Memory Fixture Profile",
"intent": "Plan reasoning, conversation, knowledge, and activation memory behavior.",
"memory_kinds": ["reasoning", "conversation", "knowledge", "package"],
"stores": {
"reasoning": "local-graph-store",
"conversation": "local-event-log",
"knowledge": "local-graph-store",
"package": "markitect-context-package"
},
"limits": {
"reasoning": {"max_nodes": 40},
"conversation": {"max_nodes": 20},
"package": {"max_items": 3}
},
"retention": {
"conversation": {"stale_after_days": 7, "delete_after_days": 30}
},
"refresh": {
"trigger": "source-artifact-or-profile-digest-change"
},
"compaction": {
"strategy": "summarize-trace-after-review"
},
"activation": {
"max_items": 3,
"max_tokens": 30
},
"policy": {
"required_labels": ["project-local"],
"durable_writes": "review-gated",
"secrets_allowed": false
},
"observability": {
"emit_events": true
},
"failure": {
"missing_runtime_store": "degrade-to-dry-run"
},
"metadata": {
"derived_from": ["MKTT-WP-0016", "IB-WP-0017"]
}
}

36
tests/test_activation.py Normal file
View File

@@ -0,0 +1,36 @@
import json
from pathlib import Path
from phase_memory.activation import activation_action, plan_activation
from phase_memory.contracts import graph_from_markitect
from phase_memory.models import LifecycleActionKind
def test_activation_planner_emits_markitect_selection_under_budget() -> None:
graph_data = json.loads((Path(__file__).parent / "fixtures" / "memory-graph.json").read_text(encoding="utf-8"))
graph = graph_from_markitect(graph_data).value
plan = plan_activation(
graph,
max_items=2,
max_tokens=18,
profile_id="phase-memory-fixture-profile",
priority_node_ids=("decision.boundary",),
)
assert plan.selected_node_ids[0] == "decision.boundary"
assert len(plan.selected_node_ids) <= 2
assert plan.token_estimate <= 18
assert plan.selection["schema_version"] == "markitect.memory.selection.v1"
assert plan.selection["profile"] == "phase-memory-fixture-profile"
assert plan.omitted
def test_activation_action_wraps_selection_for_context_package_boundary() -> None:
graph = graph_from_markitect(json.loads((Path(__file__).parent / "fixtures" / "memory-graph.json").read_text(encoding="utf-8"))).value
plan = plan_activation(graph, max_items=1, max_tokens=20)
action = activation_action(plan)
assert action.action == LifecycleActionKind.ACTIVATE
assert action.metadata["selection"]["id"] == plan.plan_id

44
tests/test_adapters.py Normal file
View File

@@ -0,0 +1,44 @@
from phase_memory.adapters import (
AllowAllPolicyGateway,
InMemoryMemoryEventLog,
InMemoryMemoryGraphStore,
NoopContextPackageCompiler,
RecordingAuditSink,
)
from phase_memory.models import MemoryEdge, MemoryEvent, MemoryNode, ProfileIntent
from phase_memory.ports import graph_from_store
def test_in_memory_store_and_event_log_are_deterministic() -> None:
store = InMemoryMemoryGraphStore()
profile = ProfileIntent(profile_id="profile")
node = MemoryNode("node.a", "decision", "Boundary decision")
edge = MemoryEdge("edge.a", "governs", "node.a", "node.a")
store.save_profile(profile)
store.save_node(node)
store.save_edge(edge)
assert store.get_profile("profile") == profile
assert store.list_nodes() == [node]
assert store.list_edges(source="node.a") == [edge]
assert graph_from_store(store).nodes == (node,)
log = InMemoryMemoryEventLog()
event = MemoryEvent("event.a", "recorded")
assert log.append(event) == event
assert log.list_events(kind="recorded") == [event]
def test_local_policy_audit_and_compiler_adapters() -> None:
policy = AllowAllPolicyGateway()
audit = RecordingAuditSink()
compiler = NoopContextPackageCompiler()
decision = policy.authorize(action="read", resource="memory-node:node.a")
receipt = audit.record({"operation": "read", "allowed": decision.allowed})
package = compiler.compile_selection({"id": "selection.a", "nodes": ["node.a"], "events": []})
assert decision.allowed
assert receipt["recorded"] is True
assert package["package_id"] == "package:selection.a"

44
tests/test_contracts.py Normal file
View File

@@ -0,0 +1,44 @@
import json
from pathlib import Path
from phase_memory.contracts import graph_from_markitect, profile_from_markitect
FIXTURES = Path(__file__).parent / "fixtures"
def test_profile_ingress_preserves_markitect_profile_intent() -> None:
data = json.loads((FIXTURES / "memory-profile.json").read_text(encoding="utf-8"))
result = profile_from_markitect(data)
assert result.valid, [diagnostic.to_dict() for diagnostic in result.diagnostics]
assert result.value.profile_id == "phase-memory-fixture-profile"
assert result.value.memory_kinds == ("reasoning", "conversation", "knowledge", "package")
assert result.value.policy["durable_writes"] == "review-gated"
def test_graph_ingress_checks_edge_integrity_without_owning_vocabulary() -> None:
data = json.loads((FIXTURES / "memory-graph.json").read_text(encoding="utf-8"))
result = graph_from_markitect(data)
assert result.valid, [diagnostic.to_dict() for diagnostic in result.diagnostics]
assert result.value.graph_id == "phase-memory-fixture-graph"
assert len(result.value.nodes) == 4
assert result.value.node_ids == {
"decision.boundary",
"event.restart",
"artifact.profile",
"risk.durable-write",
}
def test_graph_ingress_rejects_missing_edge_endpoint() -> None:
data = json.loads((FIXTURES / "memory-graph.json").read_text(encoding="utf-8"))
data["edges"][0]["target"] = "missing"
result = graph_from_markitect(data)
assert not result.valid
assert any(diagnostic.code == "missing_edge_target" for diagnostic in result.diagnostics)

49
tests/test_lifecycle.py Normal file
View File

@@ -0,0 +1,49 @@
from datetime import datetime, timezone
from phase_memory.lifecycle import plan_compaction, plan_phase_transition, plan_refresh, plan_retention
from phase_memory.models import LifecycleActionKind, LifecycleState, MemoryNode, MemoryPhase
def test_phase_transition_to_stabilized_requires_review() -> None:
node = MemoryNode("event.restart", "episode", "Useful restart trace", phase=MemoryPhase.FLUID)
action = plan_phase_transition(node, MemoryPhase.STABILIZED)
assert action.action == LifecycleActionKind.TRANSITION_PHASE
assert action.requires_review
assert action.to_state == LifecycleState.REVIEW_NEEDED
assert action.metadata["to_phase"] == "stabilized"
def test_retention_plans_stale_and_delete_requested_without_physical_delete() -> None:
now = datetime(2026, 5, 18, tzinfo=timezone.utc)
stale = MemoryNode(
"stale",
"episode",
freshness={"updated_at": "2026-05-01T00:00:00+00:00"},
)
old = MemoryNode(
"old",
"episode",
freshness={"updated_at": "2026-04-01T00:00:00+00:00"},
)
actions = plan_retention([stale, old], stale_after_days=7, delete_after_days=30, now=now)
by_target = {action.target_id: action for action in actions}
assert by_target["stale"].action == LifecycleActionKind.MARK_STALE
assert by_target["old"].action == LifecycleActionKind.REQUEST_DELETE
assert by_target["old"].metadata["physical_delete"] is False
def test_compaction_and_refresh_are_reviewable_plans() -> None:
node = MemoryNode("artifact.profile", "artifact", "Profile text", freshness={"source_digest": "old"})
compact = plan_compaction([node])
refresh = plan_refresh([node], source_digest_by_node_id={"artifact.profile": "new"})[0]
assert compact.action == LifecycleActionKind.COMPACT
assert compact.requires_review
assert refresh.action == LifecycleActionKind.REFRESH
assert refresh.requires_review
assert refresh.metadata["proposed_digest"] == "new"

32
tests/test_planner.py Normal file
View File

@@ -0,0 +1,32 @@
import json
from pathlib import Path
from phase_memory.contracts import profile_from_markitect
from phase_memory.planner import plan_profile_execution
def test_profile_execution_plan_reports_capabilities_and_gates() -> None:
profile_data = json.loads((Path(__file__).parent / "fixtures" / "memory-profile.json").read_text(encoding="utf-8"))
profile = profile_from_markitect(profile_data).value
plan = plan_profile_execution(profile)
assert plan.ready
assert "activation.plan" in plan.capabilities
assert "retention.plan" in plan.capabilities
assert "durable_writes:review-gated" in plan.policy_gates
assert "secrets:denied" in plan.policy_gates
assert "phase_memory.profile.planned" in plan.observability_events
def test_profile_execution_plan_degrades_when_adapter_is_missing() -> None:
profile_data = json.loads((Path(__file__).parent / "fixtures" / "memory-profile.json").read_text(encoding="utf-8"))
profile_data["stores"]["knowledge"] = "external-graph-store"
profile = profile_from_markitect(profile_data).value
plan = plan_profile_execution(profile, available_adapters={"local-event-log"})
assert not plan.ready
assert "external-graph-store" in plan.missing_adapters
assert plan.fallback_behavior["missing_runtime_store"] == "degrade-to-dry-run"
assert any(diagnostic.code == "missing_adapter" for diagnostic in plan.diagnostics)

6
tests/test_smoke.py Normal file
View File

@@ -0,0 +1,6 @@
import phase_memory
def test_package_imports() -> None:
assert phase_memory.__version__ == "0.1.0"
assert phase_memory.MemoryPhase.FLUID.value == "fluid"

View File

@@ -4,7 +4,7 @@ type: workplan
title: "Phase Memory Architecture And Foundation"
domain: markitect
repo: phase-memory
status: active
status: finished
owner: phase-memory
topic_slug: markitect
planning_priority: P1
@@ -58,11 +58,32 @@ See `docs/architecture.md` for the initial architecture sketch.
enterprise PDPs, or remote registries in the default test path.
- Do not perform durable memory writes without explicit review-gated plans.
## Implementation Update - 2026-05-18
The first implementation slice is complete.
Implemented outputs:
- `README.md` and `SCOPE.md` now describe the real repository boundary.
- `pyproject.toml` defines a dependency-light Python package.
- `src/phase_memory/` contains deterministic domain models, Markitect contract
ingress, profile execution planning, lifecycle planning, activation planning,
runtime ports, and local in-memory adapters.
- `tests/fixtures/` contains small Markitect-compatible profile and graph
fixtures derived from the adjacent memory work.
- `tests/` covers contract ingress, profile planning, lifecycle planning,
activation handoff, local adapters, and smoke import behavior.
- `docs/pmem-wp-0002-outline.md` sketches the next local runtime/service slice.
Validation:
- `python3 -m pytest` -> 13 passed.
## T01 - Establish repository foundation
```task
id: PMEM-WP-0001-T01
status: todo
status: done
priority: high
state_hub_task_id: "718df85f-3bed-46ca-8699-46d9819bb0ad"
```
@@ -82,7 +103,7 @@ with `INTENT.md`.
```task
id: PMEM-WP-0001-T02
status: todo
status: done
priority: high
state_hub_task_id: "47abf09f-d9cd-4d7e-adc0-7cc75d9f1e07"
```
@@ -103,7 +124,7 @@ Output: typed models, serialization helpers, and unit tests.
```task
id: PMEM-WP-0001-T03
status: todo
status: done
priority: high
state_hub_task_id: "9d47d569-113d-4126-a511-16ae3f31c8a0"
```
@@ -125,7 +146,7 @@ Output: adapter module and tests against copied/minimal fixtures.
```task
id: PMEM-WP-0001-T04
status: todo
status: done
priority: high
state_hub_task_id: "2c9e29c3-1e22-45a5-bc9e-a5a1af188eb3"
```
@@ -147,7 +168,7 @@ Output: `ProfileExecutionPlan` API, fixture outputs, and tests.
```task
id: PMEM-WP-0001-T05
status: todo
status: done
priority: medium
state_hub_task_id: "a7a0c110-644a-47cf-8935-0d5d73f8f5ed"
```
@@ -170,7 +191,7 @@ Output: protocol/adapter definitions and deterministic adapter tests.
```task
id: PMEM-WP-0001-T06
status: todo
status: done
priority: high
state_hub_task_id: "e7d7d59f-cb63-4786-8bbf-a178340b61fe"
```
@@ -189,7 +210,7 @@ Output: lifecycle planner APIs, diagnostics, fixtures, and tests.
```task
id: PMEM-WP-0001-T07
status: todo
status: done
priority: medium
state_hub_task_id: "881bc81b-e316-416d-af2a-fbe93224b617"
```
@@ -210,7 +231,7 @@ Output: activation planner and selection fixture tests.
```task
id: PMEM-WP-0001-T08
status: todo
status: done
priority: medium
state_hub_task_id: "e841d580-c77e-4aad-8bce-08c5aaf19258"
```
@@ -229,7 +250,7 @@ Output: fixture catalog, tests, and documentation of what each fixture proves.
```task
id: PMEM-WP-0001-T09
status: todo
status: done
priority: medium
state_hub_task_id: "9927eebd-18f6-40e6-8b6a-fd0056759ebf"
```
@@ -245,3 +266,28 @@ This includes:
Output: StateHub consistency pass or documented residual warnings, plus a
candidate `PMEM-WP-0002` outline.
## Closure Review - 2026-05-18
**Outcome:** All tasks completed.
### Completed
- PMEM-WP-0001-T01 - Establish repository foundation
- PMEM-WP-0001-T02 - Codify phase-memory domain model
- PMEM-WP-0001-T03 - Add Markitect contract ingress adapter
- PMEM-WP-0001-T04 - Implement profile execution planner
- PMEM-WP-0001-T05 - Define local runtime ports and adapters
- PMEM-WP-0001-T06 - Implement first lifecycle planners
- PMEM-WP-0001-T07 - Implement activation planning handoff
- PMEM-WP-0001-T08 - Add adjacent-repo fixture set
- PMEM-WP-0001-T09 - Sync StateHub and document the next slice
### Cancelled
None.
### Carried Forward
The next slice is outlined in `docs/pmem-wp-0002-outline.md`; no open task is
carried forward from this workplan.