diff --git a/Makefile b/Makefile index a12e980..34b17b2 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ ROOT := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) .DEFAULT_GOAL := help -.PHONY: help install install-editable check test +.PHONY: help install install-editable check test report-7d report help: @echo "phase-memory make targets" @@ -12,6 +12,8 @@ help: @echo " make install-editable # editable install for local development" @echo " make check # verify import and print version" @echo " make test # run pytest" + @echo " make report-7d # federated activity report (last 7 days)" + @echo " make report # federated report (REPORT_DAYS=7 default)" @echo "" @echo "For ops-warden integration, prefer: cd ../ops-warden && make install-all" @@ -24,4 +26,12 @@ check: @PYTHONPATH="$(ROOT)src" python3 -c "import phase_memory; print('phase-memory', phase_memory.__version__, 'ok')" test: - PYTHONPATH="$(ROOT)src" python3 -m pytest \ No newline at end of file + PYTHONPATH="$(ROOT)src" python3 -m pytest + +REPORT_DAYS ?= 7 + +report-7d: + PYTHONPATH="$(ROOT)src" python3 -m phase_memory.cli report --days 7 --format summary + +report: + PYTHONPATH="$(ROOT)src" python3 -m phase_memory.cli report --days $(REPORT_DAYS) --format summary \ No newline at end of file diff --git a/README.md b/README.md index 22b8047..4769c9c 100644 --- a/README.md +++ b/README.md @@ -54,8 +54,25 @@ PYTHONPATH=src python3 -m phase_memory.cli store import --store .phase-memory-lo When installed, the package exposes the same commands as `phase-memory`. Commands emit JSON runtime envelopes by default and accept `--format summary` -for a concise human-readable view. All current commands are dry-run planning -operations; they do not mutate durable memory stores. +for a concise human-readable view. Profile, graph, and store import/export +commands are dry-run planning operations; they do not mutate durable memory +stores by default. 
+ + ## Management Reports + +Inspect activity across every discovered phase-memory store (ops-warden +coordination store, registry entries, and `PHASE_MEMORY_STORE_PATHS`): + +```bash +phase-memory stores list --format summary +phase-memory report --days 7 --format summary +phase-memory report --store ops-warden-default --days 30 +make report-7d +``` + +Discovery honours `WARDEN_MEMORY_STORE`, `PHASE_MEMORY_REGISTRY`, and +`PHASE_MEMORY_STORE_PATHS`. Reports are metadata-only and never include secret +values. ## Local Runtime diff --git a/docs/operator-readiness-runbook.md b/docs/operator-readiness-runbook.md index 9581563..05f8432 100644 --- a/docs/operator-readiness-runbook.md +++ b/docs/operator-readiness-runbook.md @@ -55,6 +55,20 @@ Routes: - `POST /operations/{operation}` - `POST /operations` with `{"operation": "...", "payload": {...}}` +## Cross-Store Activity + +Use the phase-memory CLI as the canonical cross-store activity check: + +```bash +phase-memory stores list --format summary +phase-memory report --days 7 --format summary +make report-7d +``` + +Federated reports aggregate ops-warden coordination episodes and local graph +store events/audit lines for the requested window. Drill into one store with +`phase-memory report --store STORE_ID_OR_PATH --days N`. 
+ ## Readiness Checks Before accepting traffic: diff --git a/src/phase_memory/__init__.py b/src/phase_memory/__init__.py index 29b3b86..50b2ccd 100644 --- a/src/phase_memory/__init__.py +++ b/src/phase_memory/__init__.py @@ -126,6 +126,15 @@ from .pilot import ( managed_deployment_pilot_report, write_live_pilot_evidence, ) +from .management import ( + FEDERATED_REPORT_SCHEMA, + STORE_LIST_SCHEMA, + STORE_REGISTRY_SCHEMA, + build_federated_report, + build_store_list, + classify_store, + discover_memory_stores, +) from .ops_warden import ( KNOWN_AGENT_IDS, OPS_WARDEN_ACTIVATION_SCHEMA, @@ -179,6 +188,9 @@ __all__ = [ "FakeExternalPolicyGateway", "FakeExternalSemanticIndex", "FakeKontextualRuntimeRegistry", + "FEDERATED_REPORT_SCHEMA", + "STORE_LIST_SCHEMA", + "STORE_REGISTRY_SCHEMA", "FakeMarkitectPackageCompiler", "FakeTelemetryAuditSink", "LiveShapedKontextualEventLog", @@ -268,7 +280,9 @@ __all__ = [ "WordCountTokenEstimator", "activate_ops_warden_memory", "activation_quality_report", + "build_federated_report", "build_session_event", + "build_store_list", "default_memory_store_path", "memory_enabled", "ops_warden_adapter_pack", @@ -298,7 +312,9 @@ __all__ = [ "WARDEN_ACCESS_NEED", "WARDEN_ROUTE_FIND_QUERY", "build_service_binding", + "classify_store", "create_wsgi_app", + "discover_memory_stores", "health_report", "resolve_credentialed_environ", "resolve_runtime_adapters", diff --git a/src/phase_memory/cli.py b/src/phase_memory/cli.py index 6420729..e720d1e 100644 --- a/src/phase_memory/cli.py +++ b/src/phase_memory/cli.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import Any, Sequence from .adapters import FileBackedMemoryGraphStore, JsonlAuditSink, JsonlMemoryEventLog +from .management import build_federated_report, build_store_list from .runtime import PhaseMemoryRuntime @@ -80,6 +81,33 @@ def build_parser() -> argparse.ArgumentParser: _add_format(store_repair) store_repair.set_defaults(func=_store_repair) + stores = subparsers.add_parser("stores", 
help="Federated store discovery") + stores_subparsers = stores.add_subparsers(dest="stores_command") + stores_list = stores_subparsers.add_parser( + "list", + help="List discovered phase-memory stores (defaults, registry, PHASE_MEMORY_STORE_PATHS)", + ) + _add_format(stores_list) + stores_list.set_defaults(func=_stores_list) + + report = subparsers.add_parser( + "report", + help="Activity report across discovered stores for the last N days", + epilog=( + "Discovery: ops-warden default store, PHASE_MEMORY_REGISTRY, " + "PHASE_MEMORY_STORE_PATHS (colon-separated). Set WARDEN_MEMORY_STORE " + "to override the ops-warden store path." + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + report.add_argument("--days", type=int, default=7, help="Look back N days (default: 7)") + report.add_argument( + "--store", + help="Focus on one store by registry store_id or absolute path", + ) + _add_format(report) + report.set_defaults(func=_report) + return parser @@ -148,6 +176,14 @@ def _store_repair(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict return _runtime_for_store(args.store).repair_diagnostics(source_ref=str(args.store)) +def _stores_list(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]: + return build_store_list() + + +def _report(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]: + return build_federated_report(days=args.days, focus_store_id=args.store, focus_store_path=args.store) + + def _missing_command(args: argparse.Namespace, runtime: PhaseMemoryRuntime) -> dict[str, Any]: raise SystemExit("Missing command. 
Use --help for usage.") @@ -200,6 +236,36 @@ def _print_summary(envelope: dict[str, Any]) -> None: if activation: print(f"selected_nodes={len(activation.get('selected_node_ids', []))}") print(f"omitted={len(activation.get('omitted', []))}") + if envelope.get("schema_version") == "phase_memory.management.store_list.v1": + print(f"stores={envelope.get('store_count', 0)}") + for store in envelope.get("stores", ()): + label = store.get("label") or store.get("store_id") + print(f"- {label} [{store.get('store_kind')}] {store.get('path')}") + if envelope.get("schema_version") == "phase_memory.management.federated_report.v1": + aggregate = envelope.get("aggregate") or {} + print( + f"window={envelope.get('window_start')}..{envelope.get('window_end')} " + f"stores={envelope.get('store_count', 0)}" + ) + print( + f"episodes={aggregate.get('episode_count', 0)} " + f"audit={aggregate.get('audit_event_count', 0)} " + f"active_stores={aggregate.get('active_store_count', 0)}" + ) + for store in envelope.get("stores", ()): + print( + f"- {store.get('store_id')} [{store.get('store_kind')}] " + f"episodes={store.get('episode_count', 0)} " + f"audit={store.get('audit_event_count', 0)} " + f"last={store.get('last_activity_at') or '—'}" + ) + detail = envelope.get("store_detail") + if detail: + print(f"detail={detail.get('store_id')} schema={detail.get('schema_version') or '—'}") + if detail.get("repair_diagnostics"): + print(f"repair_diagnostics={len(detail.get('repair_diagnostics', []))}") + if detail.get("episode_timeline"): + print(f"timeline={len(detail.get('episode_timeline', []))}") if __name__ == "__main__": diff --git a/src/phase_memory/management.py b/src/phase_memory/management.py new file mode 100644 index 0000000..20d8887 --- /dev/null +++ b/src/phase_memory/management.py @@ -0,0 +1,713 @@ +"""Federated store discovery and activity reporting.""" + +from __future__ import annotations + +import json +import os +from dataclasses import dataclass, field +from datetime import 
datetime, timedelta, timezone +from pathlib import Path +from typing import Any, Mapping + +from .adapters import LOCAL_STORE_SCHEMA, FileBackedMemoryGraphStore, JsonlAuditSink, JsonlMemoryEventLog +from .models import Diagnostic +from .ops_warden import OPS_WARDEN_PROFILE_ID, OPS_WARDEN_RUNTIME_SCHEMA, OpsWardenMemoryStore, default_memory_store_path +from .utils import parse_iso_datetime, stable_digest, utc_now_iso + +STORE_REGISTRY_SCHEMA = "phase_memory.management.store_registry.v1" +STORE_LIST_SCHEMA = "phase_memory.management.store_list.v1" +FEDERATED_REPORT_SCHEMA = "phase_memory.management.federated_report.v1" + +STORE_KIND_OPS_WARDEN = "ops_warden_coordination" +STORE_KIND_LOCAL_GRAPH = "local_graph" + +DEFAULT_OPS_WARDEN_STORE_ID = "ops-warden-default" +TOP_ROUTE_IDS = 10 +DETAIL_EPISODE_LIMIT = 20 + + +@dataclass(frozen=True) +class MemoryStoreDescriptor: + store_id: str + path: Path + store_kind: str + profile_id: str = "" + label: str = "" + source: str = "discovered" + + +def default_registry_path(environ: Mapping[str, str] | None = None) -> Path: + environ = environ or os.environ + override = str(environ.get("PHASE_MEMORY_REGISTRY") or "").strip() + if override: + return Path(override).expanduser() + xdg_data = str(environ.get("XDG_DATA_HOME") or "").strip() + base = Path(xdg_data).expanduser() if xdg_data else Path.home() / ".local" / "share" + return base / "phase-memory" / "stores.json" + + +def classify_store(path: str | Path) -> str | None: + root = Path(path).expanduser().resolve() + local_metadata = root / "phase-memory.json" + if local_metadata.is_file(): + try: + data = json.loads(local_metadata.read_text(encoding="utf-8")) + except ValueError: # invalid JSON or undecodable bytes (JSONDecodeError and UnicodeDecodeError are both ValueError) + return None + if isinstance(data, dict) and str(data.get("schema_version") or "") == LOCAL_STORE_SCHEMA: + return STORE_KIND_LOCAL_GRAPH + + ops_metadata = root / "metadata.json" + if ops_metadata.is_file(): + try: + data = json.loads(ops_metadata.read_text(encoding="utf-8")) + except ValueError: # invalid JSON or undecodable bytes must not abort discovery + return 
None + if isinstance(data, dict) and str(data.get("schema_version") or "") == OPS_WARDEN_RUNTIME_SCHEMA: + return STORE_KIND_OPS_WARDEN + return None + + +def load_store_registry( + path: str | Path | None = None, + environ: Mapping[str, str] | None = None, +) -> dict[str, Any]: + registry_path = Path(path).expanduser() if path is not None else default_registry_path(environ) + diagnostics: list[dict[str, Any]] = [] + if not registry_path.is_file(): + return { + "schema_version": STORE_REGISTRY_SCHEMA, + "valid": True, + "registry_path": str(registry_path), + "entries": [], + "diagnostics": diagnostics, + } + try: + payload = json.loads(registry_path.read_text(encoding="utf-8")) + except ValueError as exc: # also covers UnicodeDecodeError raised by read_text on non-UTF-8 bytes + return { + "schema_version": STORE_REGISTRY_SCHEMA, + "valid": False, + "registry_path": str(registry_path), + "entries": [], + "diagnostics": [ + Diagnostic( + "warn", + "corrupt_store_registry", + "Store registry file is not valid JSON.", + str(registry_path), + {"error": str(exc)}, + ).to_dict() + ], + } + if not isinstance(payload, dict): + diagnostics.append( + Diagnostic( + "warn", + "invalid_store_registry_shape", + "Store registry must be a JSON object.", + str(registry_path), + ).to_dict() + ) + return { + "schema_version": STORE_REGISTRY_SCHEMA, + "valid": False, + "registry_path": str(registry_path), + "entries": [], + "diagnostics": diagnostics, + } + + schema = str(payload.get("schema_version") or "") + if schema and schema != STORE_REGISTRY_SCHEMA: + diagnostics.append( + Diagnostic( + "warn", + "unknown_store_registry_schema", + "Store registry declares an unknown schema version.", + str(registry_path), + {"schema_version": schema}, + ).to_dict() + ) + + entries: list[dict[str, Any]] = [] + for item in payload.get("stores") or (): + if not isinstance(item, dict): + continue + store_path = str(item.get("path") or "").strip() + store_kind = str(item.get("store_kind") or "").strip() + store_id = str(item.get("store_id") or "").strip() + if not store_path or not 
store_kind or not store_id: + diagnostics.append( + Diagnostic( + "warn", + "incomplete_registry_entry", + "Registry entry must include store_id, path, and store_kind.", + store_id or store_path or "unknown", + ).to_dict() + ) + continue + entries.append(item) + return { + "schema_version": STORE_REGISTRY_SCHEMA, + "valid": not any(item.get("severity") == "error" for item in diagnostics), + "registry_path": str(registry_path), + "entries": entries, + "diagnostics": diagnostics, + } + + +def _store_id_for_path(path: Path, environ: Mapping[str, str] | None = None) -> str: + resolved = path.expanduser().resolve() + default_ops = default_memory_store_path(environ).expanduser().resolve() + if resolved == default_ops: + return DEFAULT_OPS_WARDEN_STORE_ID + return f"store:{stable_digest(str(resolved))}" + + +def _descriptor_from_path( + path: Path, + *, + store_kind: str, + source: str, + profile_id: str = "", + label: str = "", + store_id: str = "", + environ: Mapping[str, str] | None = None, +) -> MemoryStoreDescriptor: + resolved = path.expanduser().resolve() + return MemoryStoreDescriptor( + store_id=store_id or _store_id_for_path(resolved, environ), + path=resolved, + store_kind=store_kind, + profile_id=profile_id, + label=label, + source=source, + ) + + +def discover_memory_stores( + environ: Mapping[str, str] | None = None, + *, + registry_path: str | Path | None = None, + extra_paths: tuple[str | Path, ...] 
= (), +) -> tuple[list[MemoryStoreDescriptor], list[dict[str, Any]]]: + environ = environ or os.environ + diagnostics: list[dict[str, Any]] = [] + by_path: dict[str, MemoryStoreDescriptor] = {} + + def add_descriptor(descriptor: MemoryStoreDescriptor) -> None: + key = str(descriptor.path) + existing = by_path.get(key) + if existing is None: + by_path[key] = descriptor + return + if descriptor.source == "registry" and existing.source != "registry": + by_path[key] = descriptor + + default_ops = default_memory_store_path(environ) + kind = classify_store(default_ops) + if kind: + profile_id = OPS_WARDEN_PROFILE_ID if kind == STORE_KIND_OPS_WARDEN else "" + add_descriptor( + _descriptor_from_path( + default_ops, + store_kind=kind, + source="default", + profile_id=profile_id, + label="ops-warden coordination store", + store_id=DEFAULT_OPS_WARDEN_STORE_ID, + environ=environ, + ) + ) + + registry = load_store_registry(registry_path, environ=environ) + diagnostics.extend(registry.get("diagnostics", ())) + registry_file = Path(str(registry.get("registry_path") or "")).expanduser() + for entry in registry.get("entries", ()): + entry_path = Path(str(entry.get("path") or "")).expanduser() + if not entry_path.is_absolute() and registry_file.is_file(): + entry_path = (registry_file.parent / entry_path).resolve() + if not entry_path.is_dir(): + diagnostics.append( + Diagnostic( + "warn", + "registry_store_missing", + "Registry entry path does not exist.", + str(entry.get("store_id") or entry_path), + {"path": str(entry_path)}, + ).to_dict() + ) + continue + declared_kind = str(entry.get("store_kind") or "") + actual_kind = classify_store(entry_path) + if actual_kind and actual_kind != declared_kind: + diagnostics.append( + Diagnostic( + "warn", + "registry_store_kind_mismatch", + "Registry store_kind does not match on-disk classification.", + str(entry.get("store_id") or entry_path), + {"declared": declared_kind, "actual": actual_kind}, + ).to_dict() + ) + store_kind = actual_kind 
or declared_kind + if not store_kind: + continue + add_descriptor( + _descriptor_from_path( + entry_path, + store_kind=store_kind, + source="registry", + profile_id=str(entry.get("profile_id") or ""), + label=str(entry.get("label") or ""), + store_id=str(entry.get("store_id") or ""), + environ=environ, + ) + ) + + env_paths = str(environ.get("PHASE_MEMORY_STORE_PATHS") or "").strip() + path_candidates = [item.strip() for item in env_paths.split(":") if item.strip()] + path_candidates.extend(str(item) for item in extra_paths) + for candidate in path_candidates: + candidate_path = Path(candidate).expanduser() + if not candidate_path.is_dir(): + continue + store_kind = classify_store(candidate_path) + if not store_kind: + continue + add_descriptor( + _descriptor_from_path( + candidate_path, + store_kind=store_kind, + source="env", + environ=environ, + ) + ) + + stores = sorted(by_path.values(), key=lambda item: (item.store_kind, item.store_id)) + return stores, diagnostics + + +def resolve_store_reference( + reference: str, + stores: list[MemoryStoreDescriptor], + environ: Mapping[str, str] | None = None, +) -> MemoryStoreDescriptor | None: + ref = str(reference or "").strip() + if not ref: + return None + for store in stores: + if ref == store.store_id or ref == str(store.path): + return store + candidate = Path(ref).expanduser() + for store in stores: # match discovered stores first so a relative or ~ path keeps the registry store_id/label/profile_id + if candidate.resolve() == store.path: + return store + if candidate.is_dir(): # only an undiscovered directory gets a fresh "explicit" descriptor + store_kind = classify_store(candidate) + if store_kind: + return _descriptor_from_path(candidate, store_kind=store_kind, source="explicit", environ=environ) + return None + + +def compute_report_window( + *, + days: int, + window_end: datetime | None = None, +) -> tuple[datetime, datetime]: + if days < 1: + raise ValueError("days must be at least 1") + end = window_end or datetime.now(timezone.utc) + if end.tzinfo is None: + end = end.replace(tzinfo=timezone.utc) + else: + end = end.astimezone(timezone.utc) + start = end - 
timedelta(days=days) + return start, end + + +def _event_in_window(timestamp: str | None, *, window_start: datetime, window_end: datetime) -> bool | None: + parsed = parse_iso_datetime(timestamp) + if parsed is None: + return None + return window_start <= parsed <= window_end + + +def _read_jsonl(path: Path) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + if not path.is_file(): + return [], [] + events: list[dict[str, Any]] = [] + diagnostics: list[dict[str, Any]] = [] + for line_number, raw in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): + if not raw.strip(): + continue + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + diagnostics.append( + Diagnostic( + "warn", + "malformed_jsonl_line", + "JSONL line is not valid JSON.", + f"{path}:line:{line_number}", + {"error": str(exc)}, + ).to_dict() + ) + continue + if isinstance(data, dict): + events.append(data) + return events, diagnostics + + +def _increment(counter: dict[str, int], key: str) -> None: + normalized = key or "unknown" + counter[normalized] = counter.get(normalized, 0) + 1 + + +def _top_counter(counter: dict[str, int], *, limit: int) -> dict[str, int]: + return dict(sorted(counter.items(), key=lambda item: (-item[1], item[0]))[:limit]) + + +def aggregate_ops_warden_activity( + store: MemoryStoreDescriptor, + *, + window_start: datetime, + window_end: datetime, +) -> dict[str, Any]: + events_path = store.path / "events.jsonl" + raw_events, line_diagnostics = _read_jsonl(events_path) + diagnostics = list(line_diagnostics) + by_session_kind: dict[str, int] = {} + by_command: dict[str, int] = {} + by_outcome: dict[str, int] = {} + by_route_id: dict[str, int] = {} + episode_timeline: list[dict[str, Any]] = [] + first_activity_at = "" + last_activity_at = "" + in_window = 0 + missing_timestamp = 0 + + for event in raw_events: + timestamp = str(event.get("recorded_at") or "") + in_range = _event_in_window(timestamp, window_start=window_start, 
window_end=window_end) + if in_range is None: + missing_timestamp += 1 + diagnostics.append( + Diagnostic( + "warn", + "episode_missing_timestamp", + "Episode lacks a parseable recorded_at timestamp.", + str(event.get("event_id") or "unknown"), + ).to_dict() + ) + continue + if not in_range: + continue + in_window += 1 + _increment(by_session_kind, str(event.get("session_kind") or "")) + _increment(by_command, str(event.get("command") or "")) + _increment(by_outcome, str(event.get("outcome") or "")) + _increment(by_route_id, str(event.get("route_id") or "")) + if not first_activity_at or timestamp < first_activity_at: + first_activity_at = timestamp + if not last_activity_at or timestamp > last_activity_at: + last_activity_at = timestamp + episode_timeline.append( + { + "event_id": str(event.get("event_id") or ""), + "recorded_at": timestamp, + "session_kind": str(event.get("session_kind") or ""), + "command": str(event.get("command") or ""), + "route_id": str(event.get("route_id") or ""), + "outcome": str(event.get("outcome") or ""), + } + ) + + episode_timeline.sort(key=lambda item: item.get("recorded_at", ""), reverse=True) + metadata_path = store.path / "metadata.json" + schema_version = "" + if metadata_path.is_file(): + try: + schema_version = str(json.loads(metadata_path.read_text(encoding="utf-8")).get("schema_version") or "") + except json.JSONDecodeError: + pass + + return { + "store_id": store.store_id, + "store_kind": store.store_kind, + "path": str(store.path), + "profile_id": store.profile_id or OPS_WARDEN_PROFILE_ID, + "schema_version": schema_version, + "episode_count": in_window, + "audit_event_count": 0, + "missing_timestamp_count": missing_timestamp, + "first_activity_at": first_activity_at, + "last_activity_at": last_activity_at, + "by_session_kind": dict(sorted(by_session_kind.items())), + "by_command": dict(sorted(by_command.items())), + "by_outcome": dict(sorted(by_outcome.items())), + "by_route_id": _top_counter(by_route_id, 
limit=TOP_ROUTE_IDS), + "by_operation": {}, + "episode_timeline": episode_timeline[:DETAIL_EPISODE_LIMIT], + "repair_diagnostics": [], + "diagnostics": diagnostics, + } + + +def aggregate_local_graph_activity( + store: MemoryStoreDescriptor, + *, + window_start: datetime, + window_end: datetime, +) -> dict[str, Any]: + events_path = store.path / "events.jsonl" + audit_path = store.path / "audit.jsonl" + raw_events, event_line_diagnostics = _read_jsonl(events_path) + raw_audit, audit_line_diagnostics = _read_jsonl(audit_path) + diagnostics = list(event_line_diagnostics) + diagnostics.extend(audit_line_diagnostics) + + by_session_kind: dict[str, int] = {} + by_command: dict[str, int] = {} + by_outcome: dict[str, int] = {} + by_operation: dict[str, int] = {} + episode_timeline: list[dict[str, Any]] = [] + first_activity_at = "" + last_activity_at = "" + episode_count = 0 + audit_count = 0 + missing_timestamp = 0 + + for event in raw_events: + timestamp = str(event.get("timestamp") or event.get("recorded_at") or "") + in_range = _event_in_window(timestamp, window_start=window_start, window_end=window_end) + if in_range is None: + missing_timestamp += 1 + diagnostics.append( + Diagnostic( + "warn", + "episode_missing_timestamp", + "Event lacks a parseable timestamp.", + str(event.get("event_id") or event.get("id") or "unknown"), + ).to_dict() + ) + continue + if not in_range: + continue + episode_count += 1 + kind = str(event.get("kind") or event.get("session_kind") or "") + _increment(by_session_kind, kind) + _increment(by_command, str(event.get("command") or kind)) + _increment(by_outcome, str(event.get("outcome") or event.get("state") or "")) + if not first_activity_at or timestamp < first_activity_at: + first_activity_at = timestamp + if not last_activity_at or timestamp > last_activity_at: + last_activity_at = timestamp + episode_timeline.append( + { + "event_id": str(event.get("event_id") or event.get("id") or ""), + "recorded_at": timestamp, + "session_kind": 
kind, + "command": str(event.get("command") or kind), + "route_id": "", + "outcome": str(event.get("outcome") or event.get("state") or ""), + } + ) + + for event in raw_audit: + timestamp = str(event.get("timestamp") or event.get("recorded_at") or "") + in_range = _event_in_window(timestamp, window_start=window_start, window_end=window_end) + if in_range is None: + missing_timestamp += 1 + diagnostics.append( + Diagnostic( + "warn", + "audit_missing_timestamp", + "Audit event lacks a parseable timestamp.", + str(event.get("operation_id") or event.get("operation") or "unknown"), + ).to_dict() + ) + continue + if not in_range: + continue + audit_count += 1 + _increment(by_operation, str(event.get("operation") or "")) + if not first_activity_at or timestamp < first_activity_at: + first_activity_at = timestamp + if not last_activity_at or timestamp > last_activity_at: + last_activity_at = timestamp + + episode_timeline.sort(key=lambda item: item.get("recorded_at", ""), reverse=True) + graph_store = FileBackedMemoryGraphStore(store.path) + repair = graph_store.repair_diagnostics() + repair_diagnostics = [ + item.to_dict() + for item in repair + if item.severity in {"error", "warn"} + ] + schema_version = "" + try: + schema_version = str(graph_store.metadata().get("schema_version") or "") + except (json.JSONDecodeError, OSError): + pass + + return { + "store_id": store.store_id, + "store_kind": store.store_kind, + "path": str(store.path), + "profile_id": store.profile_id, + "schema_version": schema_version, + "episode_count": episode_count, + "audit_event_count": audit_count, + "missing_timestamp_count": missing_timestamp, + "first_activity_at": first_activity_at, + "last_activity_at": last_activity_at, + "by_session_kind": dict(sorted(by_session_kind.items())), + "by_command": dict(sorted(by_command.items())), + "by_outcome": dict(sorted(by_outcome.items())), + "by_route_id": {}, + "by_operation": dict(sorted(by_operation.items())), + "episode_timeline": 
episode_timeline[:DETAIL_EPISODE_LIMIT], + "repair_diagnostics": repair_diagnostics, + "diagnostics": diagnostics, + } + + +def aggregate_store_activity( + store: MemoryStoreDescriptor, + *, + window_start: datetime, + window_end: datetime, +) -> dict[str, Any]: + if store.store_kind == STORE_KIND_OPS_WARDEN: + return aggregate_ops_warden_activity(store, window_start=window_start, window_end=window_end) + return aggregate_local_graph_activity(store, window_start=window_start, window_end=window_end) + + +def build_store_list( + environ: Mapping[str, str] | None = None, + *, + registry_path: str | Path | None = None, + extra_paths: tuple[str | Path, ...] = (), +) -> dict[str, Any]: + stores, diagnostics = discover_memory_stores(environ, registry_path=registry_path, extra_paths=extra_paths) + return { + "schema_version": STORE_LIST_SCHEMA, + "valid": not any(item.get("severity") == "error" for item in diagnostics), + "store_count": len(stores), + "stores": [ + { + "store_id": store.store_id, + "path": str(store.path), + "store_kind": store.store_kind, + "profile_id": store.profile_id, + "label": store.label, + "source": store.source, + } + for store in stores + ], + "diagnostics": diagnostics, + } + + +def build_federated_report( + *, + days: int = 7, + focus_store_id: str | None = None, + focus_store_path: str | Path | None = None, + environ: Mapping[str, str] | None = None, + registry_path: str | Path | None = None, + extra_paths: tuple[str | Path, ...] 
= (), + window_end: datetime | None = None, +) -> dict[str, Any]: + environ = environ or os.environ + stores, discovery_diagnostics = discover_memory_stores( + environ, + registry_path=registry_path, + extra_paths=extra_paths, + ) + diagnostics = list(discovery_diagnostics) + + focus_reference = str(focus_store_id or focus_store_path or "").strip() + focus_store: MemoryStoreDescriptor | None = None + if focus_reference: + focus_store = resolve_store_reference(focus_reference, stores, environ=environ) + if focus_store is None: + diagnostics.append( + Diagnostic( + "error", + "unknown_store_reference", + "Requested store was not discovered and could not be classified.", + focus_reference, + ).to_dict() + ) + return { + "schema_version": FEDERATED_REPORT_SCHEMA, + "valid": False, + "window_days": days, + "window_start": "", + "window_end": "", + "focus_store_id": focus_reference, + "store_count": 0, + "aggregate": {}, + "stores": [], + "store_detail": None, + "diagnostics": diagnostics, + } + stores = [focus_store] + + window_start, window_end_dt = compute_report_window(days=days, window_end=window_end) + summaries: list[dict[str, Any]] = [] + aggregate = { + "episode_count": 0, + "audit_event_count": 0, + "active_store_count": 0, + "by_outcome": {}, + "by_session_kind": {}, + "by_operation": {}, + } + by_outcome: dict[str, int] = {} + by_session_kind: dict[str, int] = {} + by_operation: dict[str, int] = {} + + store_detail: dict[str, Any] | None = None + for store in stores: + summary = aggregate_store_activity(store, window_start=window_start, window_end=window_end_dt) + diagnostics.extend(summary.get("diagnostics", ())) + public_summary = { + key: value + for key, value in summary.items() + if key not in {"episode_timeline", "repair_diagnostics", "diagnostics"} + } + summaries.append(public_summary) + aggregate["episode_count"] += int(summary.get("episode_count") or 0) + aggregate["audit_event_count"] += int(summary.get("audit_event_count") or 0) + if 
int(summary.get("episode_count") or 0) or int(summary.get("audit_event_count") or 0): + aggregate["active_store_count"] += 1 + for key, value in (summary.get("by_outcome") or {}).items(): + by_outcome[key] = by_outcome.get(key, 0) + int(value) + for key, value in (summary.get("by_session_kind") or {}).items(): + by_session_kind[key] = by_session_kind.get(key, 0) + int(value) + for key, value in (summary.get("by_operation") or {}).items(): + by_operation[key] = by_operation.get(key, 0) + int(value) + if focus_store is not None and store.store_id == focus_store.store_id: + store_detail = summary + + aggregate["by_outcome"] = dict(sorted(by_outcome.items())) + aggregate["by_session_kind"] = dict(sorted(by_session_kind.items())) + aggregate["by_operation"] = dict(sorted(by_operation.items())) + + return { + "schema_version": FEDERATED_REPORT_SCHEMA, + "valid": not any(item.get("severity") == "error" for item in diagnostics), + "window_days": days, + "window_start": window_start.replace(microsecond=0).isoformat(), + "window_end": window_end_dt.replace(microsecond=0).isoformat(), + "focus_store_id": focus_store.store_id if focus_store else "", + "store_count": len(summaries), + "aggregate": aggregate, + "stores": summaries, + "store_detail": store_detail, + "diagnostics": diagnostics, + "generated_at": utc_now_iso(), + } \ No newline at end of file diff --git a/tests/fixtures/management/federated-report-aggregate.json b/tests/fixtures/management/federated-report-aggregate.json new file mode 100644 index 0000000..776e6a6 --- /dev/null +++ b/tests/fixtures/management/federated-report-aggregate.json @@ -0,0 +1,33 @@ +{ + "aggregate": { + "active_store_count": 2, + "audit_event_count": 1, + "by_operation": { + "audit.query": 1 + }, + "by_outcome": { + "resolved": 1, + "skipped": 1, + "unknown": 1 + }, + "by_session_kind": { + "lifecycle.apply": 1, + "warden.agent.codex": 1, + "warden.operator": 1 + }, + "episode_count": 3 + }, + "store_count": 2, + "stores": [ + { + 
"audit_event_count": 1, + "episode_count": 1, + "store_kind": "local_graph" + }, + { + "audit_event_count": 0, + "episode_count": 2, + "store_kind": "ops_warden_coordination" + } + ] +} \ No newline at end of file diff --git a/tests/fixtures/management/local-store/audit.jsonl b/tests/fixtures/management/local-store/audit.jsonl new file mode 100644 index 0000000..15f5783 --- /dev/null +++ b/tests/fixtures/management/local-store/audit.jsonl @@ -0,0 +1,2 @@ +{"operation":"package.compile","operation_id":"op:audit-old","timestamp":"2026-06-02T08:00:00+00:00"} +{"operation":"audit.query","operation_id":"op:audit-in-window","timestamp":"2026-07-02T10:00:00+00:00"} \ No newline at end of file diff --git a/tests/fixtures/management/local-store/events.jsonl b/tests/fixtures/management/local-store/events.jsonl new file mode 100644 index 0000000..1fa5351 --- /dev/null +++ b/tests/fixtures/management/local-store/events.jsonl @@ -0,0 +1,2 @@ +{"id":"event:old","kind":"path.created","schema_version":"markitect.memory.event.v1","timestamp":"2026-06-01T08:00:00+00:00"} +{"id":"event:in-window","kind":"lifecycle.apply","schema_version":"markitect.memory.event.v1","timestamp":"2026-07-02T09:00:00+00:00"} \ No newline at end of file diff --git a/tests/fixtures/management/local-store/phase-memory.json b/tests/fixtures/management/local-store/phase-memory.json new file mode 100644 index 0000000..0527576 --- /dev/null +++ b/tests/fixtures/management/local-store/phase-memory.json @@ -0,0 +1,3 @@ +{ + "schema_version": "phase_memory.local_store.v1" +} \ No newline at end of file diff --git a/tests/fixtures/management/ops-warden-store/events.jsonl b/tests/fixtures/management/ops-warden-store/events.jsonl new file mode 100644 index 0000000..c6a7643 --- /dev/null +++ b/tests/fixtures/management/ops-warden-store/events.jsonl @@ -0,0 +1,4 @@ +{"agent_id":"grok","command":"route 
find","diagnostic_codes":[],"event_id":"ops-warden-event:old","need_fingerprint":"abc123","outcome":"resolved","recorded_at":"2026-06-01T10:00:00+00:00","route_id":"openrouter-llm-connect","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.agent.grok"} +{"agent_id":"","command":"route find","diagnostic_codes":[],"event_id":"ops-warden-event:in-window-1","need_fingerprint":"def456","outcome":"resolved","recorded_at":"2026-07-01T10:00:00+00:00","route_id":"issue-core-ingestion-api-key","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.operator"} +{"agent_id":"codex","command":"access","diagnostic_codes":[],"event_id":"ops-warden-event:in-window-2","need_fingerprint":"ghi789","outcome":"skipped","recorded_at":"2026-07-02T11:00:00+00:00","route_id":"openrouter-llm-connect","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.agent.codex"} +{"agent_id":"","command":"memory.activate","diagnostic_codes":[],"event_id":"ops-warden-event:no-ts","need_fingerprint":"","outcome":"resolved","route_id":"","schema_version":"phase_memory.ops_warden.session_event.v1","session_kind":"warden.operator"} \ No newline at end of file diff --git a/tests/fixtures/management/ops-warden-store/metadata.json b/tests/fixtures/management/ops-warden-store/metadata.json new file mode 100644 index 0000000..8efcf5d --- /dev/null +++ b/tests/fixtures/management/ops-warden-store/metadata.json @@ -0,0 +1,5 @@ +{ + "created_at": "2026-07-01T10:00:00+00:00", + "profile_id": "ops-warden-coordination", + "schema_version": "phase_memory.ops_warden.runtime.v1" +} \ No newline at end of file diff --git a/tests/fixtures/management/registry.json b/tests/fixtures/management/registry.json new file mode 100644 index 0000000..5ebfebd --- /dev/null +++ b/tests/fixtures/management/registry.json @@ -0,0 +1,13 @@ +{ + "schema_version": "phase_memory.management.store_registry.v1", + "stores": [ + { + "store_id": 
"fixture-local-graph", + "path": "local-store", + "store_kind": "local_graph", + "profile_id": "local-dev", + "label": "Fixture local graph store", + "registered_at": "2026-07-01T10:00:00+00:00" + } + ] +} \ No newline at end of file diff --git a/tests/fixtures/public-api-snapshot.json b/tests/fixtures/public-api-snapshot.json index 3b5cbde..1f9d412 100644 --- a/tests/fixtures/public-api-snapshot.json +++ b/tests/fixtures/public-api-snapshot.json @@ -17,6 +17,7 @@ "EVALUATION_TREND_REGRESSION_GATE_SCHEMA", "EVALUATION_TREND_SCHEMA", "ExternalAdapterPack", + "FEDERATED_REPORT_SCHEMA", "FakeExternalEventLog", "FakeExternalGraphStore", "FakeExternalPolicyGateway", @@ -76,6 +77,8 @@ "RuntimeConfig", "SERVICE_APP_SCHEMA", "SERVICE_BINDING_SCHEMA", + "STORE_LIST_SCHEMA", + "STORE_REGISTRY_SCHEMA", "ServiceAppConfig", "ServiceBinding", "ServiceResponse", @@ -89,8 +92,11 @@ "activation_quality_report", "adapter_pack_manifest", "branch_path", + "build_federated_report", "build_service_binding", "build_session_event", + "build_store_list", + "classify_store", "compact_path", "create_path", "create_wsgi_app", @@ -99,6 +105,7 @@ "credentialed_operator_report", "credentialed_telemetry_retention_drill", "default_memory_store_path", + "discover_memory_stores", "evaluation_threshold_report", "evaluation_trend_artifact", "evaluation_trend_history", diff --git a/tests/test_management_cli.py b/tests/test_management_cli.py new file mode 100644 index 0000000..1f51323 --- /dev/null +++ b/tests/test_management_cli.py @@ -0,0 +1,139 @@ +"""Tests for federated store management and reporting.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from phase_memory.cli import main +from phase_memory.management import ( + FEDERATED_REPORT_SCHEMA, + STORE_KIND_LOCAL_GRAPH, + STORE_KIND_OPS_WARDEN, + build_federated_report, + build_store_list, + classify_store, + discover_memory_stores, + 
resolve_store_reference, +) + +FIXTURES = Path(__file__).resolve().parent / "fixtures" / "management" +WINDOW_END = datetime(2026, 7, 3, 12, 0, tzinfo=timezone.utc) + + +def _fixture_environ(tmp_path: Path) -> dict[str, str]: + registry = FIXTURES / "registry.json" + ops_store = FIXTURES / "ops-warden-store" + local_store = FIXTURES / "local-store" + return { + "WARDEN_MEMORY_STORE": str(ops_store), + "PHASE_MEMORY_REGISTRY": str(registry), + "PHASE_MEMORY_STORE_PATHS": str(local_store), + "HOME": str(tmp_path), + "XDG_DATA_HOME": str(tmp_path / "xdg"), + } + + +def test_classify_store_kinds() -> None: + assert classify_store(FIXTURES / "ops-warden-store") == STORE_KIND_OPS_WARDEN + assert classify_store(FIXTURES / "local-store") == STORE_KIND_LOCAL_GRAPH + assert classify_store(FIXTURES / "registry.json") is None + + +def test_discover_memory_stores_deduplicates_and_merges_registry(tmp_path: Path) -> None: + environ = _fixture_environ(tmp_path) + stores, diagnostics = discover_memory_stores(environ) + assert not any(item.get("severity") == "error" for item in diagnostics) + store_ids = {store.store_id for store in stores} + assert "fixture-local-graph" in store_ids + assert len({str(store.path) for store in stores}) == len(stores) + + +def test_build_federated_report_aggregate_and_detail(tmp_path: Path) -> None: + environ = _fixture_environ(tmp_path) + report = build_federated_report(days=7, environ=environ, window_end=WINDOW_END) + assert report["schema_version"] == FEDERATED_REPORT_SCHEMA + assert report["valid"] is True + assert report["store_count"] == 2 + aggregate = report["aggregate"] + assert aggregate["episode_count"] == 3 + assert aggregate["audit_event_count"] == 1 + assert aggregate["active_store_count"] == 2 + assert aggregate["by_outcome"]["resolved"] == 1 + assert aggregate["by_outcome"]["skipped"] == 1 + assert aggregate["by_session_kind"]["warden.operator"] == 1 + assert aggregate["by_operation"]["audit.query"] == 1 + + detail = 
build_federated_report( + days=7, + focus_store_id="fixture-local-graph", + environ=environ, + window_end=WINDOW_END, + ) + assert detail["focus_store_id"] == "fixture-local-graph" + assert detail["store_count"] == 1 + assert detail["store_detail"]["episode_count"] == 1 + assert detail["store_detail"]["audit_event_count"] == 1 + + +def test_resolve_store_reference_by_id_and_path(tmp_path: Path) -> None: + environ = _fixture_environ(tmp_path) + stores, _ = discover_memory_stores(environ) + by_id = resolve_store_reference("fixture-local-graph", stores, environ=environ) + assert by_id is not None + assert by_id.store_kind == STORE_KIND_LOCAL_GRAPH + by_path = resolve_store_reference(str(FIXTURES / "ops-warden-store"), stores, environ=environ) + assert by_path is not None + assert by_path.store_kind == STORE_KIND_OPS_WARDEN + + +def test_federated_report_aggregate_fixture(tmp_path: Path) -> None: + environ = _fixture_environ(tmp_path) + report = build_federated_report(days=7, environ=environ, window_end=WINDOW_END) + payload = { + "aggregate": report["aggregate"], + "store_count": report["store_count"], + "stores": [ + {key: store[key] for key in ("store_id", "store_kind", "episode_count", "audit_event_count")} + for store in report["stores"] + ], + } + expected = json.loads((FIXTURES / "federated-report-aggregate.json").read_text(encoding="utf-8")) + assert payload["aggregate"] == expected["aggregate"] + assert payload["store_count"] == expected["store_count"] + actual_stores = sorted(payload["stores"], key=lambda item: item["store_kind"]) + expected_stores = sorted(expected["stores"], key=lambda item: item["store_kind"]) + for actual, item in zip(actual_stores, expected_stores, strict=True): + assert actual["store_kind"] == item["store_kind"] + assert actual["episode_count"] == item["episode_count"] + assert actual["audit_event_count"] == item["audit_event_count"] + + +def test_cli_stores_list_and_report(tmp_path: Path, capsys, monkeypatch: pytest.MonkeyPatch) -> 
None: + environ = _fixture_environ(tmp_path) + monkeypatch.setenv("WARDEN_MEMORY_STORE", environ["WARDEN_MEMORY_STORE"]) + monkeypatch.setenv("PHASE_MEMORY_REGISTRY", environ["PHASE_MEMORY_REGISTRY"]) + monkeypatch.setenv("PHASE_MEMORY_STORE_PATHS", environ["PHASE_MEMORY_STORE_PATHS"]) + assert main(["stores", "list", "--format", "summary"]) == 0 + listed = capsys.readouterr().out + assert "Fixture local graph store" in listed + + assert main(["report", "--days", "7", "--format", "summary"]) == 0 + reported = capsys.readouterr().out + assert "episodes=" in reported + assert "active_stores=" in reported + + +def test_build_store_list_from_registry_only(tmp_path: Path) -> None: + environ = { + "WARDEN_MEMORY_STORE": str(tmp_path / "missing-ops"), + "PHASE_MEMORY_REGISTRY": str(FIXTURES / "registry.json"), + "PHASE_MEMORY_STORE_PATHS": str(FIXTURES / "local-store"), + } + listing = build_store_list(environ=environ) + assert listing["valid"] is True + assert listing["store_count"] == 1 + assert listing["stores"][0]["store_id"] == "fixture-local-graph" \ No newline at end of file diff --git a/workplans/PMEM-WP-0017-federated-store-management-cli.md b/workplans/PMEM-WP-0017-federated-store-management-cli.md new file mode 100644 index 0000000..d8f77c8 --- /dev/null +++ b/workplans/PMEM-WP-0017-federated-store-management-cli.md @@ -0,0 +1,102 @@ +--- +id: PMEM-WP-0017 +type: workplan +title: "Federated Store Management CLI And Activity Reports" +domain: communication +repo: phase-memory +status: finished +owner: codex +topic_slug: phase-memory +created: "2026-07-03" +updated: "2026-07-03" +--- + +# PMEM-WP-0017: Federated Store Management CLI And Activity Reports + +## Goal + +Extend the `phase-memory` CLI into a **management interface** for every on-disk +memory store the runtime has produced — not only the ops-warden coordination +store. Operators and agents need one command to answer: + +- How much memory activity happened in the last *N* days **across all stores**? 
+- Which store contributed what (episodes, audit events, session kinds, outcomes)? +- What does a **single store** look like in detail for the same window? + +LLM interactivity stays out of scope here. When memory is "brought to life" +through ops-warden (worker, route/access, coding-agent sessions), chat already +happens in that surface. This workplan adds **inspectable reporting** only. + +## Implementation Update - 2026-07-03 + +Shipped federated store discovery, windowed activity aggregation, and CLI +reporting: + +- `src/phase_memory/management.py` — discovery, classification, aggregation, + `build_federated_report`, `build_store_list` +- `phase-memory stores list` and `phase-memory report [--days N] [--store …]` +- `make report-7d` and `make report REPORT_DAYS=N` +- Tests in `tests/test_management_cli.py` with `tests/fixtures/management/` + +Validation: `make test` → 117 passed, 1 skipped. + +## T01 - Store discovery and classification + +```task +id: PMEM-WP-0017-T01 +status: done +priority: high +``` + +Implement `phase_memory.management` with `discover_memory_stores`, +`classify_store`, and registry read path. + +## T02 - Windowed activity aggregation + +```task +id: PMEM-WP-0017-T02 +status: done +priority: high +``` + +Aggregate ops-warden episodes and local graph events/audit for a time window. + +## T03 - Federated report envelope + +```task +id: PMEM-WP-0017-T03 +status: done +priority: high +``` + +`build_federated_report` returns `phase_memory.management.federated_report.v1`. + +## T04 - CLI commands + +```task +id: PMEM-WP-0017-T04 +status: done +priority: high +``` + +`stores list` and `report` wired in `cli.py`. + +## T05 - Makefile, README, and operator runbook + +```task +id: PMEM-WP-0017-T05 +status: done +priority: medium +``` + +`make report-7d`, README management section, runbook cross-store activity. 
+ +## T06 - Tests and evaluation hook + +```task +id: PMEM-WP-0017-T06 +status: done +priority: medium +``` + +`tests/test_management_cli.py` and management fixtures; public API snapshot updated. \ No newline at end of file