generated from coulomb/repo-seed
Adds `warden access <need> [--domain X] [--json]`: resolves a credential need against the routing catalog and renders the structured handoff (owner, auth method, path template, command skeleton, policy gate status, proxy hint). SSH lane points at `warden sign`; routed lanes end "warden advises, the owner vends". New pure warden/access.py module (expand_handoff, policy_gate_status) reused by the T3 proxy lane. JSON output is stable and secret-free. tests/test_access.py added. 157 passed, lint clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
102 lines
3.7 KiB
Python
102 lines
3.7 KiB
Python
"""Tests for the `warden access` operator front door (WP-0014 T2)."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from typer.testing import CliRunner
|
|
|
|
from warden.access import expand_handoff, policy_gate_status
|
|
from warden.cli import app
|
|
from warden.routing.models import RouteEntry
|
|
|
|
runner = CliRunner()
|
|
|
|
|
|
def _repo_catalog() -> Path:
|
|
return Path(__file__).resolve().parents[1] / "registry" / "routing" / "catalog.yaml"
|
|
|
|
|
|
def _openbao_entry() -> RouteEntry:
|
|
return RouteEntry(
|
|
id="openbao-api-key",
|
|
title="API key, DB credential, or dynamic lease",
|
|
need_keywords=["api", "key", "npm", "token"],
|
|
owner_repo="railiance-platform",
|
|
subsystem="OpenBao",
|
|
warden_executes=False,
|
|
wiki_ref="wiki/CredentialRouting.md#routing-table",
|
|
canon_ref="net-kingdom/docs/x.md",
|
|
reviewed="2026-06-27",
|
|
status="active",
|
|
auth_method="key-cape OIDC → bao login -method=oidc role=<domain>",
|
|
path_template="platform/workloads/<domain>/<workload>/<bundle>",
|
|
fetch_command="bao kv get -field=<FIELD> <path_template>",
|
|
policy_ref="flex-auth check secret.read:<domain>",
|
|
exec_capable=True,
|
|
)
|
|
|
|
|
|
# --- pure expansion --------------------------------------------------------
|
|
|
|
def test_expand_inlines_path_template_token():
|
|
e = expand_handoff(_openbao_entry())
|
|
assert "<path_template>" not in e.fetch_command
|
|
assert e.fetch_command.startswith("bao kv get -field=<FIELD> platform/workloads/")
|
|
|
|
|
|
def test_expand_substitutes_domain():
|
|
e = expand_handoff(_openbao_entry(), domain="coulomb_social")
|
|
assert "coulomb_social" in e.path_template
|
|
assert "<domain>" not in e.path_template
|
|
assert "<domain>" not in e.auth_method
|
|
# owner-side names stay as placeholders — warden does not invent them
|
|
assert "<workload>" in e.path_template and "<bundle>" in e.path_template
|
|
|
|
|
|
def test_expand_without_domain_keeps_placeholder():
|
|
e = expand_handoff(_openbao_entry())
|
|
assert "<domain>" in e.path_template
|
|
|
|
|
|
def test_policy_gate_status_no_config(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("WARDEN_CONFIG", str(tmp_path / "nope.yaml"))
|
|
assert "advisory" in policy_gate_status()
|
|
|
|
|
|
# --- CLI -------------------------------------------------------------------
|
|
|
|
def test_access_advisory_output(monkeypatch):
|
|
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
|
|
r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social"])
|
|
assert r.exit_code == 0
|
|
assert "railiance-platform" in r.stdout
|
|
assert "platform/workloads/coulomb_social/" in r.stdout
|
|
assert "does not hold this secret" in r.stdout
|
|
|
|
|
|
def test_access_json_shape_is_secret_free(monkeypatch):
|
|
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
|
|
r = runner.invoke(app, ["access", "npm token", "--domain", "coulomb_social", "--json"])
|
|
assert r.exit_code == 0
|
|
payload = json.loads(r.stdout)
|
|
assert payload["id"] == "openbao-api-key"
|
|
assert payload["domain"] == "coulomb_social"
|
|
assert payload["handoff"]["exec_capable"] is True
|
|
# only placeholders/templates — never a concrete credential
|
|
assert "<FIELD>" in payload["handoff"]["fetch_command"]
|
|
|
|
|
|
def test_access_ssh_lane_points_to_sign(monkeypatch):
|
|
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
|
|
r = runner.invoke(app, ["access", "ssh cert for host access"])
|
|
assert r.exit_code == 0
|
|
assert "issues this directly" in r.stdout
|
|
assert "warden sign" in r.stdout
|
|
|
|
|
|
def test_access_no_match_exits_nonzero(monkeypatch):
|
|
monkeypatch.setenv("WARDEN_ROUTING_CATALOG", str(_repo_catalog()))
|
|
r = runner.invoke(app, ["access", "zzzz-no-such-need-xyzzy"])
|
|
assert r.exit_code == 1
|