feat(WARDEN-WP-0014): T3 — OpenBao proxy lane (--fetch / --exec)

Adds transparent, policy-gated, audited proxy of a non-SSH credential
through `warden access`, for exec_capable lanes. Three guardrails in code:

- G1 caller identity: runs the owner's tool with the caller's own env;
  warden injects no token of its own (caller_auth_present check).
- G2 transit-only: --fetch inherits stdout (never PIPE) so the value
  never enters warden's memory or any log; --exec injects into the child
  env only. Audit (access-audit.log) is metadata-only.
- G3 policy gate: check_fetch_policy runs before any fetch; with
  policy.enabled=false the proxy refuses unless --no-policy is given.

resolve_fetch_command refuses unresolved <…> placeholders rather than
guess owner-side names. New warden/proxy.py + policy.check_fetch_policy;
tests/test_proxy.py asserts all three guardrails. 168 passed, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-27 16:26:03 +02:00
parent 830a775bcf
commit 6dfa69e310
5 changed files with 588 additions and 11 deletions

182
tests/test_proxy.py Normal file
View File

@@ -0,0 +1,182 @@
"""Tests for the access proxy lane (WP-0014 T3) and its three guardrails."""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
import pytest
from typer.testing import CliRunner
from warden.cli import app
from warden.proxy import (
ProxyError,
caller_auth_present,
proxy_exec,
proxy_fetch,
resolve_fetch_command,
write_audit,
)
from warden.routing.models import RouteEntry
runner = CliRunner()
def _entry(**over) -> RouteEntry:
base = dict(
id="openbao-api-key",
title="API key",
need_keywords=["npm", "token"],
owner_repo="railiance-platform",
subsystem="OpenBao",
warden_executes=False,
wiki_ref="w",
canon_ref="c",
reviewed="2026-06-27",
status="active",
path_template="platform/workloads/<domain>/<workload>/<bundle>",
fetch_command="bao kv get -field=<FIELD> <path_template>",
exec_capable=True,
)
base.update(over)
return RouteEntry(**base)
# --- resolve_fetch_command -------------------------------------------------
def test_resolve_builds_argv():
argv = resolve_fetch_command(
_entry(), domain="coulomb_social", field="NPM_AUTH_TOKEN", path="platform/x/y/z"
)
assert argv == ["bao", "kv", "get", "-field=NPM_AUTH_TOKEN", "platform/x/y/z"]
def test_resolve_refuses_unresolved_placeholder():
# no --field / --path → <FIELD>, <workload>, <bundle> remain
with pytest.raises(ProxyError, match="unresolved placeholder"):
resolve_fetch_command(_entry(), domain="coulomb_social")
def test_resolve_refuses_non_exec_capable():
with pytest.raises(ProxyError, match="not exec_capable"):
resolve_fetch_command(_entry(exec_capable=False, fetch_command=None))
# --- G2: transit-only fetch (inherited stdout) -----------------------------
def test_proxy_fetch_inherits_stdout_never_pipes(monkeypatch):
calls = {}
def fake_run(argv, **kw):
calls.update(kw)
return subprocess.CompletedProcess(argv, 0)
monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
rc = proxy_fetch(["bao", "kv", "get", "x"])
assert rc == 0
# The value must never enter warden's memory — stdout is inherited, not piped.
assert calls["stdout"] is None
assert calls.get("stderr") is None
# --- G1 + inject: exec injects value into child env, adds no warden token ---
def test_proxy_exec_injects_only_into_child_env(monkeypatch):
seen_env = {}
def fake_run(argv, **kw):
if argv[0] == "bao":
return subprocess.CompletedProcess(argv, 0, stdout="SECRETVAL\n")
seen_env.update(kw["env"])
return subprocess.CompletedProcess(argv, 0)
monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
monkeypatch.delenv("NPM_AUTH_TOKEN", raising=False)
rc = proxy_exec(["bao", "kv", "get", "x"], env_var="NPM_AUTH_TOKEN", child_argv=["true"])
assert rc == 0
# Value injected into child env (trailing newline stripped)…
assert seen_env["NPM_AUTH_TOKEN"] == "SECRETVAL"
# …and warden added no credential of its own beyond the caller's environment.
assert "VAULT_TOKEN" not in {k for k in seen_env if k not in __import__("os").environ}
def test_proxy_exec_requires_env_var():
with pytest.raises(ProxyError, match="requires --field"):
proxy_exec(["bao"], env_var="", child_argv=["true"])
# --- G1 caller auth detection ----------------------------------------------
def test_caller_auth_present_from_env(monkeypatch):
monkeypatch.setenv("VAULT_TOKEN", "x")
assert caller_auth_present() is True
def test_caller_auth_absent(monkeypatch, tmp_path):
monkeypatch.delenv("VAULT_TOKEN", raising=False)
monkeypatch.delenv("BAO_TOKEN", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path) # no ~/.vault-token
assert caller_auth_present() is False
# --- audit metadata only ---------------------------------------------------
def test_write_audit_has_no_value_field(tmp_path):
p = write_audit(
tmp_path, need_id="openbao-api-key", owner_repo="railiance-platform",
domain="coulomb_social", action="fetch", decision_id=None,
)
rec = json.loads(p.read_text().strip())
assert rec["need_id"] == "openbao-api-key"
assert "value" not in rec and "secret" not in rec
# --- CLI guardrail wiring ---------------------------------------------------
def _repo_catalog() -> Path:
return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml"
def _warden_yaml(tmp_path: Path) -> Path:
cfg = tmp_path / "warden.yaml"
(tmp_path / "ca").write_text("")
cfg.write_text(
f"backend: local\nca_key: {tmp_path/'ca'}\nstate_dir: {tmp_path/'state'}\n"
"policy:\n enabled: false\n"
)
return cfg
def _proxy_env(monkeypatch, tmp_path):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
monkeypatch.setenv("WARDEN_CONFIG", str(_warden_yaml(tmp_path)))
def test_cli_proxy_refuses_without_policy_ack(monkeypatch, tmp_path):
_proxy_env(monkeypatch, tmp_path)
monkeypatch.setenv("VAULT_TOKEN", "caller")
# subprocess must never run if the gate blocks first.
monkeypatch.setattr(
"warden.proxy.subprocess.run",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("fetch ran despite gate")),
)
r = runner.invoke(
app,
["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN",
"--path", "platform/x/y/z", "--fetch"],
)
assert r.exit_code == 4
assert "not enforced" in r.stdout or "not enforced" in str(r.output)
def test_cli_proxy_requires_caller_auth(monkeypatch, tmp_path):
_proxy_env(monkeypatch, tmp_path)
monkeypatch.delenv("VAULT_TOKEN", raising=False)
monkeypatch.delenv("BAO_TOKEN", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
r = runner.invoke(
app,
["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN",
"--path", "platform/x/y/z", "--fetch", "--no-policy"],
)
assert r.exit_code == 3