feat(PMEM-WP-0017): federated store management CLI and activity reports

Add phase_memory.management for cross-store discovery and windowed
activity reporting. Extend the phase-memory CLI with stores list and
report commands, plus make report-7d for the default weekly operator view.
2026-07-03 01:24:16 +02:00
parent 0320b112fd
commit feb60c1d96
16 changed files with 1150 additions and 4 deletions


@@ -3,7 +3,7 @@ ROOT := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
 .DEFAULT_GOAL := help
-.PHONY: help install install-editable check test
+.PHONY: help install install-editable check test report-7d report
 help:
 	@echo "phase-memory make targets"
@@ -12,6 +12,8 @@ help:
 	@echo " make install-editable # editable install for local development"
 	@echo " make check # verify import and print version"
 	@echo " make test # run pytest"
+	@echo " make report-7d # federated activity report (last 7 days)"
+	@echo " make report # federated report (REPORT_DAYS=7 default)"
 	@echo ""
 	@echo "For ops-warden integration, prefer: cd ../ops-warden && make install-all"
@@ -24,4 +26,12 @@ check:
 	@PYTHONPATH="$(ROOT)src" python3 -c "import phase_memory; print('phase-memory', phase_memory.__version__, 'ok')"
 test:
 	PYTHONPATH="$(ROOT)src" python3 -m pytest
+REPORT_DAYS ?= 7
+report-7d:
+	PYTHONPATH="$(ROOT)src" python3 -m phase_memory.cli report --days 7 --format summary
+report:
+	PYTHONPATH="$(ROOT)src" python3 -m phase_memory.cli report --days $(REPORT_DAYS) --format summary


@@ -54,8 +54,25 @@ PYTHONPATH=src python3 -m phase_memory.cli store import --store .phase-memory-lo
 When installed, the package exposes the same commands as `phase-memory`.
 Commands emit JSON runtime envelopes by default and accept `--format summary`
-for a concise human-readable view. All current commands are dry-run planning
-operations; they do not mutate durable memory stores.
+for a concise human-readable view. Profile, graph, and store import/export
+commands are dry-run planning operations; they do not mutate durable memory
+stores by default.
+
+## Management Reports
+
+Inspect activity across every discovered phase-memory store (ops-warden
+coordination store, registry entries, and `PHASE_MEMORY_STORE_PATHS`):
+
+```bash
+phase-memory stores list --format summary
+phase-memory report --days 7 --format summary
+phase-memory report --store ops-warden-default --days 30
+make report-7d
+```
+
+Discovery honours `WARDEN_MEMORY_STORE`, `PHASE_MEMORY_REGISTRY`, and
+`PHASE_MEMORY_STORE_PATHS`. Reports are metadata-only and never include secret
+values.

 ## Local Runtime
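
The README hunk above says discovery honours `PHASE_MEMORY_STORE_PATHS`, and the CLI help text in this commit describes that variable as colon-separated. The parsing step might look roughly like the sketch below. It uses only the standard library, and `split_store_paths` is a hypothetical name chosen for illustration, not a function exported by the package.

```python
from __future__ import annotations

from typing import Mapping


def split_store_paths(environ: Mapping[str, str]) -> list[str]:
    """Split a colon-separated PHASE_MEMORY_STORE_PATHS value into clean entries.

    Hypothetical helper for illustration only.
    """
    raw = str(environ.get("PHASE_MEMORY_STORE_PATHS") or "").strip()
    # Drop empty segments so "a::b" or a trailing ":" never yields a blank path.
    return [item.strip() for item in raw.split(":") if item.strip()]


if __name__ == "__main__":
    example = {"PHASE_MEMORY_STORE_PATHS": "/srv/store-a: /srv/store-b:"}
    print(split_store_paths(example))
```

Splitting on a literal colon would mis-handle Windows drive-letter paths such as `C:\stores`; a portable variant would likely split on `os.pathsep` instead.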


@@ -55,6 +55,20 @@ Routes:
 - `POST /operations/{operation}`
 - `POST /operations` with `{"operation": "...", "payload": {...}}`

+## Cross-Store Activity
+
+Use the phase-memory CLI as the canonical cross-store activity check:
+
+```bash
+phase-memory stores list --format summary
+phase-memory report --days 7 --format summary
+make report-7d
+```
+
+Federated reports aggregate ops-warden coordination episodes and local graph
+store events/audit lines for the requested window. Drill into one store with
+`phase-memory report --store <store_id-or-path> --days N`.
+
 ## Readiness Checks

 Before accepting traffic:
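
The "requested window" in the Cross-Store Activity text above can be pictured as a closed UTC interval that ends at the report time and starts N days earlier. The sketch below illustrates that reading with the standard library only. `report_window` and `in_window` are hypothetical names, and the timestamps in the usage lines are example values, not real store data.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def report_window(days: int, window_end: datetime | None = None) -> tuple[datetime, datetime]:
    """Return (start, end) in UTC for a look-back of `days` days."""
    if days < 1:
        raise ValueError("days must be at least 1")
    end = window_end or datetime.now(timezone.utc)
    # Treat naive datetimes as UTC; convert aware ones to UTC.
    end = end.replace(tzinfo=timezone.utc) if end.tzinfo is None else end.astimezone(timezone.utc)
    return end - timedelta(days=days), end


def in_window(timestamp: str, start: datetime, end: datetime) -> bool | None:
    """True/False for a parseable timestamp, None when it cannot be parsed."""
    try:
        parsed = datetime.fromisoformat(timestamp)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return start <= parsed <= end


if __name__ == "__main__":
    start, end = report_window(7, datetime(2026, 7, 3, tzinfo=timezone.utc))
    print(in_window("2026-07-02T09:00:00+00:00", start, end))
    print(in_window("2026-06-01T08:00:00+00:00", start, end))
```

Returning `None` for unparseable timestamps keeps "outside the window" distinct from "cannot tell", so a caller can count the latter separately.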


@@ -126,6 +126,15 @@ from .pilot import (
     managed_deployment_pilot_report,
     write_live_pilot_evidence,
 )
+from .management import (
+    FEDERATED_REPORT_SCHEMA,
+    STORE_LIST_SCHEMA,
+    STORE_REGISTRY_SCHEMA,
+    build_federated_report,
+    build_store_list,
+    classify_store,
+    discover_memory_stores,
+)
 from .ops_warden import (
     KNOWN_AGENT_IDS,
     OPS_WARDEN_ACTIVATION_SCHEMA,
@@ -179,6 +188,9 @@ __all__ = [
     "FakeExternalPolicyGateway",
     "FakeExternalSemanticIndex",
     "FakeKontextualRuntimeRegistry",
+    "FEDERATED_REPORT_SCHEMA",
+    "STORE_LIST_SCHEMA",
+    "STORE_REGISTRY_SCHEMA",
     "FakeMarkitectPackageCompiler",
     "FakeTelemetryAuditSink",
     "LiveShapedKontextualEventLog",
@@ -268,7 +280,9 @@ __all__ = [
     "WordCountTokenEstimator",
     "activate_ops_warden_memory",
     "activation_quality_report",
+    "build_federated_report",
     "build_session_event",
+    "build_store_list",
     "default_memory_store_path",
     "memory_enabled",
     "ops_warden_adapter_pack",
@@ -298,7 +312,9 @@ __all__ = [
     "WARDEN_ACCESS_NEED",
     "WARDEN_ROUTE_FIND_QUERY",
     "build_service_binding",
+    "classify_store",
     "create_wsgi_app",
+    "discover_memory_stores",
     "health_report",
     "resolve_credentialed_environ",
     "resolve_runtime_adapters",


@@ -9,6 +9,7 @@ from pathlib import Path
 from typing import Any, Sequence

 from .adapters import FileBackedMemoryGraphStore, JsonlAuditSink, JsonlMemoryEventLog
+from .management import build_federated_report, build_store_list
 from .runtime import PhaseMemoryRuntime
@@ -80,6 +81,33 @@ def build_parser() -> argparse.ArgumentParser:
     _add_format(store_repair)
     store_repair.set_defaults(func=_store_repair)

+    stores = subparsers.add_parser("stores", help="Federated store discovery")
+    stores_subparsers = stores.add_subparsers(dest="stores_command")
+    stores_list = stores_subparsers.add_parser(
+        "list",
+        help="List discovered phase-memory stores (defaults, registry, PHASE_MEMORY_STORE_PATHS)",
+    )
+    _add_format(stores_list)
+    stores_list.set_defaults(func=_stores_list)
+
+    report = subparsers.add_parser(
+        "report",
+        help="Activity report across discovered stores for the last N days",
+        epilog=(
+            "Discovery: ops-warden default store, PHASE_MEMORY_REGISTRY, "
+            "PHASE_MEMORY_STORE_PATHS (colon-separated). Set WARDEN_MEMORY_STORE "
+            "to override the ops-warden store path."
+        ),
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    report.add_argument("--days", type=int, default=7, help="Look back N days (default: 7)")
+    report.add_argument(
+        "--store",
+        help="Focus on one store by registry store_id or absolute path",
+    )
+    _add_format(report)
+    report.set_defaults(func=_report)
+
     return parser
@@ -148,6 +176,14 @@ def _store_repair(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict
     return _runtime_for_store(args.store).repair_diagnostics(source_ref=str(args.store))


+def _stores_list(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]:
+    return build_store_list()
+
+
+def _report(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]:
+    return build_federated_report(days=args.days, focus_store_id=args.store, focus_store_path=args.store)
+
+
 def _missing_command(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]:
     raise SystemExit("Missing command. Use --help for usage.")
@@ -200,6 +236,36 @@ def _print_summary(envelope: dict[str, Any]) -> None:
     if activation:
         print(f"selected_nodes={len(activation.get('selected_node_ids', []))}")
         print(f"omitted={len(activation.get('omitted', []))}")
+    if envelope.get("schema_version") == "phase_memory.management.store_list.v1":
+        print(f"stores={envelope.get('store_count', 0)}")
+        for store in envelope.get("stores", ()):
+            label = store.get("label") or store.get("store_id")
+            print(f"- {label} [{store.get('store_kind')}] {store.get('path')}")
+    if envelope.get("schema_version") == "phase_memory.management.federated_report.v1":
+        aggregate = envelope.get("aggregate") or {}
+        print(
+            f"window={envelope.get('window_start')}..{envelope.get('window_end')} "
+            f"stores={envelope.get('store_count', 0)}"
+        )
+        print(
+            f"episodes={aggregate.get('episode_count', 0)} "
+            f"audit={aggregate.get('audit_event_count', 0)} "
+            f"active_stores={aggregate.get('active_store_count', 0)}"
+        )
+        for store in envelope.get("stores", ()):
+            print(
+                f"- {store.get('store_id')} [{store.get('store_kind')}] "
+                f"episodes={store.get('episode_count', 0)} "
+                f"audit={store.get('audit_event_count', 0)} "
+                f"last={store.get('last_activity_at') or ''}"
+            )
+        detail = envelope.get("store_detail")
+        if detail:
+            print(f"detail={detail.get('store_id')} schema={detail.get('schema_version') or ''}")
+            if detail.get("repair_diagnostics"):
+                print(f"repair_diagnostics={len(detail.get('repair_diagnostics', []))}")
+            if detail.get("episode_timeline"):
+                print(f"timeline={len(detail.get('episode_timeline', []))}")


 if __name__ == "__main__":


@@ -0,0 +1,713 @@
"""Federated store discovery and activity reporting."""

from __future__ import annotations

import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Mapping

from .adapters import LOCAL_STORE_SCHEMA, FileBackedMemoryGraphStore, JsonlAuditSink, JsonlMemoryEventLog
from .models import Diagnostic
from .ops_warden import OPS_WARDEN_PROFILE_ID, OPS_WARDEN_RUNTIME_SCHEMA, OpsWardenMemoryStore, default_memory_store_path
from .utils import parse_iso_datetime, stable_digest, utc_now_iso

STORE_REGISTRY_SCHEMA = "phase_memory.management.store_registry.v1"
STORE_LIST_SCHEMA = "phase_memory.management.store_list.v1"
FEDERATED_REPORT_SCHEMA = "phase_memory.management.federated_report.v1"

STORE_KIND_OPS_WARDEN = "ops_warden_coordination"
STORE_KIND_LOCAL_GRAPH = "local_graph"
DEFAULT_OPS_WARDEN_STORE_ID = "ops-warden-default"

TOP_ROUTE_IDS = 10
DETAIL_EPISODE_LIMIT = 20


@dataclass(frozen=True)
class MemoryStoreDescriptor:
    store_id: str
    path: Path
    store_kind: str
    profile_id: str = ""
    label: str = ""
    source: str = "discovered"


def default_registry_path(environ: Mapping[str, str] | None = None) -> Path:
    environ = environ or os.environ
    override = str(environ.get("PHASE_MEMORY_REGISTRY") or "").strip()
    if override:
        return Path(override).expanduser()
    xdg_data = str(environ.get("XDG_DATA_HOME") or "").strip()
    base = Path(xdg_data).expanduser() if xdg_data else Path.home() / ".local" / "share"
    return base / "phase-memory" / "stores.json"


def classify_store(path: str | Path) -> str | None:
    root = Path(path).expanduser().resolve()
    local_metadata = root / "phase-memory.json"
    if local_metadata.is_file():
        try:
            data = json.loads(local_metadata.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return None
        if str(data.get("schema_version") or "") == LOCAL_STORE_SCHEMA:
            return STORE_KIND_LOCAL_GRAPH
    ops_metadata = root / "metadata.json"
    if ops_metadata.is_file():
        try:
            data = json.loads(ops_metadata.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            return None
        if str(data.get("schema_version") or "") == OPS_WARDEN_RUNTIME_SCHEMA:
            return STORE_KIND_OPS_WARDEN
    return None


def load_store_registry(
    path: str | Path | None = None,
    environ: Mapping[str, str] | None = None,
) -> dict[str, Any]:
    registry_path = Path(path).expanduser() if path is not None else default_registry_path(environ)
    diagnostics: list[dict[str, Any]] = []
    if not registry_path.is_file():
        return {
            "schema_version": STORE_REGISTRY_SCHEMA,
            "valid": True,
            "registry_path": str(registry_path),
            "entries": [],
            "diagnostics": diagnostics,
        }
    try:
        payload = json.loads(registry_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        return {
            "schema_version": STORE_REGISTRY_SCHEMA,
            "valid": False,
            "registry_path": str(registry_path),
            "entries": [],
            "diagnostics": [
                Diagnostic(
                    "warn",
                    "corrupt_store_registry",
                    "Store registry file is not valid JSON.",
                    str(registry_path),
                    {"error": str(exc)},
                ).to_dict()
            ],
        }
    if not isinstance(payload, dict):
        diagnostics.append(
            Diagnostic(
                "warn",
                "invalid_store_registry_shape",
                "Store registry must be a JSON object.",
                str(registry_path),
            ).to_dict()
        )
        return {
            "schema_version": STORE_REGISTRY_SCHEMA,
            "valid": False,
            "registry_path": str(registry_path),
            "entries": [],
            "diagnostics": diagnostics,
        }
    schema = str(payload.get("schema_version") or "")
    if schema and schema != STORE_REGISTRY_SCHEMA:
        diagnostics.append(
            Diagnostic(
                "warn",
                "unknown_store_registry_schema",
                "Store registry declares an unknown schema version.",
                str(registry_path),
                {"schema_version": schema},
            ).to_dict()
        )
    entries: list[dict[str, Any]] = []
    for item in payload.get("stores") or ():
        if not isinstance(item, dict):
            continue
        store_path = str(item.get("path") or "").strip()
        store_kind = str(item.get("store_kind") or "").strip()
        store_id = str(item.get("store_id") or "").strip()
        if not store_path or not store_kind or not store_id:
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "incomplete_registry_entry",
                    "Registry entry must include store_id, path, and store_kind.",
                    store_id or store_path or "unknown",
                ).to_dict()
            )
            continue
        entries.append(item)
    return {
        "schema_version": STORE_REGISTRY_SCHEMA,
        "valid": not any(item.get("severity") == "error" for item in diagnostics),
        "registry_path": str(registry_path),
        "entries": entries,
        "diagnostics": diagnostics,
    }


def _store_id_for_path(path: Path, environ: Mapping[str, str] | None = None) -> str:
    resolved = path.expanduser().resolve()
    default_ops = default_memory_store_path(environ).expanduser().resolve()
    if resolved == default_ops:
        return DEFAULT_OPS_WARDEN_STORE_ID
    return f"store:{stable_digest(str(resolved))}"


def _descriptor_from_path(
    path: Path,
    *,
    store_kind: str,
    source: str,
    profile_id: str = "",
    label: str = "",
    store_id: str = "",
    environ: Mapping[str, str] | None = None,
) -> MemoryStoreDescriptor:
    resolved = path.expanduser().resolve()
    return MemoryStoreDescriptor(
        store_id=store_id or _store_id_for_path(resolved, environ),
        path=resolved,
        store_kind=store_kind,
        profile_id=profile_id,
        label=label,
        source=source,
    )


def discover_memory_stores(
    environ: Mapping[str, str] | None = None,
    *,
    registry_path: str | Path | None = None,
    extra_paths: tuple[str | Path, ...] = (),
) -> tuple[list[MemoryStoreDescriptor], list[dict[str, Any]]]:
    environ = environ or os.environ
    diagnostics: list[dict[str, Any]] = []
    by_path: dict[str, MemoryStoreDescriptor] = {}

    def add_descriptor(descriptor: MemoryStoreDescriptor) -> None:
        key = str(descriptor.path)
        existing = by_path.get(key)
        if existing is None:
            by_path[key] = descriptor
            return
        if descriptor.source == "registry" and existing.source != "registry":
            by_path[key] = descriptor

    default_ops = default_memory_store_path(environ)
    kind = classify_store(default_ops)
    if kind:
        profile_id = OPS_WARDEN_PROFILE_ID if kind == STORE_KIND_OPS_WARDEN else ""
        add_descriptor(
            _descriptor_from_path(
                default_ops,
                store_kind=kind,
                source="default",
                profile_id=profile_id,
                label="ops-warden coordination store",
                store_id=DEFAULT_OPS_WARDEN_STORE_ID,
                environ=environ,
            )
        )

    registry = load_store_registry(registry_path, environ=environ)
    diagnostics.extend(registry.get("diagnostics", ()))
    registry_file = Path(str(registry.get("registry_path") or "")).expanduser()
    for entry in registry.get("entries", ()):
        entry_path = Path(str(entry.get("path") or "")).expanduser()
        if not entry_path.is_absolute() and registry_file.is_file():
            entry_path = (registry_file.parent / entry_path).resolve()
        if not entry_path.is_dir():
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "registry_store_missing",
                    "Registry entry path does not exist.",
                    str(entry.get("store_id") or entry_path),
                    {"path": str(entry_path)},
                ).to_dict()
            )
            continue
        declared_kind = str(entry.get("store_kind") or "")
        actual_kind = classify_store(entry_path)
        if actual_kind and actual_kind != declared_kind:
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "registry_store_kind_mismatch",
                    "Registry store_kind does not match on-disk classification.",
                    str(entry.get("store_id") or entry_path),
                    {"declared": declared_kind, "actual": actual_kind},
                ).to_dict()
            )
        store_kind = actual_kind or declared_kind
        if not store_kind:
            continue
        add_descriptor(
            _descriptor_from_path(
                entry_path,
                store_kind=store_kind,
                source="registry",
                profile_id=str(entry.get("profile_id") or ""),
                label=str(entry.get("label") or ""),
                store_id=str(entry.get("store_id") or ""),
                environ=environ,
            )
        )

    env_paths = str(environ.get("PHASE_MEMORY_STORE_PATHS") or "").strip()
    path_candidates = [item.strip() for item in env_paths.split(":") if item.strip()]
    path_candidates.extend(str(item) for item in extra_paths)
    for candidate in path_candidates:
        candidate_path = Path(candidate).expanduser()
        if not candidate_path.is_dir():
            continue
        store_kind = classify_store(candidate_path)
        if not store_kind:
            continue
        add_descriptor(
            _descriptor_from_path(
                candidate_path,
                store_kind=store_kind,
                source="env",
                environ=environ,
            )
        )

    stores = sorted(by_path.values(), key=lambda item: (item.store_kind, item.store_id))
    return stores, diagnostics


def resolve_store_reference(
    reference: str,
    stores: list[MemoryStoreDescriptor],
    environ: Mapping[str, str] | None = None,
) -> MemoryStoreDescriptor | None:
    ref = str(reference or "").strip()
    if not ref:
        return None
    for store in stores:
        if ref == store.store_id or ref == str(store.path):
            return store
    candidate = Path(ref).expanduser()
    if candidate.is_dir():
        store_kind = classify_store(candidate)
        if store_kind:
            return _descriptor_from_path(candidate, store_kind=store_kind, source="explicit", environ=environ)
    for store in stores:
        if candidate.resolve() == store.path:
            return store
    return None


def compute_report_window(
    *,
    days: int,
    window_end: datetime | None = None,
) -> tuple[datetime, datetime]:
    if days < 1:
        raise ValueError("days must be at least 1")
    end = window_end or datetime.now(timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    else:
        end = end.astimezone(timezone.utc)
    start = end - timedelta(days=days)
    return start, end


def _event_in_window(timestamp: str | None, *, window_start: datetime, window_end: datetime) -> bool | None:
    parsed = parse_iso_datetime(timestamp)
    if parsed is None:
        return None
    return window_start <= parsed <= window_end


def _read_jsonl(path: Path) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    if not path.is_file():
        return [], []
    events: list[dict[str, Any]] = []
    diagnostics: list[dict[str, Any]] = []
    for line_number, raw in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        if not raw.strip():
            continue
        try:
            data = json.loads(raw)
        except json.JSONDecodeError as exc:
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "malformed_jsonl_line",
                    "JSONL line is not valid JSON.",
                    f"{path}:line:{line_number}",
                    {"error": str(exc)},
                ).to_dict()
            )
            continue
        if isinstance(data, dict):
            events.append(data)
    return events, diagnostics


def _increment(counter: dict[str, int], key: str) -> None:
    normalized = key or "unknown"
    counter[normalized] = counter.get(normalized, 0) + 1


def _top_counter(counter: dict[str, int], *, limit: int) -> dict[str, int]:
    return dict(sorted(counter.items(), key=lambda item: (-item[1], item[0]))[:limit])


def aggregate_ops_warden_activity(
    store: MemoryStoreDescriptor,
    *,
    window_start: datetime,
    window_end: datetime,
) -> dict[str, Any]:
    events_path = store.path / "events.jsonl"
    raw_events, line_diagnostics = _read_jsonl(events_path)
    diagnostics = list(line_diagnostics)
    by_session_kind: dict[str, int] = {}
    by_command: dict[str, int] = {}
    by_outcome: dict[str, int] = {}
    by_route_id: dict[str, int] = {}
    episode_timeline: list[dict[str, Any]] = []
    first_activity_at = ""
    last_activity_at = ""
    in_window = 0
    missing_timestamp = 0

    for event in raw_events:
        timestamp = str(event.get("recorded_at") or "")
        in_range = _event_in_window(timestamp, window_start=window_start, window_end=window_end)
        if in_range is None:
            missing_timestamp += 1
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "episode_missing_timestamp",
                    "Episode lacks a parseable recorded_at timestamp.",
                    str(event.get("event_id") or "unknown"),
                ).to_dict()
            )
            continue
        if not in_range:
            continue
        in_window += 1
        _increment(by_session_kind, str(event.get("session_kind") or ""))
        _increment(by_command, str(event.get("command") or ""))
        _increment(by_outcome, str(event.get("outcome") or ""))
        _increment(by_route_id, str(event.get("route_id") or ""))
        if not first_activity_at or timestamp < first_activity_at:
            first_activity_at = timestamp
        if not last_activity_at or timestamp > last_activity_at:
            last_activity_at = timestamp
        episode_timeline.append(
            {
                "event_id": str(event.get("event_id") or ""),
                "recorded_at": timestamp,
                "session_kind": str(event.get("session_kind") or ""),
                "command": str(event.get("command") or ""),
                "route_id": str(event.get("route_id") or ""),
                "outcome": str(event.get("outcome") or ""),
            }
        )

    episode_timeline.sort(key=lambda item: item.get("recorded_at", ""), reverse=True)
    metadata_path = store.path / "metadata.json"
    schema_version = ""
    if metadata_path.is_file():
        try:
            schema_version = str(json.loads(metadata_path.read_text(encoding="utf-8")).get("schema_version") or "")
        except json.JSONDecodeError:
            pass

    return {
        "store_id": store.store_id,
        "store_kind": store.store_kind,
        "path": str(store.path),
        "profile_id": store.profile_id or OPS_WARDEN_PROFILE_ID,
        "schema_version": schema_version,
        "episode_count": in_window,
        "audit_event_count": 0,
        "missing_timestamp_count": missing_timestamp,
        "first_activity_at": first_activity_at,
        "last_activity_at": last_activity_at,
        "by_session_kind": dict(sorted(by_session_kind.items())),
        "by_command": dict(sorted(by_command.items())),
        "by_outcome": dict(sorted(by_outcome.items())),
        "by_route_id": _top_counter(by_route_id, limit=TOP_ROUTE_IDS),
        "by_operation": {},
        "episode_timeline": episode_timeline[:DETAIL_EPISODE_LIMIT],
        "repair_diagnostics": [],
        "diagnostics": diagnostics,
    }


def aggregate_local_graph_activity(
    store: MemoryStoreDescriptor,
    *,
    window_start: datetime,
    window_end: datetime,
) -> dict[str, Any]:
    events_path = store.path / "events.jsonl"
    audit_path = store.path / "audit.jsonl"
    raw_events, event_line_diagnostics = _read_jsonl(events_path)
    raw_audit, audit_line_diagnostics = _read_jsonl(audit_path)
    diagnostics = list(event_line_diagnostics)
    diagnostics.extend(audit_line_diagnostics)
    by_session_kind: dict[str, int] = {}
    by_command: dict[str, int] = {}
    by_outcome: dict[str, int] = {}
    by_operation: dict[str, int] = {}
    episode_timeline: list[dict[str, Any]] = []
    first_activity_at = ""
    last_activity_at = ""
    episode_count = 0
    audit_count = 0
    missing_timestamp = 0

    for event in raw_events:
        timestamp = str(event.get("timestamp") or event.get("recorded_at") or "")
        in_range = _event_in_window(timestamp, window_start=window_start, window_end=window_end)
        if in_range is None:
            missing_timestamp += 1
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "episode_missing_timestamp",
                    "Event lacks a parseable timestamp.",
                    str(event.get("event_id") or event.get("id") or "unknown"),
                ).to_dict()
            )
            continue
        if not in_range:
            continue
        episode_count += 1
        kind = str(event.get("kind") or event.get("session_kind") or "")
        _increment(by_session_kind, kind)
        _increment(by_command, str(event.get("command") or kind))
        _increment(by_outcome, str(event.get("outcome") or event.get("state") or ""))
        if not first_activity_at or timestamp < first_activity_at:
            first_activity_at = timestamp
        if not last_activity_at or timestamp > last_activity_at:
            last_activity_at = timestamp
        episode_timeline.append(
            {
                "event_id": str(event.get("event_id") or event.get("id") or ""),
                "recorded_at": timestamp,
                "session_kind": kind,
                "command": str(event.get("command") or kind),
                "route_id": "",
                "outcome": str(event.get("outcome") or event.get("state") or ""),
            }
        )

    for event in raw_audit:
        timestamp = str(event.get("timestamp") or event.get("recorded_at") or "")
        in_range = _event_in_window(timestamp, window_start=window_start, window_end=window_end)
        if in_range is None:
            missing_timestamp += 1
            diagnostics.append(
                Diagnostic(
                    "warn",
                    "audit_missing_timestamp",
                    "Audit event lacks a parseable timestamp.",
                    str(event.get("operation_id") or event.get("operation") or "unknown"),
                ).to_dict()
            )
            continue
        if not in_range:
            continue
        audit_count += 1
        _increment(by_operation, str(event.get("operation") or ""))
        if not first_activity_at or timestamp < first_activity_at:
            first_activity_at = timestamp
        if not last_activity_at or timestamp > last_activity_at:
            last_activity_at = timestamp

    episode_timeline.sort(key=lambda item: item.get("recorded_at", ""), reverse=True)
    graph_store = FileBackedMemoryGraphStore(store.path)
    repair = graph_store.repair_diagnostics()
    repair_diagnostics = [
        item.to_dict()
        for item in repair
        if item.severity in {"error", "warn"}
    ]
    schema_version = ""
    try:
        schema_version = str(graph_store.metadata().get("schema_version") or "")
    except (json.JSONDecodeError, OSError):
        pass

    return {
        "store_id": store.store_id,
        "store_kind": store.store_kind,
        "path": str(store.path),
        "profile_id": store.profile_id,
        "schema_version": schema_version,
        "episode_count": episode_count,
        "audit_event_count": audit_count,
        "missing_timestamp_count": missing_timestamp,
        "first_activity_at": first_activity_at,
        "last_activity_at": last_activity_at,
        "by_session_kind": dict(sorted(by_session_kind.items())),
        "by_command": dict(sorted(by_command.items())),
        "by_outcome": dict(sorted(by_outcome.items())),
        "by_route_id": {},
        "by_operation": dict(sorted(by_operation.items())),
        "episode_timeline": episode_timeline[:DETAIL_EPISODE_LIMIT],
        "repair_diagnostics": repair_diagnostics,
        "diagnostics": diagnostics,
    }


def aggregate_store_activity(
    store: MemoryStoreDescriptor,
    *,
    window_start: datetime,
    window_end: datetime,
) -> dict[str, Any]:
    if store.store_kind == STORE_KIND_OPS_WARDEN:
        return aggregate_ops_warden_activity(store, window_start=window_start, window_end=window_end)
    return aggregate_local_graph_activity(store, window_start=window_start, window_end=window_end)


def build_store_list(
    environ: Mapping[str, str] | None = None,
    *,
    registry_path: str | Path | None = None,
    extra_paths: tuple[str | Path, ...] = (),
) -> dict[str, Any]:
    stores, diagnostics = discover_memory_stores(environ, registry_path=registry_path, extra_paths=extra_paths)
    return {
        "schema_version": STORE_LIST_SCHEMA,
        "valid": not any(item.get("severity") == "error" for item in diagnostics),
        "store_count": len(stores),
        "stores": [
            {
                "store_id": store.store_id,
                "path": str(store.path),
                "store_kind": store.store_kind,
                "profile_id": store.profile_id,
                "label": store.label,
                "source": store.source,
            }
            for store in stores
        ],
        "diagnostics": diagnostics,
    }


def build_federated_report(
    *,
    days: int = 7,
    focus_store_id: str | None = None,
    focus_store_path: str | Path | None = None,
    environ: Mapping[str, str] | None = None,
    registry_path: str | Path | None = None,
    extra_paths: tuple[str | Path, ...] = (),
    window_end: datetime | None = None,
) -> dict[str, Any]:
    environ = environ or os.environ
    stores, discovery_diagnostics = discover_memory_stores(
        environ,
        registry_path=registry_path,
        extra_paths=extra_paths,
    )
    diagnostics = list(discovery_diagnostics)
    focus_reference = str(focus_store_id or focus_store_path or "").strip()
    focus_store: MemoryStoreDescriptor | None = None
    if focus_reference:
        focus_store = resolve_store_reference(focus_reference, stores, environ=environ)
        if focus_store is None:
            diagnostics.append(
                Diagnostic(
                    "error",
                    "unknown_store_reference",
                    "Requested store was not discovered and could not be classified.",
                    focus_reference,
                ).to_dict()
            )
            return {
                "schema_version": FEDERATED_REPORT_SCHEMA,
                "valid": False,
                "window_days": days,
                "window_start": "",
                "window_end": "",
                "focus_store_id": focus_reference,
                "store_count": 0,
                "aggregate": {},
                "stores": [],
                "store_detail": None,
                "diagnostics": diagnostics,
            }
        stores = [focus_store]

    window_start, window_end_dt = compute_report_window(days=days, window_end=window_end)
    summaries: list[dict[str, Any]] = []
    aggregate = {
        "episode_count": 0,
        "audit_event_count": 0,
        "active_store_count": 0,
        "by_outcome": {},
        "by_session_kind": {},
        "by_operation": {},
    }
    by_outcome: dict[str, int] = {}
    by_session_kind: dict[str, int] = {}
    by_operation: dict[str, int] = {}
    store_detail: dict[str, Any] | None = None

    for store in stores:
        summary = aggregate_store_activity(store, window_start=window_start, window_end=window_end_dt)
        diagnostics.extend(summary.get("diagnostics", ()))
        public_summary = {
            key: value
            for key, value in summary.items()
            if key not in {"episode_timeline", "repair_diagnostics", "diagnostics"}
        }
        summaries.append(public_summary)
        aggregate["episode_count"] += int(summary.get("episode_count") or 0)
        aggregate["audit_event_count"] += int(summary.get("audit_event_count") or 0)
        if int(summary.get("episode_count") or 0) or int(summary.get("audit_event_count") or 0):
            aggregate["active_store_count"] += 1
        for key, value in (summary.get("by_outcome") or {}).items():
            by_outcome[key] = by_outcome.get(key, 0) + int(value)
        for key, value in (summary.get("by_session_kind") or {}).items():
            by_session_kind[key] = by_session_kind.get(key, 0) + int(value)
        for key, value in (summary.get("by_operation") or {}).items():
            by_operation[key] = by_operation.get(key, 0) + int(value)
        if focus_store is not None and store.store_id == focus_store.store_id:
            store_detail = summary

    aggregate["by_outcome"] = dict(sorted(by_outcome.items()))
    aggregate["by_session_kind"] = dict(sorted(by_session_kind.items()))
    aggregate["by_operation"] = dict(sorted(by_operation.items()))

    return {
        "schema_version": FEDERATED_REPORT_SCHEMA,
        "valid": not any(item.get("severity") == "error" for item in diagnostics),
        "window_days": days,
        "window_start": window_start.replace(microsecond=0).isoformat(),
        "window_end": window_end_dt.replace(microsecond=0).isoformat(),
        "focus_store_id": focus_store.store_id if focus_store else "",
        "store_count": len(summaries),
        "aggregate": aggregate,
        "stores": summaries,
        "store_detail": store_detail,
        "diagnostics": diagnostics,
        "generated_at": utc_now_iso(),
    }


@@ -0,0 +1,33 @@
{
"aggregate": {
"active_store_count": 2,
"audit_event_count": 1,
"by_operation": {
"audit.query": 1
},
"by_outcome": {
"resolved": 1,
"skipped": 1,
"unknown": 1
},
"by_session_kind": {
"lifecycle.apply": 1,
"warden.agent.codex": 1,
"warden.operator": 1
},
"episode_count": 3
},
"store_count": 2,
"stores": [
{
"audit_event_count": 1,
"episode_count": 1,
"store_kind": "local_graph"
},
{
"audit_event_count": 0,
"episode_count": 2,
"store_kind": "ops_warden_coordination"
}
]
}


@@ -0,0 +1,2 @@
{"operation":"package.compile","operation_id":"op:audit-old","timestamp":"2026-06-02T08:00:00+00:00"}
{"operation":"audit.query","operation_id":"op:audit-in-window","timestamp":"2026-07-02T10:00:00+00:00"}


@@ -0,0 +1,2 @@
{"id":"event:old","kind":"path.created","schema_version":"markitect.memory.event.v1","timestamp":"2026-06-01T08:00:00+00:00"}
{"id":"event:in-window","kind":"lifecycle.apply","schema_version":"markitect.memory.event.v1","timestamp":"2026-07-02T09:00:00+00:00"}


@@ -0,0 +1,3 @@
{
  "schema_version": "phase_memory.local_store.v1"
}


@@ -0,0 +1,4 @@
{"agent_id":"grok","command":"route find","diagnostic_codes":[],"event_id":"ops-warden-event:old","need_fingerprint":"abc123","outcome":"resolved","recorded_at":"2026-06-01T10:00:00+00:00","route_id":"openrouter-llm-connect","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.agent.grok"}
{"agent_id":"","command":"route find","diagnostic_codes":[],"event_id":"ops-warden-event:in-window-1","need_fingerprint":"def456","outcome":"resolved","recorded_at":"2026-07-01T10:00:00+00:00","route_id":"issue-core-ingestion-api-key","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.operator"}
{"agent_id":"codex","command":"access","diagnostic_codes":[],"event_id":"ops-warden-event:in-window-2","need_fingerprint":"ghi789","outcome":"skipped","recorded_at":"2026-07-02T11:00:00+00:00","route_id":"openrouter-llm-connect","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.agent.codex"}
{"agent_id":"","command":"memory.activate","diagnostic_codes":[],"event_id":"ops-warden-event:no-ts","need_fingerprint":"","outcome":"resolved","route_id":"","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.operator"}


@@ -0,0 +1,5 @@
{
  "created_at": "2026-07-01T10:00:00+00:00",
  "profile_id": "ops-warden-coordination",
  "schema_version": "phase_memory.ops_warden.runtime.v1"
}

tests/fixtures/management/registry.json

@@ -0,0 +1,13 @@
{
  "schema_version": "phase_memory.management.store_registry.v1",
  "stores": [
    {
      "store_id": "fixture-local-graph",
      "path": "local-store",
      "store_kind": "local_graph",
      "profile_id": "local-dev",
      "label": "Fixture local graph store",
      "registered_at": "2026-07-01T10:00:00+00:00"
    }
  ]
}


@@ -17,6 +17,7 @@
"EVALUATION_TREND_REGRESSION_GATE_SCHEMA",
"EVALUATION_TREND_SCHEMA",
"ExternalAdapterPack",
"FEDERATED_REPORT_SCHEMA",
"FakeExternalEventLog",
"FakeExternalGraphStore",
"FakeExternalPolicyGateway",
@@ -76,6 +77,8 @@
"RuntimeConfig",
"SERVICE_APP_SCHEMA",
"SERVICE_BINDING_SCHEMA",
"STORE_LIST_SCHEMA",
"STORE_REGISTRY_SCHEMA",
"ServiceAppConfig",
"ServiceBinding",
"ServiceResponse",
@@ -89,8 +92,11 @@
"activation_quality_report",
"adapter_pack_manifest",
"branch_path",
"build_federated_report",
"build_service_binding",
"build_session_event",
"build_store_list",
"classify_store",
"compact_path",
"create_path",
"create_wsgi_app",
@@ -99,6 +105,7 @@
"credentialed_operator_report",
"credentialed_telemetry_retention_drill",
"default_memory_store_path",
"discover_memory_stores",
"evaluation_threshold_report",
"evaluation_trend_artifact",
"evaluation_trend_history",


@@ -0,0 +1,139 @@
"""Tests for federated store management and reporting."""

from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path

import pytest

from phase_memory.cli import main
from phase_memory.management import (
    FEDERATED_REPORT_SCHEMA,
    STORE_KIND_LOCAL_GRAPH,
    STORE_KIND_OPS_WARDEN,
    build_federated_report,
    build_store_list,
    classify_store,
    discover_memory_stores,
    resolve_store_reference,
)

FIXTURES = Path(__file__).resolve().parent / "fixtures" / "management"
WINDOW_END = datetime(2026, 7, 3, 12, 0, tzinfo=timezone.utc)


def _fixture_environ(tmp_path: Path) -> dict[str, str]:
    registry = FIXTURES / "registry.json"
    ops_store = FIXTURES / "ops-warden-store"
    local_store = FIXTURES / "local-store"
    return {
        "WARDEN_MEMORY_STORE": str(ops_store),
        "PHASE_MEMORY_REGISTRY": str(registry),
        "PHASE_MEMORY_STORE_PATHS": str(local_store),
        "HOME": str(tmp_path),
        "XDG_DATA_HOME": str(tmp_path / "xdg"),
    }


def test_classify_store_kinds() -> None:
    assert classify_store(FIXTURES / "ops-warden-store") == STORE_KIND_OPS_WARDEN
    assert classify_store(FIXTURES / "local-store") == STORE_KIND_LOCAL_GRAPH
    assert classify_store(FIXTURES / "registry.json") is None


def test_discover_memory_stores_deduplicates_and_merges_registry(tmp_path: Path) -> None:
    environ = _fixture_environ(tmp_path)
    stores, diagnostics = discover_memory_stores(environ)
    assert not any(item.get("severity") == "error" for item in diagnostics)
    store_ids = {store.store_id for store in stores}
    assert "fixture-local-graph" in store_ids
    assert len({str(store.path) for store in stores}) == len(stores)


def test_build_federated_report_aggregate_and_detail(tmp_path: Path) -> None:
    environ = _fixture_environ(tmp_path)
    report = build_federated_report(days=7, environ=environ, window_end=WINDOW_END)
    assert report["schema_version"] == FEDERATED_REPORT_SCHEMA
    assert report["valid"] is True
    assert report["store_count"] == 2
    aggregate = report["aggregate"]
    assert aggregate["episode_count"] == 3
    assert aggregate["audit_event_count"] == 1
    assert aggregate["active_store_count"] == 2
    assert aggregate["by_outcome"]["resolved"] == 1
    assert aggregate["by_outcome"]["skipped"] == 1
    assert aggregate["by_session_kind"]["warden.operator"] == 1
    assert aggregate["by_operation"]["audit.query"] == 1

    detail = build_federated_report(
        days=7,
        focus_store_id="fixture-local-graph",
        environ=environ,
        window_end=WINDOW_END,
    )
    assert detail["focus_store_id"] == "fixture-local-graph"
    assert detail["store_count"] == 1
    assert detail["store_detail"]["episode_count"] == 1
    assert detail["store_detail"]["audit_event_count"] == 1


def test_resolve_store_reference_by_id_and_path(tmp_path: Path) -> None:
    environ = _fixture_environ(tmp_path)
    stores, _ = discover_memory_stores(environ)
    by_id = resolve_store_reference("fixture-local-graph", stores, environ=environ)
    assert by_id is not None
    assert by_id.store_kind == STORE_KIND_LOCAL_GRAPH
    by_path = resolve_store_reference(str(FIXTURES / "ops-warden-store"), stores, environ=environ)
    assert by_path is not None
    assert by_path.store_kind == STORE_KIND_OPS_WARDEN


def test_federated_report_aggregate_fixture(tmp_path: Path) -> None:
    environ = _fixture_environ(tmp_path)
    report = build_federated_report(days=7, environ=environ, window_end=WINDOW_END)
    payload = {
        "aggregate": report["aggregate"],
        "store_count": report["store_count"],
        "stores": [
            {key: store[key] for key in ("store_id", "store_kind", "episode_count", "audit_event_count")}
            for store in report["stores"]
        ],
    }
    expected = json.loads((FIXTURES / "federated-report-aggregate.json").read_text(encoding="utf-8"))
    assert payload["aggregate"] == expected["aggregate"]
    assert payload["store_count"] == expected["store_count"]
    actual_stores = sorted(payload["stores"], key=lambda item: item["store_kind"])
    expected_stores = sorted(expected["stores"], key=lambda item: item["store_kind"])
    for actual, item in zip(actual_stores, expected_stores, strict=True):
        assert actual["store_kind"] == item["store_kind"]
        assert actual["episode_count"] == item["episode_count"]
        assert actual["audit_event_count"] == item["audit_event_count"]


def test_cli_stores_list_and_report(tmp_path: Path, capsys, monkeypatch: pytest.MonkeyPatch) -> None:
    environ = _fixture_environ(tmp_path)
    monkeypatch.setenv("WARDEN_MEMORY_STORE", environ["WARDEN_MEMORY_STORE"])
    monkeypatch.setenv("PHASE_MEMORY_REGISTRY", environ["PHASE_MEMORY_REGISTRY"])
    monkeypatch.setenv("PHASE_MEMORY_STORE_PATHS", environ["PHASE_MEMORY_STORE_PATHS"])
    assert main(["stores", "list", "--format", "summary"]) == 0
    listed = capsys.readouterr().out
    assert "Fixture local graph store" in listed

    assert main(["report", "--days", "7", "--format", "summary"]) == 0
    reported = capsys.readouterr().out
    assert "episodes=" in reported
    assert "active_stores=" in reported


def test_build_store_list_from_registry_only(tmp_path: Path) -> None:
    environ = {
        "WARDEN_MEMORY_STORE": str(tmp_path / "missing-ops"),
        "PHASE_MEMORY_REGISTRY": str(FIXTURES / "registry.json"),
        "PHASE_MEMORY_STORE_PATHS": str(FIXTURES / "local-store"),
    }
    listing = build_store_list(environ=environ)
    assert listing["valid"] is True
    assert listing["store_count"] == 1
    assert listing["stores"][0]["store_id"] == "fixture-local-graph"


@@ -0,0 +1,102 @@
---
id: PMEM-WP-0017
type: workplan
title: "Federated Store Management CLI And Activity Reports"
domain: communication
repo: phase-memory
status: finished
owner: codex
topic_slug: phase-memory
created: "2026-07-03"
updated: "2026-07-03"
---
# PMEM-WP-0017: Federated Store Management CLI And Activity Reports
## Goal
Extend the `phase-memory` CLI into a **management interface** for every on-disk
memory store the runtime has produced — not only the ops-warden coordination
store. Operators and agents need one command to answer:
- How much memory activity happened in the last *N* days **across all stores**?
- Which store contributed what (episodes, audit events, session kinds, outcomes)?
- What does a **single store** look like in detail for the same window?

LLM interactivity stays out of scope here. When memory is "brought to life"
through ops-warden (worker, route/access, coding-agent sessions), chat already
happens in that surface. This workplan adds **inspectable reporting** only.
## Implementation Update - 2026-07-03
Shipped federated store discovery, windowed activity aggregation, and CLI
reporting:
- `src/phase_memory/management.py` — discovery, classification, aggregation,
`build_federated_report`, `build_store_list`
- `phase-memory stores list` and `phase-memory report [--days N] [--store …]`
- `make report-7d` and `make report REPORT_DAYS=N`
- Tests in `tests/test_management_cli.py` with `tests/fixtures/management/`

Validation: `make test` → 117 passed, 1 skipped.
## T01 - Store discovery and classification
```task
id: PMEM-WP-0017-T01
status: done
priority: high
```
Implement `phase_memory.management` with `discover_memory_stores`,
`classify_store`, and registry read path.
## T02 - Windowed activity aggregation
```task
id: PMEM-WP-0017-T02
status: done
priority: high
```
Aggregate ops-warden episodes and local graph events/audit for a time window.
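A minimal sketch of the windowing step, not the shipped implementation. It assumes each JSONL record carries an ISO 8601 timestamp under `recorded_at` or `timestamp` (as the fixtures in this commit do) and that records without a timestamp are not counted. The helper names `in_window` and `count_by` are hypothetical.

```python
import json
from collections import Counter
from datetime import datetime, timedelta, timezone


def in_window(record: dict, start: datetime, end: datetime) -> bool:
    """Return True when the record's timestamp lies inside [start, end]."""
    raw = record.get("recorded_at") or record.get("timestamp")
    if not raw:
        return False  # assumption: undated records are not counted
    stamp = datetime.fromisoformat(raw)
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return start <= stamp <= end


def count_by(lines: list, field: str, days: int, window_end: datetime) -> dict:
    """Count in-window JSONL records grouped by one field, sorted by key."""
    start = window_end - timedelta(days=days)
    counts = Counter()
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines in the log
        record = json.loads(line)
        if in_window(record, start, window_end):
            counts[str(record.get(field) or "unknown")] += 1
    return dict(sorted(counts.items()))


# Trimmed-down records modelled on the ops-warden fixture in this commit.
SAMPLE = [
    '{"outcome":"resolved","recorded_at":"2026-06-01T10:00:00+00:00"}',
    '{"outcome":"resolved","recorded_at":"2026-07-01T10:00:00+00:00"}',
    '{"outcome":"skipped","recorded_at":"2026-07-02T11:00:00+00:00"}',
    '{"outcome":"resolved"}',
]
WINDOW_END = datetime(2026, 7, 3, 12, 0, tzinfo=timezone.utc)
print(count_by(SAMPLE, "outcome", 7, WINDOW_END))
```

Passing `window_end` explicitly, as the tests do with `WINDOW_END`, keeps the counts reproducible regardless of when the report runs.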
## T03 - Federated report envelope
```task
id: PMEM-WP-0017-T03
status: done
priority: high
```
`build_federated_report` returns `phase_memory.management.federated_report.v1`.
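As an illustration of how a consumer might read the envelope, the sketch below renders a one-line summary from a report-shaped dict. The key names come from the envelope and fixture in this commit; the `summary_line` helper and its output layout are hypothetical and may not match what `--format summary` prints.

```python
def summary_line(report: dict) -> str:
    """Render a one-line operator summary from a report-shaped dict."""
    aggregate = report.get("aggregate") or {}
    parts = [
        f"window_days={report.get('window_days', 0)}",
        f"stores={report.get('store_count', 0)}",
        f"active_stores={aggregate.get('active_store_count', 0)}",
        f"episodes={aggregate.get('episode_count', 0)}",
        f"audit_events={aggregate.get('audit_event_count', 0)}",
    ]
    # "valid" is False when any diagnostic has severity "error".
    status = "ok" if report.get("valid") else "invalid"
    return f"[{status}] " + " ".join(parts)


# Counts copied from tests/fixtures/management/federated-report-aggregate.json.
REPORT = {
    "schema_version": "phase_memory.management.federated_report.v1",
    "valid": True,
    "window_days": 7,
    "store_count": 2,
    "aggregate": {"active_store_count": 2, "audit_event_count": 1, "episode_count": 3},
}
print(summary_line(REPORT))
```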
## T04 - CLI commands
```task
id: PMEM-WP-0017-T04
status: done
priority: high
```
`stores list` and `report` wired in `cli.py`.
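The command shape can be sketched with `argparse` as below. This is an illustrative stand-in rather than the parser in `cli.py`: only the `stores list` and `report` commands and the `--days`, `--store`, and `--format` flags come from this commit, while the `build_parser` name and every default value are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the two management commands (hypothetical layout)."""
    parser = argparse.ArgumentParser(prog="phase-memory")
    commands = parser.add_subparsers(dest="command", required=True)

    # phase-memory stores list [--format ...]
    stores = commands.add_parser("stores")
    stores_commands = stores.add_subparsers(dest="stores_command", required=True)
    listing = stores_commands.add_parser("list")
    listing.add_argument("--format", default="summary")  # assumption: default format

    # phase-memory report [--days N] [--store ...] [--format ...]
    report = commands.add_parser("report")
    report.add_argument("--days", type=int, default=7)  # assumption: default window
    report.add_argument("--store", default=None)  # store id or path; None means all stores
    report.add_argument("--format", default="summary")  # assumption: default format
    return parser


args = build_parser().parse_args(["report", "--days", "7", "--format", "summary"])
print(args.command, args.days, args.store, args.format)
```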
## T05 - Makefile, README, and operator runbook
```task
id: PMEM-WP-0017-T05
status: done
priority: medium
```
`make report-7d`, README management section, runbook cross-store activity.
## T06 - Tests and evaluation hook
```task
id: PMEM-WP-0017-T06
status: done
priority: medium
```
`tests/test_management_cli.py` and management fixtures; public API snapshot updated.