Files
ops-warden/tests/test_routing.py
tegwick 46b340f45f feat(WARDEN-WP-0017): make the access front door discoverable (not SSH-only)
WP-0014 made ops-warden the operator access front door (warden access --fetch/--exec
proxies an exec_capable secret as the caller), but every discovery surface still told
the pre-WP-0014 "SSH certs only, pointer not key" story — so agents like whynot-design
never found the proxy and concluded they had to message ops-warden for a token value.

Messaging/discoverability only; the conduit security model is unchanged (no custody,
no broker).

T1 — CLI: `warden route` table warden column is now three-valued (issue/assist/route);
route + access JSON gain warden_role + exec_capable and a proxy-aware next_action;
`warden access` closing line leads with "ops-warden can fetch this for you as the
caller" for exec_capable lanes (route-only lanes keep "owner vends").

T2 — .claude/rules/credential-routing.md reframed (lead + routing table role column);
SCOPE one-liner + a second capability block for the access front door.

T3 — registered the State Hub capability "Operator access front door (caller-identity
fetch proxy)" (the hub had no ops-warden security capability at all); messaged
whynot-design the corrected `warden access "npm auth token" --fetch/--exec` path.

210 tests pass, lint clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-27 21:02:46 +02:00

350 lines
12 KiB
Python

"""Tests for the routing pointer catalog and `warden route` CLI.
No test here requires a live subsystem — routing is a read-only pointer layer.
"""
import json
import re
from pathlib import Path
import pytest
import yaml
from typer.testing import CliRunner
from warden.cli import app
from datetime import date
from warden.routing import CatalogError, load_catalog
from warden.routing.catalog import days_since_review, find_catalog_path, is_review_stale
# Shared Typer CLI test runner; the CLI tests in this module call runner.invoke(app, [...]).
runner = CliRunner()
def _repo_catalog() -> Path:
    """Return the catalog path reported by ``find_catalog_path``."""
    catalog_path = find_catalog_path()
    return catalog_path
def _write_catalog(tmp_path: Path, entries: list[dict]) -> Path:
    """Write ``entries`` as a version-1 catalog YAML file and return its path.

    The file is always named ``catalog.yaml`` and is created inside ``tmp_path``.
    """
    document = {"version": 1, "entries": entries}
    target = tmp_path / "catalog.yaml"
    target.write_text(yaml.dump(document))
    return target
# Synthetic catalog entry for a lane that ops-warden executes itself
# (warden_executes=True); it is the only fixture carrying cert_command and steps.
SSH_ENTRY = {
    "id": "ssh-cert-host-access",
    "title": "SSH cert",
    "need_keywords": [
        "ssh",
        "cert",
        "sign",
    ],
    "owner_repo": "ops-warden",
    "subsystem": "ops-warden",
    "warden_executes": True,
    "wiki_ref": "wiki/AccessRouting.md#issue-vs-route",
    "canon_ref": "net-kingdom/docs/x.md",
    "reviewed": "2026-06-18",
    "status": "active",
    # Command pattern and procedure for the executed lane.
    "cert_command": "warden sign <actor> --pubkey <path>",
    "steps": [
        "confirm inventory",
        "sign",
    ],
}
# Synthetic catalog entry for a routed lane (warden_executes=False) owned by another
# repo. Tests copy this dict and add or remove fields to build valid and invalid catalogs.
ROUTED_ENTRY = {
    "id": "openbao-api-key",
    "title": "API key",
    "need_keywords": [
        "api",
        "key",
        "openbao",
    ],
    "owner_repo": "railiance-platform",
    "subsystem": "OpenBao",
    "warden_executes": False,
    "wiki_ref": "wiki/CredentialRouting.md#routing-table",
    "canon_ref": "net-kingdom/docs/x.md",
    "reviewed": "2026-06-18",
    "status": "active",
}
# ---------------------------------------------------------------------------
# Catalog load + validation
# ---------------------------------------------------------------------------
def test_real_catalog_loads():
    """The repo catalog loads with at least six entries and a warden-executed SSH lane."""
    repo_catalog = load_catalog(_repo_catalog())
    assert 6 <= len(repo_catalog.entries)

    ssh_lane = repo_catalog.get("ssh-cert-host-access")
    assert ssh_lane is not None
    assert ssh_lane.warden_executes is True
    # The SSH lane must carry a non-empty cert command that invokes `warden sign`.
    assert ssh_lane.cert_command
    assert "warden sign" in ssh_lane.cert_command
