Files
ops-warden/tests/test_access.py
tegwick 2c513864bc feat(WARDEN-WP-0014): T2 — warden access advisory front door
Adds `warden access <need> [--domain X] [--json]`: resolves a credential
need against the routing catalog and renders the structured handoff
(owner, auth method, path template, command skeleton, policy gate
status, proxy hint). SSH lane points at `warden sign`; routed lanes end
"warden advises, the owner vends". New pure warden/access.py module
(expand_handoff, policy_gate_status) reused by the T3 proxy lane. JSON
output is stable and secret-free. tests/test_access.py added.

157 passed, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 16:13:51 +02:00

102 lines
3.7 KiB
Python

"""Tests for the `warden access` operator front door (WP-0014 T2)."""
from __future__ import annotations
import json
from pathlib import Path
from typer.testing import CliRunner
from warden.access import expand_handoff, policy_gate_status
from warden.cli import app
from warden.routing.models import RouteEntry
runner = CliRunner()
def _repo_catalog() -> Path:
return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml"
def _openbao_entry() -> RouteEntry:
return RouteEntry(
id="openbao-api-key",
title="API key, DB credential, or dynamic lease",
need_keywords=["api", "key", "npm", "token"],
owner_repo="railiance-platform",
subsystem="OpenBao",
warden_executes=False,
wiki_ref="wiki/CredentialRouting.md#routing-table",
canon_ref="net-kingdom/docs/x.md",
reviewed="2026-06-27",
status="active",
auth_method="key-cape OIDC → bao login -method=oidc role=<domain>",
path_template="platform/workloads/<domain>/<workload>/<bundle>",
fetch_command="bao kv get -field=<FIELD> <path_template>",
policy_ref="flex-auth check secret.read:<domain>",
exec_capable=True,
)
# --- pure expansion --------------------------------------------------------
def test_expand_inlines_path_template_token():
e = expand_handoff(_openbao_entry())
assert "<path_template>" not in e.fetch_command
assert e.fetch_command.startswith("bao kv get -field=<FIELD> platform/workloads/")
def test_expand_substitutes_domain():
e = expand_handoff(_openbao_entry(), domain="coulomb_social")
assert "coulomb_social" in e.path_template
assert "<domain>" not in e.path_template
assert "<domain>" not in e.auth_method
# owner-side names stay as placeholders — warden does not invent them
assert "<workload>" in e.path_template and "<bundle>" in e.path_template
def test_expand_without_domain_keeps_placeholder():
e = expand_handoff(_openbao_entry())
assert "<domain>" in e.path_template
def test_policy_gate_status_no_config(monkeypatch, tmp_path):
monkeypatch.setenv("WARDEN_CONFIG", str(tmp_path / "nope.yaml"))
assert "advisory" in policy_gate_status()
# --- CLI -------------------------------------------------------------------
def test_access_advisory_output(monkeypatch):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social"])
assert r.exit_code == 0
assert "railiance-platform" in r.stdout
assert "platform/workloads/coulomb_social/" in r.stdout
assert "does not hold this secret" in r.stdout
def test_access_json_shape_is_secret_free(monkeypatch):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social", "--json"])
assert r.exit_code == 0
payload = json.loads(r.stdout)
assert payload["id"] == "openbao-api-key"
assert payload["domain"] == "coulomb_social"
assert payload["handoff"]["exec_capable"] is True
# only placeholders/templates — never a concrete credential
assert "<FIELD>" in payload["handoff"]["fetch_command"]
def test_access_ssh_lane_points_to_sign(monkeypatch):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
r = runner.invoke(app, ["access", "ssh cert for host access"])
assert r.exit_code == 0
assert "issues this directly" in r.stdout
assert "warden sign" in r.stdout
def test_access_no_match_exits_nonzero(monkeypatch):
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
r = runner.invoke(app, ["access", "zzzz-no-such-need-xyzzy"])
assert r.exit_code == 1