"""Read a single session digest from the local store (AGENTIC-WP-0011 T03). Thin read path for ``kaizen-agentic metrics correlate`` and other consumers. Does not run ingest. Usage: python -m session_memory.digest_lookup [--json] HELIX_STORE_DB=/abs/path/to/mem.db python -m session_memory.digest_lookup """ from __future__ import annotations import argparse import json import os import sys from .core.store import Store from .ingest import _expand, load_config def resolve_store_paths(*, config_path: str | None = None) -> tuple[str, str]: """Resolve db + blob paths from HELIX_STORE_DB or config.toml [store].""" env_db = os.environ.get("HELIX_STORE_DB") if env_db: db_path = _expand(env_db) blob_dir = os.path.join(os.path.dirname(db_path), "blobs") return db_path, blob_dir here = os.path.dirname(os.path.abspath(__file__)) cfg_path = config_path or os.path.join(here, "config.toml") store_cfg = load_config(cfg_path).get("store", {}) return _expand(store_cfg.get("db_path", "session_memory/.store/mem.db")), _expand( store_cfg.get("blob_dir", "session_memory/.store/blobs") ) def lookup_digest(session_uid: str, *, config_path: str | None = None) -> dict | None: db_path, blob_dir = resolve_store_paths(config_path=config_path) store = Store(db_path, blob_dir) try: return store.get_digest(session_uid) finally: store.close() def main(argv: list[str] | None = None) -> int: here = os.path.dirname(os.path.abspath(__file__)) ap = argparse.ArgumentParser( description="Read one session digest from the Helix Forge store (no ingest)." ) ap.add_argument("session_uid", help="Normalized session uid, e.g. claude:abc-123") ap.add_argument("--config", default=os.path.join(here, "config.toml"), help="config.toml when HELIX_STORE_DB is unset") ap.add_argument("--json", action="store_true", help="print digest JSON to stdout") args = ap.parse_args(argv) digest = lookup_digest(args.session_uid, config_path=args.config) if digest is None: print(f"digest not found: {args.session_uid}", file=sys.stderr) return 1 if args.json: print(json.dumps(digest, indent=2, sort_keys=True)) else: cost = digest.get("cost") or {} tokens = cost.get("input_tokens", 0) + cost.get("output_tokens", 0) print(f"session_uid: {digest.get('session_uid')}") print(f"repo: {digest.get('repo')} flavor: {digest.get('flavor')}") print(f"outcome: {digest.get('outcome')} tokens: {tokens}") print(f"started_at: {digest.get('started_at')} ended_at: {digest.get('ended_at')}") return 0 if __name__ == "__main__": raise SystemExit(main())