def test_real_catalog_has_one_executed_lane():
    """Exactly one repo-catalog entry has ``warden_executes`` set: the SSH cert lane."""
    entries = load_catalog(_repo_catalog()).entries
    executed_ids = [entry.id for entry in entries if entry.warden_executes]
    assert executed_ids == ["ssh-cert-host-access"]
def test_no_double_source_rule_rejects_routed_steps(tmp_path):
    """Loading fails with a "no-double-source" error when a routed entry carries steps."""
    # non-SSH entry must not carry steps
    routed_with_steps = {**ROUTED_ENTRY, "steps": ["do a thing on OpenBao"]}
    catalog_file = _write_catalog(tmp_path, [SSH_ENTRY, routed_with_steps])
    with pytest.raises(CatalogError, match="no-double-source"):
        load_catalog(catalog_file)
def test_routed_cert_command_rejected(tmp_path):
    """Loading fails with a "cert_command" error when a routed entry sets cert_command."""
    routed_with_cert = {**ROUTED_ENTRY, "cert_command": "warden secret get"}
    catalog_file = _write_catalog(tmp_path, [routed_with_cert])
    with pytest.raises(CatalogError, match="cert_command"):
        load_catalog(catalog_file)
def test_duplicate_id_rejected(tmp_path):
    """Two entries sharing one id make the loader raise a "duplicate" error."""
    twin = ROUTED_ENTRY.copy()
    catalog_file = _write_catalog(tmp_path, [ROUTED_ENTRY, twin])
    with pytest.raises(CatalogError, match="duplicate"):
        load_catalog(catalog_file)
def test_missing_field_rejected(tmp_path):
    """An entry without ``owner_repo`` is rejected with an error naming that field."""
    incomplete = dict(ROUTED_ENTRY)
    del incomplete["owner_repo"]
    catalog_file = _write_catalog(tmp_path, [incomplete])
    with pytest.raises(CatalogError, match="owner_repo"):
        load_catalog(catalog_file)
def test_missing_catalog_file():
    """Loading a catalog path that does not exist raises ``CatalogError``."""
    absent = Path("/nonexistent/catalog.yaml")
    with pytest.raises(CatalogError):
        load_catalog(absent)
# ---------------------------------------------------------------------------
# Structured handoff fields (WP-0014, T1)
# ---------------------------------------------------------------------------
def test_handoff_fields_parse_on_routed_entry(tmp_path):
    """A routed entry with the structured handoff fields loads and exposes them."""
    handoff_fields = {
        "auth_method": "key-cape OIDC → bao login -method=oidc role=<domain>",
        "path_template": "platform/workloads/<domain>/<workload>/<bundle>",
        "fetch_command": "bao kv get -field=<FIELD> <path_template>",
        "policy_ref": "flex-auth check secret.read:<domain>",
        "exec_capable": True,
    }
    routed = {**ROUTED_ENTRY, **handoff_fields}

    loaded = load_catalog(_write_catalog(tmp_path, [routed]))
    parsed = loaded.get("openbao-api-key")

    assert parsed.has_handoff is True
    assert parsed.exec_capable is True
    assert parsed.path_template.startswith("platform/workloads/")
def test_real_catalog_openbao_entry_has_handoff():
    """The repo catalog's OpenBao entry has handoff data, is exec-capable, and uses templates."""
    openbao = load_catalog(_repo_catalog()).get("openbao-api-key")
    assert openbao is not None
    assert openbao.has_handoff
    assert openbao.exec_capable
    # templates, not values
    assert "<" in openbao.path_template
    assert "<" in openbao.fetch_command
