generated from coulomb/repo-seed
Add a read-only `warden route` command group (list/show/find) that reads registry/routing/catalog.yaml and tells a worker which subsystem owns a need and which wiki/canon doc to follow. ops-warden still executes exactly one lane (SSH); routed entries return a pointer and never call any subsystem. - src/warden/routing/: models.py + catalog.py loader; enforces the no-double-source rule (non-SSH entries with steps/cert_command fail validation), dup-id and schema checks. - route list (active-only unless --all, --tag), route show (SSH appends steps + cert pattern; routed ends with "next action on <owner> — see <wiki_ref>"), route find (keyword ranking, --json). - tests/test_routing.py: load/validation, find ranking, CLI JSON shapes, plus a drift guard (every wiki_ref anchor resolves; every entry has a reviewed date). - Docs: wiki/AccessRouting.md CLI section, README quick reference, SCOPE A3 -> A4. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
"""Tests for the routing pointer catalog and `warden route` CLI.

No test here requires a live subsystem — routing is a read-only pointer layer.
"""
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import yaml
|
|
from typer.testing import CliRunner
|
|
|
|
from warden.cli import app
|
|
from warden.routing import CatalogError, load_catalog
|
|
from warden.routing.catalog import find_catalog_path
|
|
|
|
# Shared Typer test runner; the CLI tests in this module invoke `app` through it.
runner = CliRunner()
|
|
|
|
|
|
def _repo_catalog() -> Path:
    """Return the catalog path that ``find_catalog_path`` resolves.

    The tests in this module treat this as the repository's real catalog.
    """
    catalog_path = find_catalog_path()
    return catalog_path
|
|
|
|
|
|
def _write_catalog(tmp_path: Path, entries: list[dict]) -> Path:
    """Write *entries* as a version-1 catalog under *tmp_path*.

    Args:
        tmp_path: Directory that receives the file.
        entries: Entry mappings stored under the ``entries`` key.

    Returns:
        The path of the written ``catalog.yaml``.
    """
    document = {"version": 1, "entries": entries}
    target = tmp_path / "catalog.yaml"
    target.write_text(yaml.dump(document))
    return target
