generated from coulomb/repo-seed
Adds a lane: secret|login field to RouteEntry. The login lane is an interactive auth bootstrap: it skips the caller-auth precheck (no token yet — that's the point) and the secret-read gate (it establishes the identity the gate needs), runs the owner's login command interactively as the caller via inherited stdio, and rejects --exec. The token stays in the caller's own store; warden never captures it (G2 holds). Audited as action: login. key-cape-oidc-login populated as the reference login entry. Advisory proxy hint updated now that T3 has shipped. 172 passed, lint clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
239 lines
8.1 KiB
Python
239 lines
8.1 KiB
Python
"""Tests for the access proxy lane (WP-0014 T3) and its three guardrails."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from typer.testing import CliRunner
|
|
|
|
from warden.cli import app
|
|
from warden.proxy import (
|
|
ProxyError,
|
|
caller_auth_present,
|
|
proxy_exec,
|
|
proxy_fetch,
|
|
resolve_fetch_command,
|
|
write_audit,
|
|
)
|
|
from warden.routing.models import RouteEntry
|
|
|
|
# Shared Typer CLI runner; the CLI tests in this module call runner.invoke(app, [...]).
runner = CliRunner()
|
|
|
|
|
|
def _entry(**over) -> RouteEntry:
    """Build a RouteEntry for the OpenBao API-key route used by these tests.

    Keyword arguments in ``over`` replace the matching default field before
    the entry is constructed.
    """
    defaults = {
        "id": "openbao-api-key",
        "title": "API key",
        "need_keywords": ["npm", "token"],
        "owner_repo": "railiance-platform",
        "subsystem": "OpenBao",
        "warden_executes": False,
        "wiki_ref": "w",
        "canon_ref": "c",
        "reviewed": "2026-06-27",
        "status": "active",
        "path_template": "platform/workloads/<domain>/<workload>/<bundle>",
        "fetch_command": "bao kv get -field=<FIELD> <path_template>",
        "exec_capable": True,
    }
    # Later keys win, so caller overrides replace the defaults.
    return RouteEntry(**{**defaults, **over})
|
|
|
|
|
|
# --- resolve_fetch_command -------------------------------------------------
|
|
|
|
def test_resolve_builds_argv():
    """A fully specified field and path resolve to a concrete argv list."""
    expected = ["bao", "kv", "get", "-field=NPM_AUTH_TOKEN", "platform/x/y/z"]
    resolved = resolve_fetch_command(
        _entry(),
        domain="coulomb_social",
        field="NPM_AUTH_TOKEN",
        path="platform/x/y/z",
    )
    assert resolved == expected
|
|
|
|
|
|
def test_resolve_refuses_unresolved_placeholder():
    """Resolving with only a domain must raise instead of returning placeholders."""
    route = _entry()
    # no --field / --path → <FIELD>, <workload>, <bundle> remain
    with pytest.raises(ProxyError, match="unresolved placeholder"):
        resolve_fetch_command(route, domain="coulomb_social")
|
|
|
|
|
|
def test_resolve_refuses_non_exec_capable():
    """An entry without exec capability or a fetch command cannot be resolved."""
    advisory_only = _entry(exec_capable=False, fetch_command=None)
    with pytest.raises(ProxyError, match="not exec_capable"):
        resolve_fetch_command(advisory_only)
|
|
|
|
|
|
# --- G2: transit-only fetch (inherited stdout) -----------------------------
|
|
|
|
def test_proxy_fetch_inherits_stdout_never_pipes(monkeypatch):
    """proxy_fetch must call subprocess.run with stdout/stderr left unpiped."""
    run_kwargs = {}

    def fake_run(argv, **kw):
        # Record the keyword arguments so the test can inspect stdio wiring.
        run_kwargs.update(kw)
        return subprocess.CompletedProcess(argv, 0)

    monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)

    exit_code = proxy_fetch(["bao", "kv", "get", "x"])

    assert exit_code == 0
    # The value must never enter warden's memory — stdout is inherited, not piped.
    assert run_kwargs["stdout"] is None
    assert run_kwargs.get("stderr") is None
|
|
|
|
|
|
# --- G1 + inject: exec injects value into child env, adds no warden token ---
|
|
|
|
def test_proxy_exec_injects_only_into_child_env(monkeypatch):
    """proxy_exec passes the fetched value to the child via its environment only.

    The fake ``subprocess.run`` answers the ``bao`` fetch with a canned value
    and records the ``env`` mapping handed to the child command. The test then
    checks that the value arrives under the requested variable name with the
    trailing newline stripped, and that no ``VAULT_TOKEN`` was added beyond
    what the caller's own environment already holds.
    """
    import os  # local import: only this test needs the caller's environment

    seen_env = {}

    def fake_run(argv, **kw):
        if argv[0] == "bao":
            # Fetch step: return the secret on captured stdout.
            return subprocess.CompletedProcess(argv, 0, stdout="SECRETVAL\n")
        # Child step: remember the environment it was launched with.
        seen_env.update(kw["env"])
        return subprocess.CompletedProcess(argv, 0)

    monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)
    monkeypatch.delenv("NPM_AUTH_TOKEN", raising=False)

    rc = proxy_exec(["bao", "kv", "get", "x"], env_var="NPM_AUTH_TOKEN", child_argv=["true"])

    assert rc == 0
    # Value injected into child env (trailing newline stripped)…
    assert seen_env["NPM_AUTH_TOKEN"] == "SECRETVAL"
    # …and warden added no credential of its own beyond the caller's environment.
    added_by_warden = {k for k in seen_env if k not in os.environ}
    assert "VAULT_TOKEN" not in added_by_warden
|
|
|
|
|
|
def test_proxy_exec_requires_env_var():
    """An empty env_var is rejected with a message pointing at --field."""
    expectation = pytest.raises(ProxyError, match="requires --field")
    with expectation:
        proxy_exec(["bao"], env_var="", child_argv=["true"])
|
|
|
|
|
|
# --- G1 caller auth detection ----------------------------------------------
|
|
|
|
def test_caller_auth_present_from_env(monkeypatch):
    """With VAULT_TOKEN set in the environment, caller auth is detected."""
    monkeypatch.setenv("VAULT_TOKEN", "x")
    detected = caller_auth_present()
    assert detected is True
|
|
|
|
|
|
def test_caller_auth_absent(monkeypatch, tmp_path):
    """With no token variables and an empty home directory, no auth is detected."""
    for token_var in ("VAULT_TOKEN", "BAO_TOKEN"):
        monkeypatch.delenv(token_var, raising=False)
    # Point Path.home() at an empty directory: no ~/.vault-token
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    detected = caller_auth_present()
    assert detected is False
|
|
|
|
|
|
# --- audit metadata only ---------------------------------------------------
|
|
|
|
def test_write_audit_has_no_value_field(tmp_path):
    """The audit record carries the need id but no value or secret key."""
    audit_file = write_audit(
        tmp_path,
        need_id="openbao-api-key",
        owner_repo="railiance-platform",
        domain="coulomb_social",
        action="fetch",
        decision_id=None,
    )
    record = json.loads(audit_file.read_text().strip())
    assert record["need_id"] == "openbao-api-key"
    assert "value" not in record
    assert "secret" not in record
|
|
|
|
|
|
# --- CLI guardrail wiring ---------------------------------------------------
|
|
|
|
def _repo_catalog() -> Path:
    """Return the path to registry/routing/catalog.yaml, two levels above this file."""
    base_dir = Path(__file__).resolve().parents[1]
    return base_dir.joinpath("registry", "routing", "catalog.yaml")
|
|
|
|
|
|
def _warden_yaml(tmp_path: Path) -> Path:
    """Write a minimal warden.yaml under ``tmp_path`` and return its path.

    Also creates an empty ``ca`` file next to it. The config selects the local
    backend, points ``ca_key`` and ``state_dir`` inside ``tmp_path``, and sets
    ``policy.enabled`` to false.
    """
    ca_key = tmp_path / "ca"
    ca_key.write_text("")
    state_dir = tmp_path / "state"

    config_text = (
        f"backend: local\nca_key: {ca_key}\nstate_dir: {state_dir}\n"
        "policy:\n enabled: false\n"
    )
    config_path = tmp_path / "warden.yaml"
    config_path.write_text(config_text)
    return config_path
|
|
|
|
|
|
def _proxy_env(monkeypatch, tmp_path):
    """Set WARDEN_ROUTING_CATALOG and WARDEN_CONFIG for a CLI test.

    The catalog variable points at the repository catalog and the config
    variable at a fresh warden.yaml written under ``tmp_path``.
    """
    catalog_path = str(_repo_catalog())
    monkeypatch.setenv("WARDEN_ROUTING_CATALOG", catalog_path)
    config_path = str(_warden_yaml(tmp_path))
    monkeypatch.setenv("WARDEN_CONFIG", config_path)
|
|
|
|
|
|
def test_cli_proxy_refuses_without_policy_ack(monkeypatch, tmp_path):
    """--fetch without --no-policy exits 4 with a 'not enforced' message.

    subprocess.run is replaced by a function that fails the test if called.
    """
    _proxy_env(monkeypatch, tmp_path)
    monkeypatch.setenv("VAULT_TOKEN", "caller")

    def forbidden_run(*args, **kwargs):
        # subprocess must never run if the gate blocks first.
        raise AssertionError("fetch ran despite gate")

    monkeypatch.setattr("warden.proxy.subprocess.run", forbidden_run)

    cli_args = [
        "access", "npm",
        "--domain", "coulomb_social",
        "--field", "NPM_AUTH_TOKEN",
        "--path", "platform/x/y/z",
        "--fetch",
    ]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 4
    assert "not enforced" in result.stdout or "not enforced" in str(result.output)
|
|
|
|
|
|
def test_cli_proxy_requires_caller_auth(monkeypatch, tmp_path):
    """With no caller token available, --fetch --no-policy exits with code 3."""
    _proxy_env(monkeypatch, tmp_path)
    for token_var in ("VAULT_TOKEN", "BAO_TOKEN"):
        monkeypatch.delenv(token_var, raising=False)
    # Empty home directory, so no token file can be found there either.
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    cli_args = [
        "access", "npm",
        "--domain", "coulomb_social",
        "--field", "NPM_AUTH_TOKEN",
        "--path", "platform/x/y/z",
        "--fetch",
        "--no-policy",
    ]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 3
|
|
|
|
|
|
# --- T4: login lane --------------------------------------------------------
|
|
|
|
def test_cli_login_lane_runs_without_token_or_policy_ack(monkeypatch, tmp_path):
    """Login lane skips the caller-auth precheck and the secret-read gate."""
    _proxy_env(monkeypatch, tmp_path)
    for token_var in ("VAULT_TOKEN", "BAO_TOKEN"):
        monkeypatch.delenv(token_var, raising=False)
    monkeypatch.setattr(Path, "home", lambda: tmp_path)  # no ~/.vault-token

    observed = {}

    def fake_run(argv, **kw):
        # Capture what the CLI tried to run and how stdout was wired.
        observed.update(argv=argv, stdout=kw.get("stdout"))
        return subprocess.CompletedProcess(argv, 0)

    monkeypatch.setattr("warden.proxy.subprocess.run", fake_run)

    cli_args = ["access", "login oidc", "--domain", "coulomb_social", "--fetch"]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 0
    assert observed["argv"][:2] == ["bao", "login"]  # interactive login ran
    assert observed["stdout"] is None  # inherited stdio — token not captured
|
|
|
|
|
|
def test_cli_login_lane_rejects_exec(monkeypatch, tmp_path):
    """Requesting --exec on the login entry exits with code 2 and runs nothing."""
    _proxy_env(monkeypatch, tmp_path)

    def forbidden_run(*args, **kwargs):
        # Any subprocess call here means the rejection came too late.
        raise AssertionError("should not run")

    monkeypatch.setattr("warden.proxy.subprocess.run", forbidden_run)

    cli_args = ["access", "login oidc", "--domain", "coulomb_social", "--exec", "--", "true"]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 2
|
|
|
|
|
|
def test_real_catalog_login_entry_is_login_lane():
    """The repository catalog's key-cape-oidc-login entry is an exec-capable login lane."""
    from warden.routing import load_catalog

    catalog = load_catalog(_repo_catalog())
    login_entry = catalog.get("key-cape-oidc-login")

    assert login_entry is not None
    assert login_entry.lane == "login"
    assert login_entry.exec_capable
|
|
|
|
|
|
def test_invalid_lane_rejected(tmp_path):
    """Loading a catalog whose entry has an unknown lane raises CatalogError.

    A one-entry catalog with ``lane="bogus"`` is written to ``tmp_path`` as
    YAML, and ``load_catalog`` is expected to fail with a message matching
    "invalid lane".
    """
    import yaml

    from warden.routing import CatalogError, load_catalog

    entry = dict(
        id="x", title="t", need_keywords=["k"], owner_repo="o", subsystem="s",
        warden_executes=False, wiki_ref="w", canon_ref="c", reviewed="2026-06-27",
        status="active", lane="bogus",
    )
    p = tmp_path / "c.yaml"
    p.write_text(yaml.dump({"version": 1, "entries": [entry]}))

    # pytest comes from the module-level import; no local re-import is needed.
    with pytest.raises(CatalogError, match="invalid lane"):
        load_catalog(p)
|