def test_exec_capable_without_fetch_command_rejected(tmp_path):
    """Setting ``exec_capable`` without a ``fetch_command`` is a catalog error."""
    # no fetch_command
    flagged = {**ROUTED_ENTRY, "exec_capable": True}
    with pytest.raises(CatalogError, match="fetch_command"):
        load_catalog(_write_catalog(tmp_path, [flagged]))
@pytest.mark.parametrize(
    "leaked",
    [
        # github token prefix
        "bao write x token=ghp_abcdef0123456789abcdef0123",
        # aws key id
        "x=AKIAIOSFODNN7EXAMPLE",
        # jwt prefix
        "header=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9",
        # high-entropy run
        "val=ZmFrZXNlY3JldDEyMzQ1Njc4OWFiY2RlZmdoaWprbA",
    ],
)
def test_handoff_secret_material_rejected(tmp_path, leaked):
    """A ``fetch_command`` containing secret-looking material is rejected at load time.

    The error message must match either "secret" or "high-entropy".
    """
    tainted = {**ROUTED_ENTRY, "fetch_command": leaked}
    with pytest.raises(CatalogError, match="secret|high-entropy"):
        load_catalog(_write_catalog(tmp_path, [tainted]))
def test_handoff_template_with_placeholders_accepted(tmp_path):
    """A ``fetch_command`` made of ``<placeholder>`` templates loads without error."""
    template = "bao kv get -field=<FIELD> platform/workloads/<domain>/<bundle>"
    templated = {**ROUTED_ENTRY, "fetch_command": template}
    loaded = load_catalog(_write_catalog(tmp_path, [templated]))
    entry = loaded.get("openbao-api-key")
    assert entry.fetch_command.startswith("bao kv get")
# ---------------------------------------------------------------------------
# find ranking
# ---------------------------------------------------------------------------
def test_find_active_excludes_draft():
    """A default ``find`` does not return the issue-core ingestion entry."""
    matches = load_catalog(_repo_catalog()).find("issue core api key")
    matched_ids = {match.id for match in matches}
    assert "issue-core-ingestion-api-key" not in matched_ids
def test_find_all_includes_draft():
    """With ``include_draft=True`` the issue-core ingestion entry is returned."""
    matches = load_catalog(_repo_catalog()).find(
        "issue core api key", include_draft=True
    )
    matched_ids = {match.id for match in matches}
    assert "issue-core-ingestion-api-key" in matched_ids
def test_find_ssh_tunnel_top_match():
    """The first result for "ssh tunnel" is the ops-bridge tunnel entry."""
    ranked = load_catalog(_repo_catalog()).find("ssh tunnel")
    assert ranked
    assert ranked[0].id == "ops-bridge-tunnel"
def test_find_openrouter_key():
    """With drafts included, "openrouter api key" ranks the openrouter entry first."""
    ranked = load_catalog(_repo_catalog()).find(
        "openrouter api key", include_draft=True
    )
    assert ranked
    assert ranked[0].id == "openrouter-llm-connect"
def test_find_object_storage_sts():
    """With drafts included, "s3 temporary credentials" ranks object-storage-sts first."""
    ranked = load_catalog(_repo_catalog()).find(
        "s3 temporary credentials", include_draft=True
    )
    assert ranked
    assert ranked[0].id == "object-storage-sts"
# ---------------------------------------------------------------------------
# Review staleness
# ---------------------------------------------------------------------------
def test_days_since_review():
    """From a 2026-06-01 review to a pinned 2026-06-24 "today" is 23 days."""
    pinned_today = date(2026, 6, 24)
    elapsed = days_since_review("2026-06-01", today=pinned_today)
    assert elapsed == 23
def test_is_review_stale_past_threshold():
    """A 2026-01-01 review is stale at a 90-day threshold on 2026-06-24."""
    pinned_today = date(2026, 6, 24)
    verdict = is_review_stale("2026-01-01", threshold_days=90, today=pinned_today)
    assert verdict
