From 6dfa69e3103f1a1c3263d7887bf376ddf7ca5733 Mon Sep 17 00:00:00 2001 From: tegwick Date: Sat, 27 Jun 2026 16:26:03 +0200 Subject: [PATCH] =?UTF-8?q?feat(WARDEN-WP-0014):=20T3=20=E2=80=94=20OpenBa?= =?UTF-8?q?o=20proxy=20lane=20(--fetch=20/=20--exec)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds transparent, policy-gated, audited proxy of a non-SSH credential through `warden access`, for exec_capable lanes. Three guardrails in code: - G1 caller identity: runs the owner's tool with the caller's own env; warden injects no token of its own (caller_auth_present check). - G2 transit-only: --fetch inherits stdout (never PIPE) so the value never enters warden's memory or any log; --exec injects into the child env only. Audit (access-audit.log) is metadata-only. - G3 policy gate: check_fetch_policy runs before any fetch; with policy.enabled=false the proxy refuses unless --no-policy is given. resolve_fetch_command refuses unresolved <…> placeholders rather than guess owner-side names. New warden/proxy.py + policy.check_fetch_policy; tests/test_proxy.py asserts all three guardrails. 168 passed, lint clean. Co-Authored-By: Claude Opus 4.8 --- src/warden/cli.py | 151 +++++++++++++- src/warden/policy.py | 58 ++++++ src/warden/proxy.py | 184 ++++++++++++++++++ tests/test_proxy.py | 182 +++++++++++++++++ .../WARDEN-WP-0014-operator-access-assist.md | 24 ++- 5 files changed, 588 insertions(+), 11 deletions(-) create mode 100644 src/warden/proxy.py create mode 100644 tests/test_proxy.py diff --git a/src/warden/cli.py b/src/warden/cli.py index 4b708c4..e00327f 100644 --- a/src/warden/cli.py +++ b/src/warden/cli.py @@ -737,8 +737,121 @@ def _access_json(entry, expanded, gate: str, domain: Optional[str]) -> dict: return payload -@app.command("access") +def _access_proxy( + entry, + *, + domain: Optional[str], + field: Optional[str], + path: Optional[str], + do_exec: bool, + child_argv: list, + no_policy: bool, +) -> None: + """Proxy a non-SSH credential fetch as the caller (WP-0014 T3). + + Enforces the three guardrails: caller identity (no warden token), policy gate + before fetch, and transit-only (no value persisted or logged). All warden chatter + goes to stderr so --fetch stdout carries only the secret. + """ + from warden.proxy import ( + ProxyError, + caller_auth_present, + proxy_exec, + proxy_fetch, + resolve_fetch_command, + write_audit, + ) + from warden.policy import check_fetch_policy + + if not entry.exec_capable: + err.print( + f"[red]{entry.id!r} is not exec_capable.[/red] " + "Use `warden access` (advisory) and obtain it from the owner directly." + ) + raise typer.Exit(2) + + # Proxy is privileged — require a real config for policy posture + audit sink. + try: + cfg = load_config() + except ConfigError as e: + err.print( + f"[red]Proxy requires warden.yaml[/red] (policy gate + audit sink): {e}\n" + "Advisory mode works without it: drop --fetch/--exec." + ) + raise typer.Exit(2) + + # G1 — caller identity. ops-warden adds no token of its own. + if not caller_auth_present(): + err.print( + "[red]No caller credential found[/red] (VAULT_TOKEN/BAO_TOKEN or ~/.vault-token). " + f"Authenticate first: {entry.auth_method or 'see the owner auth path'}." + ) + raise typer.Exit(3) + + # G3 — policy gate before fetch. + decision_id = None + if cfg.policy.enabled: + try: + decision_id = check_fetch_policy( + cfg.policy, need_id=entry.id, owner_repo=entry.owner_repo, domain=domain + ) + except CAError as e: + err.print(f"[red]Policy gate denied the fetch:[/red] {e}") + raise typer.Exit(4) + err.print(f"[green]flex-auth allow[/green] (decision {decision_id}).") + elif not no_policy: + err.print( + "[yellow]flex-auth gate is not enforced[/yellow] (policy.enabled=false). " + "Re-run with [bold]--no-policy[/bold] to proxy ungated, or enable the gate." + ) + raise typer.Exit(4) + else: + err.print("[yellow]Proxying ungated[/yellow] (--no-policy; gate not enforced).") + + try: + argv = resolve_fetch_command(entry, domain=domain, field=field, path=path) + except ProxyError as e: + err.print(f"[red]{e}[/red]") + raise typer.Exit(2) + + action = "exec" if do_exec else "fetch" + err.print( + f"[dim]proxy {action}: {entry.id} → {entry.owner_repo} " + f"(caller identity; value not persisted)[/dim]" + ) + try: + if do_exec: + if not child_argv: + err.print("[red]--exec needs a command after `--`[/red], e.g. `-- npm publish`.") + raise typer.Exit(2) + rc = proxy_exec(argv, env_var=field or "", child_argv=child_argv) + else: + rc = proxy_fetch(argv) + except ProxyError as e: + err.print(f"[red]{e}[/red]") + raise typer.Exit(5) + finally: + try: + write_audit( + cfg.state_dir, + need_id=entry.id, + owner_repo=entry.owner_repo, + domain=domain, + action=action, + decision_id=decision_id, + ) + except OSError as e: + err.print(f"[yellow]audit write failed:[/yellow] {e}") + + raise typer.Exit(rc) + + +@app.command( + "access", + context_settings={"allow_extra_args": True, "ignore_unknown_options": True}, +) def access( + ctx: typer.Context, need: Annotated[str, typer.Argument(help="Free-text need, e.g. 'npm token', 'db password'")], domain: Annotated[ Optional[str], @@ -746,13 +859,34 @@ def access( ] = None, output_json: Annotated[bool, typer.Option("--json", help="Output JSON (stable, secret-free)")] = False, all_entries: Annotated[bool, typer.Option("--all", help="Include draft entries")] = False, + do_fetch: Annotated[ + bool, typer.Option("--fetch", help="Proxy the fetch as the caller; value streams to stdout") + ] = False, + do_exec: Annotated[ + bool, + typer.Option("--exec", help="Run the trailing command (after --) with the secret in its env"), + ] = False, + field: Annotated[ + Optional[str], typer.Option("--field", help="Secret field / env-var name, e.g. NPM_AUTH_TOKEN") + ] = None, + path: Annotated[ + Optional[str], typer.Option("--path", help="Override the owner-side path template") + ] = None, + no_policy: Annotated[ + bool, + typer.Option("--no-policy", help="Acknowledge proxying when the flex-auth gate is not enforced"), + ] = False, ) -> None: """Operator front door: how to obtain any credential, gated and audited. Advisory by default — renders the owner, auth method, path template, command skeleton, and policy gate status for the best-matching need. ops-warden issues the SSH lane directly and **routes every other need to its owner** — it never - holds or vends the secret value. (Proxy fetch arrives in WP-0014 T3.) + holds or vends the secret value. + + With --fetch / --exec it proxies the fetch *as the caller* for exec_capable lanes: + the flex-auth gate runs first, ops-warden adds no credential of its own, the value + is never persisted or logged, and only metadata is audited. """ from warden.access import expand_handoff, policy_gate_status @@ -766,6 +900,19 @@ def access( raise typer.Exit(1) entry = matches[0] + + if do_fetch or do_exec: + _access_proxy( + entry, + domain=domain, + field=field, + path=path, + do_exec=do_exec, + child_argv=list(ctx.args), + no_policy=no_policy, + ) + return + expanded = expand_handoff(entry, domain) gate = policy_gate_status() diff --git a/src/warden/policy.py b/src/warden/policy.py index d800f08..920b302 100644 --- a/src/warden/policy.py +++ b/src/warden/policy.py @@ -88,6 +88,64 @@ def check_sign_policy(cfg: PolicyConfig, spec: CertSpec) -> str | None: reason = decision.get("reason") or "no reason provided" raise CAError(f"flex-auth denied SSH sign for {spec.actor_name!r}: {reason}") + if not decision_id: + raise CAError("flex-auth allow decision missing id") + return str(decision_id) + + +def check_fetch_policy( + cfg: PolicyConfig, *, need_id: str, owner_repo: str, domain: str | None +) -> str | None: + """Call flex-auth /v1/check before proxying a non-SSH credential fetch (WP-0014). + + The action is ``read`` on a ``secret`` resource owned by another subsystem — + ops-warden is the conduit, not the owner. Returns the decision id on allow, + None when policy is disabled, and raises CAError on deny (or on an unreachable + flex-auth when fail_closed). No secret value is ever part of this request. + """ + if not cfg.enabled: + return None + + subject_id = os.environ.get(cfg.subject_env, "").strip() or "operator" + request = { + "subject": {"id": subject_id, "type": "operator", "tenant": cfg.tenant}, + "action": "read", + "resource": { + "id": f"secret:{need_id}" + (f"/{domain}" if domain else ""), + "type": "secret", + "system": owner_repo, + "tenant": cfg.tenant, + }, + "context": {"need_id": need_id, "owner_repo": owner_repo, "domain": domain}, + } + + url = cfg.flex_auth_url.rstrip("/") + "/v1/check" + try: + response = httpx.post(url, json=request, timeout=10.0) + response.raise_for_status() + except httpx.HTTPStatusError as e: + if cfg.fail_closed: + raise CAError( + f"flex-auth denied or rejected fetch policy check (HTTP {e.response.status_code})" + ) from e + return None + except httpx.RequestError as e: + if cfg.fail_closed: + raise CAError( + f"flex-auth unreachable at {cfg.flex_auth_url!r} (fail_closed=true): {e}" + ) from e + return None + + try: + decision = response.json() + except ValueError as e: + raise CAError("flex-auth returned non-JSON decision") from e + + effect = str(decision.get("effect", "")).lower() + decision_id = decision.get("id") or decision.get("request_id") + if effect != "allow": + reason = decision.get("reason") or "no reason provided" + raise CAError(f"flex-auth denied secret read for {need_id!r}: {reason}") if not decision_id: raise CAError("flex-auth allow decision missing id") return str(decision_id) \ No newline at end of file diff --git a/src/warden/proxy.py b/src/warden/proxy.py new file mode 100644 index 0000000..ce4c028 --- /dev/null +++ b/src/warden/proxy.py @@ -0,0 +1,184 @@ +"""Operator access proxy — transparent, audited fetch of a non-SSH credential. + +WP-0014 T3. ops-warden does not own these secrets; the proxy lane lets an operator +obtain one *through* the `warden access` front door while keeping the security model +intact. Three guardrails are enforced here in code: + +* **G1 — caller identity, never warden's.** The proxy runs the owner's tool with the + caller's own environment. ops-warden injects no token of its own; if the caller has + no credential, the underlying tool fails and we surface the auth pointer. We never + add a `*_TOKEN` warden owns to the child environment. +* **G2 — transit only, no persistence/logging of values.** ``proxy_fetch`` runs the + tool with **inherited** stdout/stderr (never a pipe), so the value streams to the + caller and never enters warden's memory. ``proxy_exec`` reads the value solely to + place it in a child process's environment (the accepted proxy tradeoff) and never + writes it to disk or log. The audit record is metadata only. +* **G3 — policy gate before fetch.** The CLI runs ``check_fetch_policy`` before + calling anything here; this module refuses to run an unresolved command template. + +This module shells out but never *interprets* secret bytes in the ``--fetch`` path. +""" +from __future__ import annotations + +import json +import os +import re +import shlex +import subprocess +from datetime import datetime, timezone +from pathlib import Path +from typing import List, Optional + +from warden.routing.models import RouteEntry + +_PLACEHOLDER = re.compile(r"<[^>]+>") + + +class ProxyError(Exception): + """Raised when a proxy fetch cannot be performed safely.""" + + +def resolve_fetch_command( + entry: RouteEntry, + *, + domain: Optional[str] = None, + field: Optional[str] = None, + path: Optional[str] = None, +) -> List[str]: + """Build the concrete argv for an entry's fetch, or raise if under-specified. + + Starts from the catalog ``fetch_command`` template (with ```` + inlined), substitutes ````/```` and an explicit ``--path`` override, + then **refuses** if any ``<…>`` placeholder remains. We never run a half-templated + command — an unresolved placeholder means the operator has not named the owner-side + resource, and guessing it is exactly the failure mode we avoid. + """ + if not entry.exec_capable or not entry.fetch_command: + raise ProxyError( + f"{entry.id!r} is not exec_capable — it has no proxyable fetch command. " + "Use `warden access` (advisory) and obtain it from the owner directly." + ) + + cmd = entry.fetch_command + if entry.path_template and "" in cmd: + cmd = cmd.replace("", path or entry.path_template) + elif path: + # No token but caller supplied a path — append/override is + # ambiguous, so require the template to carry the token. + raise ProxyError( + f"{entry.id!r} fetch_command has no token to override with --path." + ) + + if domain: + cmd = cmd.replace("", domain) + if field: + cmd = cmd.replace("", field) + + leftover = _PLACEHOLDER.findall(cmd) + if leftover: + raise ProxyError( + f"unresolved placeholder(s) {', '.join(sorted(set(leftover)))} in fetch command. " + "Supply --domain/--field (and --path for owner-side names) — warden will not " + "guess owner-confirmed resource names." + ) + return shlex.split(cmd) + + +def caller_auth_present(token_envs: tuple[str, ...] = ("VAULT_TOKEN", "BAO_TOKEN")) -> bool: + """True if the *caller* appears to hold an auth token (G1 sanity check). + + Best-effort: also accepts a ``~/.vault-token`` file. We do not validate it — the + owner's tool does that — we only avoid proxying when the caller clearly has no + credential, so the failure is a clear auth pointer rather than a confusing tool error. + """ + if any(os.environ.get(e, "").strip() for e in token_envs): + return True + return (Path.home() / ".vault-token").exists() + + +def write_audit( + state_dir: Path, + *, + need_id: str, + owner_repo: str, + domain: Optional[str], + action: str, + decision_id: Optional[str], + exit_code: Optional[int] = None, +) -> Path: + """Append a metadata-only audit record. Never contains a secret value (G2).""" + state_dir.mkdir(parents=True, exist_ok=True) + log_path = state_dir / "access-audit.log" + record = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "action": action, # "fetch" | "exec" + "need_id": need_id, + "owner_repo": owner_repo, + "domain": domain, + "subject": os.environ.get("WARDEN_POLICY_SUBJECT", "").strip() or "operator", + "policy_decision_id": decision_id, + "exit_code": exit_code, + } + with log_path.open("a") as f: + f.write(json.dumps(record) + "\n") + return log_path + + +def _caller_env() -> dict: + """The child environment = the caller's own env. warden adds no credential (G1).""" + return dict(os.environ) + + +def proxy_fetch(argv: List[str]) -> int: + """Run the owner's tool, streaming its output straight to the caller. + + stdout/stderr are **inherited** (``None``), never piped — the secret value flows + subsystem → caller and is never read into warden's memory, buffer, or log (G2). + Returns the tool's exit code. + """ + completed = subprocess.run( # noqa: S603 — argv is shlex-split from a validated template + argv, + stdout=None, + stderr=None, + stdin=None, + env=_caller_env(), + check=False, + ) + return completed.returncode + + +def proxy_exec(argv: List[str], *, env_var: str, child_argv: List[str]) -> int: + """Fetch the value and inject it into a child command's environment only. + + The value transits warden's memory here (the accepted proxy tradeoff for `--exec`) + but is never written to disk or log and never enters the caller's own shell env. + Captures the fetch tool's stdout to obtain the value, strips a single trailing + newline, and runs ``child_argv`` with ``env_var`` set in its environment. + """ + if not env_var: + raise ProxyError("--exec requires --field (the env var name to inject), e.g. NPM_AUTH_TOKEN") + + fetched = subprocess.run( # noqa: S603 + argv, stdout=subprocess.PIPE, stderr=None, stdin=None, + env=_caller_env(), check=False, text=True, + ) + if fetched.returncode != 0: + raise ProxyError( + f"fetch failed (exit {fetched.returncode}) — check caller auth and the path." + ) + + value = fetched.stdout + if value.endswith("\n"): + value = value[:-1] + + child_env = _caller_env() + child_env[env_var] = value + try: + child = subprocess.run( # noqa: S603 + child_argv, stdout=None, stderr=None, stdin=None, env=child_env, check=False + ) + return child.returncode + finally: + # Best-effort scrub of the local reference; do not log it. + value = "" # noqa: F841 + del child_env[env_var] diff --git a/tests/test_proxy.py b/tests/test_proxy.py new file mode 100644 index 0000000..916ad90 --- /dev/null +++ b/tests/test_proxy.py @@ -0,0 +1,182 @@ +"""Tests for the access proxy lane (WP-0014 T3) and its three guardrails.""" +from __future__ import annotations + +import json +import subprocess +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from warden.cli import app +from warden.proxy import ( + ProxyError, + caller_auth_present, + proxy_exec, + proxy_fetch, + resolve_fetch_command, + write_audit, +) +from warden.routing.models import RouteEntry + +runner = CliRunner() + + +def _entry(**over) -> RouteEntry: + base = dict( + id="openbao-api-key", + title="API key", + need_keywords=["npm", "token"], + owner_repo="railiance-platform", + subsystem="OpenBao", + warden_executes=False, + wiki_ref="w", + canon_ref="c", + reviewed="2026-06-27", + status="active", + path_template="platform/workloads///", + fetch_command="bao kv get -field= ", + exec_capable=True, + ) + base.update(over) + return RouteEntry(**base) + + +# --- resolve_fetch_command ------------------------------------------------- + +def test_resolve_builds_argv(): + argv = resolve_fetch_command( + _entry(), domain="coulomb_social", field="NPM_AUTH_TOKEN", path="platform/x/y/z" + ) + assert argv == ["bao", "kv", "get", "-field=NPM_AUTH_TOKEN", "platform/x/y/z"] + + +def test_resolve_refuses_unresolved_placeholder(): + # no --field / --path → , , remain + with pytest.raises(ProxyError, match="unresolved placeholder"): + resolve_fetch_command(_entry(), domain="coulomb_social") + + +def test_resolve_refuses_non_exec_capable(): + with pytest.raises(ProxyError, match="not exec_capable"): + resolve_fetch_command(_entry(exec_capable=False, fetch_command=None)) + + +# --- G2: transit-only fetch (inherited stdout) ----------------------------- + +def test_proxy_fetch_inherits_stdout_never_pipes(monkeypatch): + calls = {} + + def fake_run(argv, **kw): + calls.update(kw) + return subprocess.CompletedProcess(argv, 0) + + monkeypatch.setattr("warden.proxy.subprocess.run", fake_run) + rc = proxy_fetch(["bao", "kv", "get", "x"]) + assert rc == 0 + # The value must never enter warden's memory — stdout is inherited, not piped. + assert calls["stdout"] is None + assert calls.get("stderr") is None + + +# --- G1 + inject: exec injects value into child env, adds no warden token --- + +def test_proxy_exec_injects_only_into_child_env(monkeypatch): + seen_env = {} + + def fake_run(argv, **kw): + if argv[0] == "bao": + return subprocess.CompletedProcess(argv, 0, stdout="SECRETVAL\n") + seen_env.update(kw["env"]) + return subprocess.CompletedProcess(argv, 0) + + monkeypatch.setattr("warden.proxy.subprocess.run", fake_run) + monkeypatch.delenv("NPM_AUTH_TOKEN", raising=False) + rc = proxy_exec(["bao", "kv", "get", "x"], env_var="NPM_AUTH_TOKEN", child_argv=["true"]) + assert rc == 0 + # Value injected into child env (trailing newline stripped)… + assert seen_env["NPM_AUTH_TOKEN"] == "SECRETVAL" + # …and warden added no credential of its own beyond the caller's environment. + assert "VAULT_TOKEN" not in {k for k in seen_env if k not in __import__("os").environ} + + +def test_proxy_exec_requires_env_var(): + with pytest.raises(ProxyError, match="requires --field"): + proxy_exec(["bao"], env_var="", child_argv=["true"]) + + +# --- G1 caller auth detection ---------------------------------------------- + +def test_caller_auth_present_from_env(monkeypatch): + monkeypatch.setenv("VAULT_TOKEN", "x") + assert caller_auth_present() is True + + +def test_caller_auth_absent(monkeypatch, tmp_path): + monkeypatch.delenv("VAULT_TOKEN", raising=False) + monkeypatch.delenv("BAO_TOKEN", raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) # no ~/.vault-token + assert caller_auth_present() is False + + +# --- audit metadata only --------------------------------------------------- + +def test_write_audit_has_no_value_field(tmp_path): + p = write_audit( + tmp_path, need_id="openbao-api-key", owner_repo="railiance-platform", + domain="coulomb_social", action="fetch", decision_id=None, + ) + rec = json.loads(p.read_text().strip()) + assert rec["need_id"] == "openbao-api-key" + assert "value" not in rec and "secret" not in rec + + +# --- CLI guardrail wiring --------------------------------------------------- + +def _repo_catalog() -> Path: + return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml" + + +def _warden_yaml(tmp_path: Path) -> Path: + cfg = tmp_path / "warden.yaml" + (tmp_path / "ca").write_text("") + cfg.write_text( + f"backend: local\nca_key: {tmp_path/'ca'}\nstate_dir: {tmp_path/'state'}\n" + "policy:\n enabled: false\n" + ) + return cfg + + +def _proxy_env(monkeypatch, tmp_path): + monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog())) + monkeypatch.setenv("WARDEN_CONFIG", str(_warden_yaml(tmp_path))) + + +def test_cli_proxy_refuses_without_policy_ack(monkeypatch, tmp_path): + _proxy_env(monkeypatch, tmp_path) + monkeypatch.setenv("VAULT_TOKEN", "caller") + # subprocess must never run if the gate blocks first. + monkeypatch.setattr( + "warden.proxy.subprocess.run", + lambda *a, **k: (_ for _ in ()).throw(AssertionError("fetch ran despite gate")), + ) + r = runner.invoke( + app, + ["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN", + "--path", "platform/x/y/z", "--fetch"], + ) + assert r.exit_code == 4 + assert "not enforced" in r.stdout or "not enforced" in str(r.output) + + +def test_cli_proxy_requires_caller_auth(monkeypatch, tmp_path): + _proxy_env(monkeypatch, tmp_path) + monkeypatch.delenv("VAULT_TOKEN", raising=False) + monkeypatch.delenv("BAO_TOKEN", raising=False) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + r = runner.invoke( + app, + ["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN", + "--path", "platform/x/y/z", "--fetch", "--no-policy"], + ) + assert r.exit_code == 3 diff --git a/workplans/WARDEN-WP-0014-operator-access-assist.md b/workplans/WARDEN-WP-0014-operator-access-assist.md index 4cbe432..4221695 100644 --- a/workplans/WARDEN-WP-0014-operator-access-assist.md +++ b/workplans/WARDEN-WP-0014-operator-access-assist.md @@ -129,19 +129,25 @@ state_hub_task_id: "c1497263-7124-459f-b63a-d0c0c7005c86" ```task id: WARDEN-WP-0014-T03 -status: todo +status: done priority: high state_hub_task_id: "6d3eb0e4-309c-4065-893e-6c4053fb0db2" ``` -- [ ] `warden access --fetch` — policy-gate (G3) → `exec` the owning tool - (`bao kv get ...`) **as the caller** → stream value to stdout. No buffering, no log. -- [ ] `warden access --exec -- ` — run a child command with the secret - injected into *its* env only (à la `op run`); value never lands in the caller's - shell history or persistent env. -- [ ] Enforce guardrails G1–G3 in code; unit + integration tests assert: no value on - disk, no value in logs, no standing warden credential, gate runs before fetch. -- [ ] Audit event (metadata only) written per fetch — reuse the signatures-log pattern. +- [x] `warden access --fetch` — policy-gate (G3) → run the owning tool + (`bao kv get ...`) **as the caller** with **inherited stdout** → value streams to + stdout and never enters warden's memory (`proxy_fetch`). No buffering, no log. +- [x] `warden access --exec -- ` — runs the child with the secret injected + into *its* env only (`proxy_exec`); value never lands in the caller's shell env; + `--field` names the env var (e.g. `NPM_AUTH_TOKEN`). +- [x] Guardrails G1–G3 in code (`warden/proxy.py`, `_access_proxy` in `cli.py`): + G1 caller token only (no warden credential; `caller_auth_present`); G2 transit-only + (inherit-stdout fetch; no disk/log write); G3 `check_fetch_policy` before any exec, + `--no-policy` required to proxy ungated. `tests/test_proxy.py` asserts all three, + plus `resolve_fetch_command` refuses unresolved `<…>` placeholders. Live smoke + against a fake `bao` confirmed gate-refusal, stream, exec-inject, and a + secret-free audit log. +- [x] Metadata-only audit per call (`write_audit` → `state_dir/access-audit.log`). ### T4 — key-cape / login orchestration lane