|
|
|
|
|
|
# Fixture entry with warden_executes=True; it is the only fixture in this
# module that carries `cert_command` and `steps`.
SSH_ENTRY = {
    # identity and search keywords
    "id": "ssh-cert-host-access",
    "title": "SSH cert",
    "need_keywords": ["ssh", "cert", "sign"],
    # ownership
    "owner_repo": "ops-warden",
    "subsystem": "ops-warden",
    "warden_executes": True,
    # documentation pointers
    "wiki_ref": "wiki/AccessRouting.md#issue-vs-route",
    "canon_ref": "net-kingdom/docs/x.md",
    # review metadata
    "reviewed": "2026-06-18",
    "status": "active",
    # execution details
    "cert_command": "warden sign <actor> --pubkey <path>",
    "steps": ["confirm inventory", "sign"],
}
|
|
|
|
# Fixture entry with warden_executes=False; it deliberately has neither
# `steps` nor `cert_command` (tests add those to provoke validation errors).
ROUTED_ENTRY = {
    # identity and search keywords
    "id": "openbao-api-key",
    "title": "API key",
    "need_keywords": ["api", "key", "openbao"],
    # ownership
    "owner_repo": "railiance-platform",
    "subsystem": "OpenBao",
    "warden_executes": False,
    # documentation pointers
    "wiki_ref": "wiki/CredentialRouting.md#routing-table",
    "canon_ref": "net-kingdom/docs/x.md",
    # review metadata
    "reviewed": "2026-06-18",
    "status": "active",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Catalog load + validation
# ---------------------------------------------------------------------------
|
|
|
|
def test_real_catalog_loads():
    """The repo catalog loads with at least six entries and a usable SSH entry."""
    loaded = load_catalog(_repo_catalog())
    entry_count = len(loaded.entries)
    assert entry_count >= 6

    ssh_entry = loaded.get("ssh-cert-host-access")
    assert ssh_entry is not None
    assert ssh_entry.warden_executes is True

    # The SSH entry must carry a non-empty cert command mentioning `warden sign`.
    command = ssh_entry.cert_command
    assert command
    assert "warden sign" in command
|
|
|
|
|
|
def test_real_catalog_has_one_executed_lane():
    """Exactly one repo-catalog entry has ``warden_executes`` set: the SSH one."""
    loaded = load_catalog(_repo_catalog())
    executed_ids = [entry.id for entry in loaded.entries if entry.warden_executes]
    assert executed_ids == ["ssh-cert-host-access"]
|
|
|
|
|
|
def test_no_double_source_rule_rejects_routed_steps(tmp_path):
    """Loading fails with a "no-double-source" error when a routed entry has steps."""
    # non-SSH entry must not carry steps
    routed_with_steps = {**ROUTED_ENTRY, "steps": ["do a thing on OpenBao"]}
    catalog_file = _write_catalog(tmp_path, [SSH_ENTRY, routed_with_steps])
    with pytest.raises(CatalogError, match="no-double-source"):
        load_catalog(catalog_file)
|
|
|
|
|
|
def test_routed_cert_command_rejected(tmp_path):
    """Loading fails, naming ``cert_command``, when a routed entry defines one."""
    routed_with_command = {**ROUTED_ENTRY, "cert_command": "warden secret get"}
    catalog_file = _write_catalog(tmp_path, [routed_with_command])
    with pytest.raises(CatalogError, match="cert_command"):
        load_catalog(catalog_file)
|
|
|
|
|
|
def test_duplicate_id_rejected(tmp_path):
    """Two entries sharing one id make the loader raise a "duplicate" error."""
    twin = ROUTED_ENTRY.copy()
    catalog_file = _write_catalog(tmp_path, [ROUTED_ENTRY, twin])
    with pytest.raises(CatalogError, match="duplicate"):
        load_catalog(catalog_file)
|
|
|
|
|
|
def test_missing_field_rejected(tmp_path):
    """An entry without ``owner_repo`` is rejected and the error names that field."""
    incomplete = dict(ROUTED_ENTRY)
    del incomplete["owner_repo"]
    catalog_file = _write_catalog(tmp_path, [incomplete])
    with pytest.raises(CatalogError, match="owner_repo"):
        load_catalog(catalog_file)
|
|
|
|
|
|
def test_missing_catalog_file():
    """Loading a path that does not exist raises ``CatalogError``."""
    absent = Path("/nonexistent/catalog.yaml")
    with pytest.raises(CatalogError):
        load_catalog(absent)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# find ranking
# ---------------------------------------------------------------------------
|
|
|
|
def test_find_active_excludes_draft():
    """A default ``find`` does not return ``issue-core-ingestion-api-key``.

    That entry is presumably a draft in the repo catalog, per the test name.
    """
    loaded = load_catalog(_repo_catalog())
    matches = loaded.find("issue core api key")
    matched_ids = [match.id for match in matches]
    assert "issue-core-ingestion-api-key" not in matched_ids
|
|
|
|
|
|
def test_find_all_includes_draft():
    """With ``include_draft=True``, ``find`` returns ``issue-core-ingestion-api-key``."""
    loaded = load_catalog(_repo_catalog())
    matches = loaded.find("issue core api key", include_draft=True)
    matched_ids = [match.id for match in matches]
    assert "issue-core-ingestion-api-key" in matched_ids
|
|
|
|
|
|
def test_find_ssh_tunnel_top_match():
    """Searching "ssh tunnel" ranks ``ops-bridge-tunnel`` first."""
    ranked = load_catalog(_repo_catalog()).find("ssh tunnel")
    assert ranked
    top = ranked[0]
    assert top.id == "ops-bridge-tunnel"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# CLI (uses the repo catalog via env override)
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
def repo_catalog_env(monkeypatch):
    """Set ``WARDEN_ROUTING_CATALOG`` to the path returned by ``_repo_catalog``."""
    catalog_location = str(_repo_catalog())
    monkeypatch.setenv("WARDEN_ROUTING_CATALOG", catalog_location)
|
|
|
|
|
|
def test_cli_list_active_only(repo_catalog_env):
    """``route list --json`` succeeds and omits ``issue-core-ingestion-api-key``."""
    outcome = runner.invoke(app, ["route", "list", "--json"])
    assert outcome.exit_code == 0

    listed = json.loads(outcome.stdout)
    listed_ids = [item["id"] for item in listed]
    assert "issue-core-ingestion-api-key" not in listed_ids
|
|
|
|
|
|
def test_cli_list_all_includes_draft(repo_catalog_env):
    """``route list --all --json`` succeeds and lists ``issue-core-ingestion-api-key``."""
    result = runner.invoke(app, ["route", "list", "--all", "--json"])
    # Check the exit code first, as the sibling CLI tests do, so a failing command
    # is reported as such rather than as a JSON decoding error.
    assert result.exit_code == 0
    ids = [e["id"] for e in json.loads(result.stdout)]
    assert "issue-core-ingestion-api-key" in ids
|
|
|
|
|
|
def test_cli_show_ssh_json_includes_cert_pattern(repo_catalog_env):
    """``route show ssh-cert-host-access --json`` exposes the cert command and steps."""
    argv = ["route", "show", "ssh-cert-host-access", "--json"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 0

    payload = json.loads(outcome.stdout)
    assert payload["warden_executes"] is True
    assert "warden sign" in payload["cert_command"]
    assert payload["steps"]
|
|
|
|
|
|
def test_cli_show_routed_has_next_action_not_steps(repo_catalog_env):
    """``route show openbao-api-key --json`` has ``next_action`` and no ``steps``."""
    result = runner.invoke(app, ["route", "show", "openbao-api-key", "--json"])
    # Check the exit code first, as the sibling CLI tests do, so a failing command
    # is reported as such rather than as a JSON decoding error.
    assert result.exit_code == 0
    data = json.loads(result.stdout)
    assert data["warden_executes"] is False
    assert "steps" not in data
    assert "next_action" in data
|
|
|
|
|
|
def test_cli_show_unknown_exits_one(repo_catalog_env):
    """``route show`` with an unknown id exits with status 1."""
    argv = ["route", "show", "does-not-exist"]
    outcome = runner.invoke(app, argv)
    assert outcome.exit_code == 1
|
|
|
|
|
|
def test_cli_find_json(repo_catalog_env):
    """``route find "ssh tunnel" --json`` succeeds and includes ``ops-bridge-tunnel``."""
    outcome = runner.invoke(app, ["route", "find", "ssh tunnel", "--json"])
    assert outcome.exit_code == 0

    found = json.loads(outcome.stdout)
    found_ids = [item["id"] for item in found]
    assert "ops-bridge-tunnel" in found_ids
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# T5 drift guard — every wiki_ref anchor resolves, every entry has a reviewed date
# ---------------------------------------------------------------------------
|
|
|
|
def _github_slug(heading: str) -> str:
|
|
"""Approximate GitHub's heading-anchor slug algorithm."""
|
|
text = heading.strip().lower()
|
|
text = re.sub(r"[^\w\s-]", "", text) # drop punctuation (em-dash, parens, etc.)
|
|
text = text.replace(" ", "-")
|
|
return text
|
|
|
|
|
|
def _heading_anchors(md_path: Path) -> set[str]:
    """Collect the anchor slugs of the ATX headings in a markdown file.

    The file is read as UTF-8 so headings with non-ASCII punctuation decode the
    same on every platform. Lines inside fenced code blocks are skipped, so a
    ``# comment`` in a code sample is not mistaken for a heading. A repeated
    heading gets the ``-1``, ``-2``, ... suffix in addition to the plain slug of
    its first occurrence.

    Args:
        md_path: Markdown file to scan.

    Returns:
        The set of anchor slugs found in the file.
    """
    anchors: set[str] = set()
    slug_counts: dict[str, int] = {}
    open_fence = ""  # marker of the fenced block we are inside; "" when outside
    for line in md_path.read_text(encoding="utf-8").splitlines():
        fence = re.match(r"^ {0,3}(`{3,}|~{3,})(.*)$", line)
        if open_fence:
            # Inside a fence only a bare marker of the same character, at least
            # as long as the opening one, closes the block; the rest is code.
            if (
                fence
                and fence.group(1)[0] == open_fence[0]
                and len(fence.group(1)) >= len(open_fence)
                and not fence.group(2).strip()
            ):
                open_fence = ""
            continue
        # A backtick fence whose info string contains a backtick is inline code,
        # not the start of a fenced block.
        if fence and not (fence.group(1)[0] == "`" and "`" in fence.group(2)):
            open_fence = fence.group(1)
            continue
        m = re.match(r"^#{1,6}\s+(.*)$", line)
        if not m:
            continue
        slug = _github_slug(m.group(1))
        seen = slug_counts.get(slug, 0)
        slug_counts[slug] = seen + 1
        anchors.add(slug if seen == 0 else f"{slug}-{seen}")
    return anchors
|
|
|
|
|
|
def test_every_wiki_ref_anchor_resolves():
    """Every entry's ``wiki_ref`` names an existing file and, if given, a real anchor.

    All problems are collected and reported together so one run shows every
    stale reference.
    """
    catalog_path = _repo_catalog()
    catalog = load_catalog(catalog_path)
    repo_root = catalog_path.parents[2]  # registry/routing/catalog.yaml -> repo root
    # Several entries can point into the same wiki page; parse each page once.
    anchors_by_file: dict[Path, set[str]] = {}
    failures = []
    for entry in catalog.entries:
        rel, _, anchor = entry.wiki_ref.partition("#")
        md_path = repo_root / rel
        # is_file() also rejects a directory (e.g. an empty path before "#"),
        # which would otherwise crash when read.
        if not md_path.is_file():
            failures.append(f"{entry.id}: wiki file missing: {rel}")
            continue
        if not anchor:
            continue
        if md_path not in anchors_by_file:
            anchors_by_file[md_path] = _heading_anchors(md_path)
        if anchor not in anchors_by_file[md_path]:
            failures.append(f"{entry.id}: anchor #{anchor} not found in {rel}")
    assert not failures, "\n".join(failures)
|
|
|
|
|
|
def test_every_entry_has_reviewed_date():
    """Every catalog entry carries a ``reviewed`` value that is a real YYYY-MM-DD date.

    The regex pins the textual shape; ``date.fromisoformat`` then rejects values
    that have the shape but are not calendar dates (for example ``2026-13-45``)
    or that carry a trailing newline.
    """
    from datetime import date

    catalog = load_catalog(_repo_catalog())
    for entry in catalog.entries:
        message = f"{entry.id}: reviewed must be YYYY-MM-DD, got {entry.reviewed!r}"
        assert re.match(r"^\d{4}-\d{2}-\d{2}$", entry.reviewed), message
        try:
            date.fromisoformat(entry.reviewed)
        except ValueError as exc:
            raise AssertionError(message) from exc
|