def test_is_review_stale_within_threshold():
    """A 2026-06-01 review is not stale at a 90-day threshold on 2026-06-24."""
    pinned_today = date(2026, 6, 24)
    verdict = is_review_stale("2026-06-01", threshold_days=90, today=pinned_today)
    assert not verdict
def test_catalog_stale_filters_entries():
    """``stale`` with a zero-day threshold returns only entries reviewed before "today".

    With today pinned to 2026-06-25 the result must be non-empty and every returned
    entry's ``reviewed`` string must sort at or before "2026-06-24".
    """
    repo_catalog = load_catalog(_repo_catalog())
    stale_entries = repo_catalog.stale(threshold_days=0, today=date(2026, 6, 25))
    assert stale_entries
    for entry in stale_entries:
        assert entry.reviewed <= "2026-06-24"
# ---------------------------------------------------------------------------
# CLI (uses the repo catalog via env override)
# ---------------------------------------------------------------------------
@pytest.fixture
def repo_catalog_env(monkeypatch):
    """Set ``WARDEN_ROUTING_CATALOG`` to the repo catalog path via ``monkeypatch``."""
    catalog_location = str(_repo_catalog())
    monkeypatch.setenv("WARDEN_ROUTING_CATALOG", catalog_location)
def test_cli_list_active_only(repo_catalog_env):
    """``route list --json`` succeeds and omits the issue-core ingestion entry."""
    outcome = runner.invoke(app, ["route", "list", "--json"])
    assert outcome.exit_code == 0
    listed_ids = {row["id"] for row in json.loads(outcome.stdout)}
    assert "issue-core-ingestion-api-key" not in listed_ids
def test_cli_list_all_includes_draft(repo_catalog_env):
    """``route list --all --json`` succeeds and includes the issue-core ingestion entry."""
    result = runner.invoke(app, ["route", "list", "--all", "--json"])
    # Check the exit code before parsing, as the sibling CLI tests do, so a CLI
    # failure is reported as such rather than as a JSON decode error.
    assert result.exit_code == 0, result.output
    ids = [e["id"] for e in json.loads(result.stdout)]
    assert "issue-core-ingestion-api-key" in ids
