"""phase-memory bridge for ops-warden cross-runtime experiential memory.""" from __future__ import annotations import os from typing import Any, Mapping, Optional _PHASE_MEMORY_ERROR = ( "phase-memory is required for warden memory commands. " "From ~/ops-warden run: make install-memory (or make install-all). " "Dev fallback: PYTHONPATH=../phase-memory/src" ) # In-process cache: implicit activation is default; no separate `warden memory activate` # is required for normal route/access/worker/sign use within one CLI invocation tree. _CONTEXT_CACHE: dict[str, Any] | None = None _CONTEXT_CACHE_KEY: tuple[str, str] = ("", "") def _invalidate_context_cache() -> None: global _CONTEXT_CACHE, _CONTEXT_CACHE_KEY _CONTEXT_CACHE = None _CONTEXT_CACHE_KEY = ("", "") def _phase_memory(): try: import phase_memory.ops_warden as pm return pm except ImportError as exc: # pragma: no cover - exercised via tests with PYTHONPATH raise RuntimeError(_PHASE_MEMORY_ERROR) from exc def memory_available() -> bool: try: _phase_memory() return True except RuntimeError: return False def enabled(environ: Mapping[str, str] | None = None) -> bool: environ = environ or os.environ return str(environ.get("WARDEN_MEMORY", "1")).strip().lower() not in {"0", "false", "no", "off"} def store_path(environ: Mapping[str, str] | None = None): return _phase_memory().default_memory_store_path(environ) def session_kind(environ: Mapping[str, str] | None = None) -> str: return _phase_memory().resolve_session_kind(environ) def status(environ: Mapping[str, str] | None = None) -> dict[str, Any]: pm = _phase_memory() return pm.OpsWardenMemoryStore.open(environ=environ).status() def memory_context_summary(activation: dict[str, Any] | None) -> dict[str, Any]: if not activation: return {"enabled": False} return { "enabled": True, "implicit": bool(activation.get("implicit")), "session_kind": activation.get("session_kind", ""), "episode_count": activation.get("episode_count", 0), "stabilized_route_id": (activation.get("stabilized_route") or {}).get("route_id", ""), "llm_calls_avoided": bool(activation.get("llm_calls_avoided")), "selected_episode_count": len(activation.get("selected_episodes") or ()), } def ensure_memory_context( need: str = "", *, agent: Optional[str] = None, session_id: str = "", environ: Mapping[str, str] | None = None, implicit: bool = True, ) -> dict[str, Any] | None: """Load coordination memory for the current session (default, no extra command).""" global _CONTEXT_CACHE, _CONTEXT_CACHE_KEY if not enabled(environ): return None if not memory_available(): return None pm = _phase_memory() env = dict(environ or os.environ) if agent: env["WARDEN_AGENT_ID"] = agent kind = pm.resolve_session_kind(env) fingerprint = pm.need_fingerprint(need) if need else "" cache_key = (kind, fingerprint) if _CONTEXT_CACHE is not None and _CONTEXT_CACHE_KEY == cache_key: return _CONTEXT_CACHE try: activation = activate(need=need, agent=agent, session_id=session_id, environ=env) except RuntimeError: return None activation = {**activation, "implicit": implicit} _CONTEXT_CACHE = activation _CONTEXT_CACHE_KEY = cache_key return activation def activate( *, need: str = "", agent: Optional[str] = None, session_id: str = "", environ: Mapping[str, str] | None = None, implicit: bool = False, ) -> dict[str, Any]: pm = _phase_memory() env = dict(environ or os.environ) if agent: env["WARDEN_AGENT_ID"] = agent kind = pm.resolve_session_kind(env) activation = pm.activate_ops_warden_memory( pm.OpsWardenMemoryStore.open(environ=env), session_kind=kind, need=need, session_id=session_id, ) if implicit: activation = {**activation, "implicit": True} return activation def record_command_episode( *, command: str, outcome: str, need: str = "", route_id: str = "", diagnostic_codes: Optional[list[str]] = None, metadata: Optional[dict[str, Any]] = None, environ: Mapping[str, str] | None = None, ) -> dict[str, Any]: if not enabled(environ): return {"valid": True, "skipped": True, "reason": "WARDEN_MEMORY=0"} pm = _phase_memory() env = dict(environ or os.environ) event = pm.build_session_event( command=command, session_kind=pm.resolve_session_kind(env), outcome=outcome, need=need, route_id=route_id, agent_id=str(env.get("WARDEN_AGENT_ID") or ""), session_id=str(env.get("WARDEN_SESSION_ID") or ""), diagnostic_codes=diagnostic_codes, metadata=metadata, ) result = pm.record_session_event(pm.OpsWardenMemoryStore.open(environ=env), event) if result.get("valid"): _invalidate_context_cache() return result def worker_activation_context(need: str = "", environ: Mapping[str, str] | None = None) -> dict[str, Any]: env = dict(environ or os.environ) env["WARDEN_SESSION_KIND"] = "warden.worker" return ensure_memory_context(need=need, environ=env, implicit=True) or activate(need=need, environ=env, implicit=True) def stabilized_route_for_need(need: str, environ: Mapping[str, str] | None = None) -> Optional[dict[str, Any]]: pm = _phase_memory() store = pm.OpsWardenMemoryStore.open(environ=environ) return pm.stabilized_route_match(store.list_events(), need=need) def format_activation_summary(activation: dict[str, Any]) -> str: lines = [ f"store: {activation.get('episode_count', 0)} episodes", f"session_kind: {activation.get('session_kind', '')}", f"selected: {len(activation.get('selected_episodes', ()) )}", ] stabilized = activation.get("stabilized_route") if stabilized: lines.append( f"stabilized: {stabilized.get('route_id')} ({stabilized.get('confirmations')} confirmations)" ) if activation.get("llm_calls_avoided"): lines.append("llm_calls_avoided: true") return "\n".join(lines)