Files
ops-warden/tests/test_proxy.py
tegwick 1c3d1b4d52 feat(WARDEN-WP-0014): T4 — key-cape login orchestration lane
Adds a lane: secret|login field to RouteEntry. The login lane is an
interactive auth bootstrap: it skips the caller-auth precheck (no token
yet — that's the point) and the secret-read gate (it establishes the
identity the gate needs), runs the owner's login command interactively
as the caller via inherited stdio, and rejects --exec. The token stays
in the caller's own store; warden never captures it (G2 holds). Audited
as action: login. key-cape-oidc-login populated as the reference login
entry. Advisory proxy hint updated now that T3 has shipped.

172 passed, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 17:31:55 +02:00

239 lines
8.1 KiB
Python

"""Tests for the access proxy lane (WP-0014 T3) and its three guardrails."""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
import pytest
from typer.testing import CliRunner
from warden.cli import app
from warden.proxy import (
ProxyError,
caller_auth_present,
proxy_exec,
proxy_fetch,
resolve_fetch_command,
write_audit,
)
from warden.routing.models import RouteEntry
runner = CliRunner()
def _entry(**over) -> RouteEntry:
base = dict(
id="openbao-api-key",
title="API key",
need_keywords=["npm", "token"],
owner_repo="railiance-platform",
subsystem="OpenBao",
warden_executes=False,
wiki_ref="w",
canon_ref="c",
reviewed="2026-06-27",
status="active",
path_template="platform/workloads/<domain>/<workload>/<bundle>",
fetch_command="bao kv get -field=<FIELD> <path_template>",
exec_capable=True,
)
base.update(over)
return RouteEntry(**base)
# --- resolve_fetch_command -------------------------------------------------
def test_resolve_builds_argv():
argv = resolve_fetch_command(
_entry(), domain="coulomb_social", field="NPM_AUTH_TOKEN", path="platform/x/y/z"
)
assert argv == ["bao", "kv", "get", "-field=NPM_AUTH_TOKEN", "platform/x/y/z"]
def test_resolve_refuses_unresolved_placeholder():
# no --field / --path → <FIELD>, <workload>, <bundle> remain
with pytest.raises(ProxyError, match="unresolved placeholder"):
resolve_fetch_command(_entry(), domain="coulomb_social")
def test_resolve_refuses_non_exec_capable():
with pytest.raises(ProxyError, match="not exec_capable"):
resolve_fetch_command(_entry(exec_capable=False, fetch_command=None))
# --- G2: transit-only fetch (inherited stdout) -----------------------------
def test_proxy_fetch_inherits_stdout_never_pipes(monkeypatch):
calls = {}
def fake_run(argv, **kw):
calls.update(kw)
return subprocess.CompletedProcess(argv, 0)
monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
rc = proxy_fetch(["bao", "kv", "get", "x"])
assert rc == 0
# The value must never enter warden's memory — stdout is inherited, not piped.
assert calls["stdout"] is None
assert calls.get("stderr") is None
# --- G1 + inject: exec injects value into child env, adds no warden token ---
def test_proxy_exec_injects_only_into_child_env(monkeypatch):
seen_env = {}
def fake_run(argv, **kw):
if argv[0] == "bao":
return subprocess.CompletedProcess(argv, 0, stdout="SECRETVAL\n")
seen_env.update(kw["env"])
return subprocess.CompletedProcess(argv, 0)
monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
monkeypatch.delenv("NPM_AUTH_TOKEN", raising=False)
rc = proxy_exec(["bao", "kv", "get", "x"], env_var="NPM_AUTH_TOKEN", child_argv=["true"])
assert rc == 0
# Value injected into child env (trailing newline stripped)…
assert seen_env["NPM_AUTH_TOKEN"] == "SECRETVAL"
# …and warden added no credential of its own beyond the caller's environment.
assert "VAULT_TOKEN" not in {k for k in seen_env if k not in __import__("os").environ}
def test_proxy_exec_requires_env_var():
with pytest.raises(ProxyError, match="requires --field"):
proxy_exec(["bao"], env_var="", child_argv=["true"])
# --- G1 caller auth detection ----------------------------------------------
def test_caller_auth_present_from_env(monkeypatch):
monkeypatch.setenv("VAULT_TOKEN", "x")
assert caller_auth_present() is True
def test_caller_auth_absent(monkeypatch, tmp_path):
monkeypatch.delenv("VAULT_TOKEN", raising=False)
monkeypatch.delenv("BAO_TOKEN", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path) # no ~/.vault-token
assert caller_auth_present() is False
# --- audit metadata only ---------------------------------------------------
def test_write_audit_has_no_value_field(tmp_path):
p = write_audit(
tmp_path, need_id="openbao-api-key", owner_repo="railiance-platform",
domain="coulomb_social", action="fetch", decision_id=None,
)
rec = json.loads(p.read_text().strip())
assert rec["need_id"] == "openbao-api-key"
assert "value" not in rec and "secret" not in rec
# --- CLI guardrail wiring ---------------------------------------------------
def _repo_catalog() -> Path:
return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml"
def _warden_yaml(tmp_path: Path) -> Path:
cfg = tmp_path / "warden.yaml"
(tmp_path / "ca").write_text("")
cfg.write_text(
f"backend: local\nca_key: {tmp_path/'ca'}\nstate_dir: {tmp_path/'state'}\n"
"policy:\n enabled: false\n"
)
return cfg
def _proxy_env(monkeypatch, tmp_path):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
monkeypatch.setenv("WARDEN_CONFIG", str(_warden_yaml(tmp_path)))
def test_cli_proxy_refuses_without_policy_ack(monkeypatch, tmp_path):
_proxy_env(monkeypatch, tmp_path)
monkeypatch.setenv("VAULT_TOKEN", "caller")
# subprocess must never run if the gate blocks first.
monkeypatch.setattr(
"warden.proxy.subprocess.run",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("fetch ran despite gate")),
)
r = runner.invoke(
app,
["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN",
"--path", "platform/x/y/z", "--fetch"],
)
assert r.exit_code == 4
assert "not enforced" in r.stdout or "not enforced" in str(r.output)
def test_cli_proxy_requires_caller_auth(monkeypatch, tmp_path):
_proxy_env(monkeypatch, tmp_path)
monkeypatch.delenv("VAULT_TOKEN", raising=False)
monkeypatch.delenv("BAO_TOKEN", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
r = runner.invoke(
app,
["access", "npm", "--domain", "coulomb_social", "--field", "NPM_AUTH_TOKEN",
"--path", "platform/x/y/z", "--fetch", "--no-policy"],
)
assert r.exit_code == 3
# --- T4: login lane --------------------------------------------------------
def test_cli_login_lane_runs_without_token_or_policy_ack(monkeypatch, tmp_path):
"""Login lane skips the caller-auth precheck and the secret-read gate."""
_proxy_env(monkeypatch, tmp_path)
monkeypatch.delenv("VAULT_TOKEN", raising=False)
monkeypatch.delenv("BAO_TOKEN", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path) # no ~/.vault-token
ran = {}
def fake_run(argv, **kw):
ran["argv"] = argv
ran["stdout"] = kw.get("stdout")
return subprocess.CompletedProcess(argv, 0)
monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
r = runner.invoke(app, ["access", "login oidc", "--domain", "coulomb_social", "--fetch"])
assert r.exit_code == 0
assert ran["argv"][:2] == ["bao", "login"] # interactive login ran
assert ran["stdout"] is None # inherited stdio — token not captured
def test_cli_login_lane_rejects_exec(monkeypatch, tmp_path):
_proxy_env(monkeypatch, tmp_path)
monkeypatch.setattr(
"warden.proxy.subprocess.run",
lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not run")),
)
r = runner.invoke(
app, ["access", "login oidc", "--domain", "coulomb_social", "--exec", "--", "true"]
)
assert r.exit_code == 2
def test_real_catalog_login_entry_is_login_lane():
from warden.routing import load_catalog
e = load_catalog(_repo_catalog()).get("key-cape-oidc-login")
assert e is not None and e.lane == "login" and e.exec_capable
def test_invalid_lane_rejected(tmp_path):
import yaml
from warden.routing import CatalogError, load_catalog
entry = dict(
id="x", title="t", need_keywords=["k"], owner_repo="o", subsystem="s",
warden_executes=False, wiki_ref="w", canon_ref="c", reviewed="2026-06-27",
status="active", lane="bogus",
)
p = tmp_path / "c.yaml"
p.write_text(yaml.dump({"version": 1, "entries": [entry]}))
import pytest
with pytest.raises(CatalogError, match="invalid lane"):
load_catalog(p)