def test_cli_show_ssh_json_includes_cert_pattern(repo_catalog_env):
    """``route show`` JSON for the SSH lane reports the issue role, cert command, and steps."""
    argv = ["route", "show", "ssh-cert-host-access", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0

    payload = json.loads(outcome.stdout)
    assert payload["warden_executes"] is True
    assert payload["warden_role"] == "issue"
    assert "warden sign" in payload["cert_command"]
    assert payload["steps"]
def test_cli_show_routed_has_next_action_not_steps(repo_catalog_env):
    """``route show`` JSON for the OpenBao lane has a proxy-aware next_action and no steps."""
    result = runner.invoke(app, ["route", "show", "openbao-api-key", "--json"])
    # Check the exit code before parsing, as the sibling CLI tests do, so a CLI
    # failure is reported as such rather than as a JSON decode error.
    assert result.exit_code == 0, result.output
    data = json.loads(result.stdout)
    assert data["warden_executes"] is False
    # exec_capable lane surfaces as an "assist" role so agents see it is proxyable.
    assert data["warden_role"] == "assist"
    assert data["exec_capable"] is True
    assert "steps" not in data
    assert "next_action" in data
    assert "proxy" in data["next_action"]
def test_cli_show_unknown_exits_one(repo_catalog_env):
    """``route show`` with an id that is not in the catalog exits with status 1."""
    argv = ["route", "show", "does-not-exist"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 1
def test_cli_find_json(repo_catalog_env):
    """``route find "ssh tunnel" --json`` succeeds and lists the ops-bridge tunnel entry."""
    outcome = runner.invoke(app, ["route", "find", "ssh tunnel", "--json"])
    assert outcome.exit_code == 0
    found_ids = {row["id"] for row in json.loads(outcome.stdout)}
    assert "ops-bridge-tunnel" in found_ids
def test_cli_list_stale_json(repo_catalog_env):
    """``route list --stale --stale-days 1 --json`` emits rows with staleness fields."""
    argv = ["route", "list", "--stale", "--stale-days", "1", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0

    rows = json.loads(outcome.stdout)
    assert rows
    # Every row reports its review age and echoes the threshold that was requested.
    for row in rows:
        assert "days_since_review" in row
        assert row["stale_threshold_days"] == 1
def test_cli_list_stale_empty_with_high_threshold(repo_catalog_env):
    """With a 9999-day threshold the stale listing succeeds and prints "No stale"."""
    argv = ["route", "list", "--stale", "--stale-days", "9999"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0
    assert "No stale" in outcome.output
def test_cli_find_openrouter_draft_only_with_all(repo_catalog_env):
    """``route find ... --all --json`` succeeds and includes the openrouter entry."""
    argv = ["route", "find", "openrouter api key", "--all", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0
    found_ids = {row["id"] for row in json.loads(outcome.stdout)}
    assert "openrouter-llm-connect" in found_ids
# ---------------------------------------------------------------------------
# T5 drift guard — every wiki_ref anchor resolves, every entry has a reviewed date
# ---------------------------------------------------------------------------
def _github_slug(heading: str) -> str:
"""Approximate GitHub's heading-anchor slug algorithm."""
text = heading.strip().lower()
text = re.sub(r"[^\w\s-]", "", text) # drop punctuation (em-dash, parens, etc.)
text = text.replace(" ", "-")
return text
def _heading_anchors(md_path: Path) -> set[str]:
    """Return the anchor slugs of all ``#``-style headings in a markdown file.

    Every line that starts with one to six ``#`` characters followed by whitespace
    counts as a heading; its text is slugged with ``_github_slug``. The match is
    purely line-based, so such lines inside fenced code blocks are included too.

    The file is decoded as UTF-8 explicitly rather than with the locale's default
    encoding, so headings containing non-ASCII characters are read the same way on
    every platform.
    """
    text = md_path.read_text(encoding="utf-8")
    return {
        _github_slug(m.group(1))
        for line in text.splitlines()
        if (m := re.match(r"^#{1,6}\s+(.*)$", line))
    }
def test_every_wiki_ref_anchor_resolves():
    """Every entry's ``wiki_ref`` names an existing file and, if given, a real heading anchor.

    All problems are collected and reported together. Heading anchors are computed
    once per markdown file, since several entries may point at the same file.
    """
    catalog_path = _repo_catalog()
    catalog = load_catalog(catalog_path)
    repo_root = catalog_path.parents[2]  # registry/routing/catalog.yaml -> repo root
    anchors_by_file: dict[Path, set[str]] = {}
    failures = []
    for entry in catalog.entries:
        rel, _, anchor = entry.wiki_ref.partition("#")
        md_path = repo_root / rel
        if not md_path.exists():
            failures.append(f"{entry.id}: wiki file missing: {rel}")
            continue
        if not anchor:
            # A wiki_ref without "#anchor" only needs the file to exist.
            continue
        if md_path not in anchors_by_file:
            anchors_by_file[md_path] = _heading_anchors(md_path)
        if anchor not in anchors_by_file[md_path]:
            failures.append(f"{entry.id}: anchor #{anchor} not found in {rel}")
    assert not failures, "\n".join(failures)
def test_every_entry_has_reviewed_date():
    """Every repo-catalog entry has a ``reviewed`` value shaped exactly like YYYY-MM-DD."""
    catalog = load_catalog(_repo_catalog())
    for entry in catalog.entries:
        # fullmatch instead of match + "$": "$" would also accept a value that ends
        # with a trailing newline.
        assert re.fullmatch(r"\d{4}-\d{2}-\d{2}", entry.reviewed), (
            f"{entry.id}: reviewed must be YYYY-MM-DD, got {entry.reviewed!r}"
        )